Sends events to a Databricks Unity Catalog table.
```tql
to_databricks workspace=string, catalog=string, schema=string, table=string,
              client_id=string, client_secret=string, warehouse_id=string,
              [staging_volume=string], [partition_by=list<field>],
              [sort_by=list<field>], [flush_interval=duration],
              [file_size=int], [ingest_mode=string]
```

## Description

The `to_databricks` operator writes events to a managed Delta table in
Databricks Unity Catalog.
It stages optimized Parquet files in a Unity Catalog Volume and commits them
atomically via COPY INTO, making data immediately queryable in Databricks
SQL, notebooks, and downstream tools.
See the Databricks integration overview for architecture details and configuration guidance.
### workspace = string

The URL of your Databricks workspace. This is the base URL you see when logged into Databricks.
| Cloud | Example |
|---|---|
| AWS | https://dbc-a1b2c3d4-e5f6.cloud.databricks.com |
| Azure | https://adb-1234567890123456.7.azuredatabricks.net |
| GCP | https://1234567890123456.7.gcp.databricks.com |
The operator derives all API endpoints from this URL automatically, including
the Unity Catalog REST API (/api/2.1/unity-catalog/) and the SQL Statement
API (/api/2.0/sql/statements).
### catalog = string

The Unity Catalog catalog name. Catalogs are the top-level namespace and typically represent environments or data domains.
Common patterns include environment-based naming (prod, dev, staging) or
domain-based naming (analytics, security, finance). New workspaces have a
default catalog named main.
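If the catalog you want does not exist yet, a metastore admin can create it ahead of time. A minimal sketch, assuming a `security` catalog name chosen for illustration:

```sql
CREATE CATALOG IF NOT EXISTS security;
```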
### schema = string

The schema (also called database) within the catalog. Schemas group related tables and control access at a finer granularity than catalogs.
For data lake architectures, consider medallion-style naming: bronze for raw
ingested data, silver for cleaned and conformed data, gold for
business-ready aggregates. Alternatively, use domain-specific names like
network_logs, application_events, or audit_trail.
The schema must exist before ingestion. Create it with:

```sql
CREATE SCHEMA IF NOT EXISTS my_catalog.bronze;
```

### table = string
The name of the target table. The operator creates the table if it does not exist, inferring the schema from the first batch of events.
Table naming conventions vary by use case. For raw ingestion, names often
reflect the data source: zeek_conn, cloudtrail_events, firewall_logs.
For normalized data, names may reflect the event type or entity:
network_connections, user_authentications, file_operations.
### client_id = string

The Application (client) ID of a Databricks service principal for OAuth machine-to-machine authentication.
Create a service principal in Account Console → User management → Service
principals. The Application ID displayed after creation is your client_id.
### client_secret = string

The OAuth secret for the service principal. Generate this in the service principal’s Secrets tab. The secret is shown only once at creation time.
The service principal requires these Unity Catalog privileges:
```sql
GRANT USE CATALOG ON CATALOG my_catalog TO `my-service-principal`;
GRANT USE SCHEMA ON SCHEMA my_catalog.bronze TO `my-service-principal`;
GRANT CREATE TABLE ON SCHEMA my_catalog.bronze TO `my-service-principal`;
GRANT READ VOLUME, WRITE VOLUME ON VOLUME my_catalog.bronze.staging TO `my-service-principal`;
```

### warehouse_id = string
The ID of the SQL Warehouse that executes `COPY INTO` statements.
Find this in SQL Warehouses → [Your Warehouse] → Connection details. The
warehouse ID is the final segment of the HTTP Path, after /sql/1.0/warehouses/.
Serverless SQL Warehouses are recommended for ingestion workloads due to fast
startup times and automatic scaling. Each COPY INTO execution consumes DBUs
at your warehouse’s rate (typically $0.07–0.70/DBU depending on tier).
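For instance, with a hypothetical Connection details panel showing:

```
Server hostname: adb-1234567890123456.7.azuredatabricks.net
HTTP Path:       /sql/1.0/warehouses/abc123def456
```

the value to pass is `warehouse_id="abc123def456"`.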
### staging_volume = string (optional)

The Unity Catalog Volume where Parquet files are staged before being committed to the table. If not specified, the operator creates a temporary staging location.
Volumes provide managed storage within Unity Catalog:
```sql
CREATE VOLUME IF NOT EXISTS my_catalog.bronze.ingestion_staging;
```

Staged files are automatically removed after successful commits.
### partition_by = list<field> (optional)

List of column names to partition by. The columns must exist in the events
being written. Delta tables use Hive-style partitioning, creating a directory
structure like day=2025-01-15/class_uid=4001/.
If the target table already exists, the operator queries Unity Catalog for the
existing partition scheme and ignores this parameter—partition columns are
immutable after table creation. Staging files are automatically aligned with
the target table’s partitions for optimal COPY INTO performance.
If the table does not exist, these columns define the partition scheme for the newly created table. If omitted, the operator creates an unpartitioned table.
To partition by derived values (e.g., daily buckets from a timestamp), add the partition column to your events before writing:
```tql
subscribe "events"
day = time.round(1d)
to_databricks ... partition_by=[day, class_uid]
```

Avoid over-partitioning. Each partition should contain at least 1GB of data for optimal file sizes and query performance.
### sort_by = list<field> (optional)

Columns to sort rows by within each output file. Sorting improves query performance by enabling data skipping: Databricks reads Parquet column statistics (min/max values per row group) to skip files that cannot contain matching rows.
When partition_by is specified, rows are always sorted by partition columns
first to ensure partition alignment (each file contains data for exactly one
partition). The sort_by columns are applied as secondary sort keys.
For best results, sort by columns that appear frequently in WHERE clauses:
```tql
subscribe "events"
day = time.round(1d)
to_databricks ... partition_by=[day], sort_by=[src_ip, dst_ip]
```

This produces files where queries like `WHERE src_ip = '10.0.0.1'` can skip
entire files based on min/max statistics.
### flush_interval = duration (optional)

Maximum time to buffer events before committing to the table. This controls data freshness—the maximum delay before events become queryable.
Defaults to 5m. When this interval elapses, the operator commits whatever
data has accumulated, even if file_size has not been reached.
| Interval | Data Latency | Recommended For |
|---|---|---|
| 1m | ~1 minute | Real-time dashboards, alerting |
| 5m | ~5 minutes | General analytics, SOC workflows |
| 15m | ~15 minutes | Batch analytics, cost optimization |
| 1h | ~1 hour | Archival, compliance retention |
Shorter intervals improve freshness but may produce smaller files. The operator optimizes file layout regardless of flush timing.
### file_size = int (optional)

Target size for output Parquet files.
Defaults to 1Gi. When the buffer reaches this size, the operator flushes
immediately regardless of flush_interval. This produces optimally-sized
files that minimize the need for later compaction via OPTIMIZE.
Databricks recommends files between 256MB and 1GB for best query performance. Larger files reduce metadata overhead and improve scan efficiency; smaller files enable finer-grained data skipping.
For high-volume streams, the file_size threshold triggers most flushes. For
low-volume streams, flush_interval ensures timely commits despite smaller
file sizes.
### ingest_mode = string (optional)

Controls table creation and append behavior:
- `create_append`: Creates the table if it does not exist, otherwise appends.
- `create`: Creates the table, failing if it already exists.
- `append`: Appends to an existing table, failing if it does not exist.
Defaults to create_append.
When creating tables, the operator infers the schema from the first batch of
events. Subsequent batches with additional fields trigger automatic schema
evolution via mergeSchema. Fields present in the table but missing from
events are filled with null.
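For example, to guard against accidental table creation in a tightly controlled schema, a pipeline can pin the mode to `append`. A sketch, where the workspace URL, warehouse ID, and catalog layout are placeholders:

```tql
subscribe "events"
to_databricks workspace="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com",
  catalog="prod", schema="silver", table="security_events",
  client_id=$client_id, client_secret=$client_secret,
  warehouse_id="abc123def456", ingest_mode="append"
```

With this mode, the pipeline fails fast if the table has not been provisioned ahead of time instead of silently creating one with an inferred schema.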
## Write-Time Optimizations

The operator applies several optimizations to produce query-efficient files that minimize the need for post-write maintenance:
- **Partition-aligned files**: Each output file contains data for exactly one partition value. This enables partition pruning—queries filtering on partition columns skip irrelevant directories without opening files.
- **Sorted rows**: Rows are sorted by partition columns (if specified) followed by `sort_by` columns. Sorting produces tight min/max statistics in Parquet row groups, enabling aggressive data skipping during queries.
- **Optimal file sizing**: The `file_size` parameter (default 1GB) produces files in Databricks’ recommended size range, reducing metadata overhead and the urgency of running `OPTIMIZE` for compaction.
- **Efficient encoding**: The operator leverages Apache Arrow’s Parquet writer with dictionary encoding, run-length encoding, and Zstd compression for compact, fast-to-read files.
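As an illustration of how these optimizations combine, consider a query against a hypothetical table partitioned by `day` and sorted by `src_ip`:

```sql
-- Partition pruning: only the day=2025-01-15 directory is scanned.
-- Data skipping: files whose src_ip min/max range excludes 10.0.0.1
-- are skipped without being read.
SELECT *
FROM analytics.silver.network_flows
WHERE day = DATE'2025-01-15'
  AND src_ip = '10.0.0.1';
```

The table name and columns here are assumptions for the sketch; the same pruning and skipping behavior applies to any table written with `partition_by` and `sort_by`.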
## Examples

### Send events to a bronze layer table

Ingest raw Zeek connection logs into a medallion-architecture data lake:
```tql
let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")

from_file "/var/log/zeek/conn.log", read=read_zeek_tsv
day = ts.round(1d)
to_databricks workspace="https://adb-1234567890.azuredatabricks.net",
  catalog="security", schema="bronze", table="zeek_conn",
  client_id=$client_id, client_secret=$client_secret,
  warehouse_id="abc123def456", partition_by=[day]
```

### Stream OCSF events with partitioning
Write normalized security events with OCSF-optimized partitioning:

```tql
let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")

subscribe "ocsf"
day = time.round(1d)
to_databricks workspace="https://dbc-a1b2c3d4.cloud.databricks.com",
  catalog="prod", schema="silver", table="security_events",
  client_id=$client_id, client_secret=$client_secret,
  warehouse_id="abc123def456", staging_volume="tenzir_staging",
  partition_by=[day, class_uid]
```

### Low-latency ingestion for alerting
Minimize data latency for real-time security monitoring:

```tql
let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")

subscribe "alerts"
to_databricks workspace="https://1234567890123456.7.gcp.databricks.com",
  catalog="security", schema="realtime", table="high_priority_alerts",
  client_id=$client_id, client_secret=$client_secret,
  warehouse_id="abc123def456", flush_interval=1m, file_size=256Mi
```

### Optimized ingestion for network analytics
High-volume network telemetry with sorting for common query patterns:

```tql
let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")

subscribe "network"
day = time.round(1d)
to_databricks workspace="https://adb-1234567890.azuredatabricks.net",
  catalog="analytics", schema="silver", table="network_flows",
  client_id=$client_id, client_secret=$client_secret,
  warehouse_id="abc123def456", partition_by=[day],
  sort_by=[src_ip, dst_ip, dst_port], flush_interval=5m, file_size=1Gi
```

### Cost-optimized batch ingestion
Reduce DBU consumption for high-volume archival workloads:

```tql
let $client_id = secret("databricks-client-id")
let $client_secret = secret("databricks-client-secret")

export
day = time.round(1d)
to_databricks workspace="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com",
  catalog="archive", schema="compliance", table="audit_logs",
  client_id=$client_id, client_secret=$client_secret,
  warehouse_id="abc123def456", flush_interval=15m, file_size=1Gi,
  partition_by=[day, source_type]
```