
Sends events to a Databricks Unity Catalog table.

to_databricks workspace=string, catalog=string, schema=string, table=string,
client_id=string, client_secret=string, warehouse_id=string,
[staging_volume=string], [partition_by=list<field>],
[sort_by=list<field>], [flush_interval=duration], [file_size=int],
[ingest_mode=string]

The to_databricks operator writes events to a managed Delta table in Databricks Unity Catalog. It stages optimized Parquet files in a Unity Catalog Volume and commits them atomically via COPY INTO, making data immediately queryable in Databricks SQL, notebooks, and downstream tools.

See the Databricks integration overview for architecture details and configuration guidance.

workspace: The URL of your Databricks workspace. This is the base URL shown in your browser when you are logged in to Databricks.

| Cloud | Example |
| --- | --- |
| AWS | https://dbc-a1b2c3d4-e5f6.cloud.databricks.com |
| Azure | https://adb-1234567890123456.7.azuredatabricks.net |
| GCP | https://1234567890123456.7.gcp.databricks.com |

The operator derives all API endpoints from this URL automatically, including the Unity Catalog REST API (/api/2.1/unity-catalog/) and the SQL Statement API (/api/2.0/sql/statements).
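As a rough illustration of this derivation, the sketch below joins the two paths named above onto a workspace URL. The helper name derive_endpoints and the dictionary keys are hypothetical; the operator's actual implementation is not shown in this reference.

```python
from urllib.parse import urljoin


def derive_endpoints(workspace: str) -> dict[str, str]:
    """Build API endpoint URLs from a workspace base URL (illustrative only)."""
    # Normalize to exactly one trailing slash so urljoin appends to the host
    # instead of replacing the last path segment.
    base = workspace.rstrip("/") + "/"
    return {
        "unity_catalog": urljoin(base, "api/2.1/unity-catalog/"),
        "sql_statements": urljoin(base, "api/2.0/sql/statements"),
    }


endpoints = derive_endpoints("https://dbc-a1b2c3d4-e5f6.cloud.databricks.com")
for name, url in endpoints.items():
    print(f"{name}: {url}")
```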

catalog: The name of the Unity Catalog catalog. Catalogs are the top-level namespace and typically represent environments or data domains.

Common patterns include environment-based naming (prod, dev, staging) or domain-based naming (analytics, security, finance). New workspaces have a default catalog named main.

schema: The schema (also called database) within the catalog. Schemas group related tables and control access at a finer granularity than catalogs.

For data lake architectures, consider medallion-style naming: bronze for raw ingested data, silver for cleaned and conformed data, gold for business-ready aggregates. Alternatively, use domain-specific names like network_logs, application_events, or audit_trail.

The schema must exist before ingestion. Create it with:

CREATE SCHEMA IF NOT EXISTS my_catalog.bronze;

table: The name of the target table. The operator creates the table if it does not exist, inferring the schema from the first batch of events.

Table naming conventions vary by use case. For raw ingestion, names often reflect the data source: zeek_conn, cloudtrail_events, firewall_logs. For normalized data, names may reflect the event type or entity: network_connections, user_authentications, file_operations.

client_id: The Application (client) ID of a Databricks service principal for OAuth machine-to-machine authentication.

Create a service principal in Account Console → User management → Service principals. The Application ID displayed after creation is your client_id.

client_secret: The OAuth secret for the service principal. Generate this in the service principal’s Secrets tab. The secret is shown only once, at creation time, so store it securely right away.

The service principal requires these Unity Catalog privileges:

GRANT USE CATALOG ON CATALOG my_catalog TO `my-service-principal`;
GRANT USE SCHEMA ON SCHEMA my_catalog.bronze TO `my-service-principal`;
GRANT CREATE TABLE ON SCHEMA my_catalog.bronze TO `my-service-principal`;
GRANT READ VOLUME, WRITE VOLUME ON VOLUME my_catalog.bronze.staging
TO `my-service-principal`;

warehouse_id: The ID of the SQL Warehouse that executes COPY INTO statements.

Find this in SQL Warehouses → [Your Warehouse] → Connection details. The warehouse ID is the final segment of the HTTP Path, after /sql/1.0/warehouses/.
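If you copy the full HTTP Path from the connection details, the ID can be extracted as in the following sketch. The function name warehouse_id_from_http_path is hypothetical and the example path reuses the placeholder ID from the examples on this page.

```python
def warehouse_id_from_http_path(http_path: str) -> str:
    """Return the final segment of an HTTP Path like /sql/1.0/warehouses/<id>."""
    prefix = "/sql/1.0/warehouses/"
    if not http_path.startswith(prefix):
        raise ValueError(f"unexpected HTTP Path: {http_path!r}")
    # Everything after the prefix is the warehouse ID; drop a trailing slash.
    warehouse_id = http_path[len(prefix):].strip("/")
    if not warehouse_id or "/" in warehouse_id:
        raise ValueError(f"no warehouse ID found in: {http_path!r}")
    return warehouse_id


warehouse_id = warehouse_id_from_http_path("/sql/1.0/warehouses/abc123def456")
print(warehouse_id)
```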

Serverless SQL Warehouses are recommended for ingestion workloads due to fast startup times and automatic scaling. Each COPY INTO execution consumes DBUs at your warehouse’s rate (typically $0.07–0.70/DBU depending on tier).

staging_volume: The Unity Catalog Volume where Parquet files are staged before being committed to the table. If not specified, the operator creates a temporary staging location.

Volumes provide managed storage within Unity Catalog:

CREATE VOLUME IF NOT EXISTS my_catalog.bronze.staging;

Staged files are automatically removed after successful commits.

partition_by: List of column names to partition by. The columns must exist in the events being written. Delta tables use Hive-style partitioning, creating a directory structure like day=2025-01-15/class_uid=4001/.
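To make the directory layout concrete, here is a minimal sketch of how a Hive-style partition path follows from a row and the partition columns. The function partition_path and the sample row are hypothetical, and real writers additionally escape special characters in values.

```python
def partition_path(row: dict, partition_by: list[str]) -> str:
    """Render the Hive-style directory for a row: col=value/col=value/."""
    # Column order in partition_by determines directory nesting order.
    return "".join(f"{column}={row[column]}/" for column in partition_by)


row = {"day": "2025-01-15", "class_uid": 4001, "src_ip": "10.0.0.1"}
path = partition_path(row, ["day", "class_uid"])
print(path)
```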

If the target table already exists, the operator queries Unity Catalog for the existing partition scheme and ignores this parameter—partition columns are immutable after table creation. Staging files are automatically aligned with the target table’s partitions for optimal COPY INTO performance.

If the table does not exist, these columns define the partition scheme for the newly created table. If the parameter is omitted, the operator creates an unpartitioned table.

To partition by derived values (e.g., daily buckets from a timestamp), add the partition column to your events before writing:

subscribe "events"
day = time.round(1d)
to_databricks
  ...
  partition_by=[day, class_uid]

Avoid over-partitioning. Each partition should contain at least 1GB of data for optimal file sizes and query performance.

sort_by: Columns to sort rows by within each output file. Sorting improves query performance by enabling data skipping: Databricks reads Parquet column statistics (min/max values per row group) to skip files that cannot contain matching rows.

When partition_by is specified, rows are always sorted by partition columns first to ensure partition alignment (each file contains data for exactly one partition). The sort_by columns are applied as secondary sort keys.
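The ordering described here amounts to a compound sort key. The sketch below illustrates the idea on in-memory rows; sort_rows and the sample data are hypothetical, and the IP addresses are compared as plain strings purely for brevity.

```python
def sort_rows(rows: list[dict], partition_by: list[str], sort_by: list[str]) -> list[dict]:
    """Sort by partition columns first, then by the sort_by columns."""
    # Partition columns lead the key so that each partition forms one
    # contiguous run; sort_by columns break ties within a partition.
    keys = list(partition_by) + [c for c in sort_by if c not in partition_by]
    return sorted(rows, key=lambda row: tuple(row[k] for k in keys))


rows = [
    {"day": "2025-01-16", "src_ip": "10.0.0.1"},
    {"day": "2025-01-15", "src_ip": "10.0.0.9"},
    {"day": "2025-01-15", "src_ip": "10.0.0.2"},
]
ordered = sort_rows(rows, partition_by=["day"], sort_by=["src_ip"])
for row in ordered:
    print(row["day"], row["src_ip"])
```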

For best results, sort by columns that appear frequently in WHERE clauses:

subscribe "events"
day = time.round(1d)
to_databricks
  ...
  partition_by=[day],
  sort_by=[src_ip, dst_ip]

This produces files where queries like WHERE src_ip = '10.0.0.1' can skip entire files based on min/max statistics.
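The following sketch illustrates the principle of min/max data skipping, not Databricks' actual query planner. The FileStats type, the files_to_scan function, and the file names and ranges are all hypothetical.

```python
import ipaddress
from dataclasses import dataclass


@dataclass
class FileStats:
    """Per-file min/max statistics for the src_ip column (illustrative)."""
    name: str
    min_ip: str
    max_ip: str


def files_to_scan(files: list[FileStats], target: str) -> list[str]:
    """Keep only files whose [min, max] range can contain the target value."""
    wanted = ipaddress.ip_address(target)
    return [
        f.name
        for f in files
        if ipaddress.ip_address(f.min_ip) <= wanted <= ipaddress.ip_address(f.max_ip)
    ]


files = [
    FileStats("part-0.parquet", "10.0.0.0", "10.0.0.255"),
    FileStats("part-1.parquet", "10.0.1.0", "10.0.3.255"),
    FileStats("part-2.parquet", "192.168.0.1", "192.168.255.254"),
]
candidates = files_to_scan(files, "10.0.0.1")
print(candidates)
```

Because sorted input gives each file a narrow, mostly non-overlapping range, a point lookup touches few files. With unsorted input, every file's range would tend to span the whole value domain and nothing could be skipped.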

flush_interval: Maximum time to buffer events before committing to the table. This controls data freshness, that is, the maximum delay before events become queryable.

Defaults to 5m. When this interval elapses, the operator commits whatever data has accumulated, even if file_size has not been reached.

| Interval | Data Latency | Recommended For |
| --- | --- | --- |
| 1m | ~1 minute | Real-time dashboards, alerting |
| 5m | ~5 minutes | General analytics, SOC workflows |
| 15m | ~15 minutes | Batch analytics, cost optimization |
| 1h | ~1 hour | Archival, compliance retention |

Shorter intervals improve freshness but may produce smaller files. Regardless of flush timing, the operator still writes partition-aligned, sorted files.

file_size: Target size for output Parquet files.

Defaults to 1Gi. When the buffer reaches this size, the operator flushes immediately regardless of flush_interval. This produces optimally sized files that minimize the need for later compaction via OPTIMIZE.

Databricks recommends files between 256MB and 1GB for best query performance. Larger files reduce metadata overhead and improve scan efficiency; smaller files enable finer-grained data skipping.

For high-volume streams, the file_size threshold triggers most flushes. For low-volume streams, flush_interval ensures timely commits despite smaller file sizes.
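The interplay between the two thresholds can be summarized as "flush when either limit is reached". A minimal sketch of that rule, assuming the documented defaults of 1Gi and 5m; the function should_flush and its parameters are hypothetical and say nothing about how the operator tracks its buffer internally.

```python
from datetime import timedelta

GIB = 1 << 30  # 1Gi in bytes


def should_flush(
    buffered_bytes: int,
    elapsed: timedelta,
    file_size: int = GIB,
    flush_interval: timedelta = timedelta(minutes=5),
) -> bool:
    """Flush when the size target or the time limit is reached, whichever is first."""
    return buffered_bytes >= file_size or elapsed >= flush_interval


# High-volume stream: the size threshold triggers long before the interval.
high_volume = should_flush(GIB, timedelta(seconds=10))
# Low-volume stream: the interval triggers despite a small buffer.
low_volume = should_flush(1024, timedelta(minutes=5))
# Neither limit reached yet: keep buffering.
keep_buffering = should_flush(1024, timedelta(minutes=1))
print(high_volume, low_volume, keep_buffering)
```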

ingest_mode: Controls how the operator creates the table and appends to it. Valid values:

  • create_append: Creates the table if it does not exist, otherwise appends.
  • create: Creates the table, failing if it already exists.
  • append: Appends to an existing table, failing if it does not exist.

Defaults to create_append.
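The three modes reduce to a small decision table over whether the target table already exists. The sketch below encodes it; resolve_ingest_action and the error messages are hypothetical, and the operator's real error reporting may differ.

```python
def resolve_ingest_action(ingest_mode: str, table_exists: bool) -> str:
    """Return "create" or "append", or raise if the mode forbids the situation."""
    if ingest_mode == "create_append":
        return "append" if table_exists else "create"
    if ingest_mode == "create":
        if table_exists:
            raise RuntimeError("table already exists")
        return "create"
    if ingest_mode == "append":
        if not table_exists:
            raise RuntimeError("table does not exist")
        return "append"
    raise ValueError(f"unknown ingest_mode: {ingest_mode!r}")


for mode, exists in [("create_append", False), ("create_append", True), ("append", True)]:
    print(mode, exists, "->", resolve_ingest_action(mode, exists))
```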

When creating tables, the operator infers the schema from the first batch of events. Subsequent batches with additional fields trigger automatic schema evolution via mergeSchema. Fields present in the table but missing from events are filled with null.
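Conceptually, schema evolution widens the column list with any new fields, and missing fields become null. The sketch below models this on plain dictionaries; evolve_schema and conform are hypothetical helpers and do not reflect how Delta Lake implements mergeSchema.

```python
def evolve_schema(table_columns: list[str], event: dict) -> list[str]:
    """Append fields that the event has but the table lacks, keeping order."""
    return table_columns + [field for field in event if field not in table_columns]


def conform(event: dict, columns: list[str]) -> dict:
    """Project an event onto the table columns, filling gaps with None (null)."""
    return {column: event.get(column) for column in columns}


table_columns = ["ts", "src_ip"]
event = {"ts": "2025-01-15T12:00:00Z", "dst_port": 443}

evolved = evolve_schema(table_columns, event)
conformed = conform(event, evolved)
print(evolved)
print(conformed)
```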

The operator applies several optimizations to produce query-efficient files that minimize the need for post-write maintenance:

Partition-aligned files: Each output file contains data for exactly one partition value. This enables partition pruning—queries filtering on partition columns skip irrelevant directories without opening files.

Sorted rows: Rows are sorted by partition columns (if specified) followed by sort_by columns. Sorting produces tight min/max statistics in Parquet row groups, enabling aggressive data skipping during queries.

Optimal file sizing: The file_size parameter (default 1Gi) produces files in Databricks’ recommended size range, reducing metadata overhead and the urgency of running OPTIMIZE for compaction.

Efficient encoding: The operator leverages Apache Arrow’s Parquet writer with dictionary encoding, run-length encoding, and Zstd compression for compact, fast-to-read files.

Ingest raw Zeek connection logs into a medallion-architecture data lake:

let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")
from_file "/var/log/zeek/conn.log", read=read_zeek_tsv
day = ts.round(1d)
to_databricks
  workspace="https://adb-1234567890123456.7.azuredatabricks.net",
  catalog="security",
  schema="bronze",
  table="zeek_conn",
  client_id=$client_id,
  client_secret=$client_secret,
  warehouse_id="abc123def456",
  partition_by=[day]

Write normalized security events with OCSF-optimized partitioning:

let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")
subscribe "ocsf"
day = time.round(1d)
to_databricks
  workspace="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com",
  catalog="prod",
  schema="silver",
  table="security_events",
  client_id=$client_id,
  client_secret=$client_secret,
  warehouse_id="abc123def456",
  staging_volume="tenzir_staging",
  partition_by=[day, class_uid]

Minimize data latency for real-time security monitoring:

let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")
subscribe "alerts"
to_databricks
  workspace="https://1234567890123456.7.gcp.databricks.com",
  catalog="security",
  schema="realtime",
  table="high_priority_alerts",
  client_id=$client_id,
  client_secret=$client_secret,
  warehouse_id="abc123def456",
  flush_interval=1m,
  file_size=256Mi

High-volume network telemetry with sorting for common query patterns:

let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")
subscribe "network"
day = time.round(1d)
to_databricks
  workspace="https://adb-1234567890123456.7.azuredatabricks.net",
  catalog="analytics",
  schema="silver",
  table="network_flows",
  client_id=$client_id,
  client_secret=$client_secret,
  warehouse_id="abc123def456",
  partition_by=[day],
  sort_by=[src_ip, dst_ip, dst_port],
  flush_interval=5m,
  file_size=1Gi

Reduce DBU consumption for high-volume archival workloads:

let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")
export
day = time.round(1d)
to_databricks
  workspace="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com",
  catalog="archive",
  schema="compliance",
  table="audit_logs",
  client_id=$client_id,
  client_secret=$client_secret,
  warehouse_id="abc123def456",
  flush_interval=15m,
  file_size=1Gi,
  partition_by=[day, source_type]
