Skip to content

This guide shows you how to connect pipelines using publish and subscribe operators. You’ll learn to split event streams for parallel processing and merge multiple sources into a single pipeline.

The publish operator sends events to a named channel (topic) on a node. The subscribe operator receives events from that channel. Multiple subscribers can receive the same events, and multiple publishers can write to the same topic.

// Publisher pipeline
from_file "events.json"
publish "events"

A separate pipeline subscribes to receive these events:

// Subscriber pipeline
subscribe "events"
to_splunk "https://splunk:8088", hec_token=secret("HEC_TOKEN")

Send the same events to multiple destinations by having multiple subscribers:

// Pipeline 1: Ingest and publish
from_file "/var/log/*.json", watch=10s
publish "logs"

One subscriber archives events to storage:

// Pipeline 2: Archive to storage
subscribe "logs"
import

Another forwards only high-severity events to a SIEM:

// Pipeline 3: Forward to SIEM
subscribe "logs"
where severity in ["high", "critical"]
to_splunk "https://splunk:8088", hec_token=secret("HEC_TOKEN")

A third sends authentication failures to a dedicated alerting channel:

// Pipeline 4: Real-time alerting
subscribe "logs"
where event_type == "auth" and outcome == "failure"
to_kafka "alerts"

All subscriber pipelines receive the same events independently.

Route events to different topics based on content:

from_file "eve.json" {
read_suricata
}
publish f"suricata.{event_type}"

Subscribers can then listen to specific event types:

// Only DNS events
subscribe "suricata.dns"

Or subscribe to alerts only:

// Only alert events
subscribe "suricata.alert"

Combine multiple sources into a single stream by publishing to the same topic:

// Pipeline 1: Zeek logs
from_file "/var/log/zeek/*.log", watch=10s {
read_zeek_tsv
}
publish "network"

A second pipeline publishes Suricata alerts to the same topic:

// Pipeline 2: Suricata alerts
from_file "/var/log/suricata/eve.json", watch=10s {
read_suricata
}
publish "network"

A third pipeline consumes the merged stream:

// Pipeline 3: Consume merged stream
subscribe "network"
import

The subscriber receives events from both Zeek and Suricata in a single stream.

A single subscriber can listen to multiple topics:

subscribe "alerts", "notices", "critical"
to_kafka "all-priority-events"

On shutdown, Tenzir drains in-flight publish/subscribe data before it stops pipelines. This guarantee is strongest when you use fixed topic names and an acyclic pipeline graph.

Avoid cyclic publish/subscribe topologies. If a pipeline publishes back into a stream that eventually feeds the same pipeline again, Tenzir can’t guarantee a fully graceful drain and may drop data during shutdown.

There is also a small probability of data loss during shutdown when using dynamic topics. In practice, this is likely to happen only for newly started nodes or newly observed topics during shutdown.

Use fork with publish to send copies of events while continuing the main pipeline:

from_file "events.json"
fork {
publish "raw-events"
}
// Continue processing in main pipeline
where severity >= "high"
import

Subscribers propagate back pressure to publishers. If a subscribing pipeline can’t keep up, publishers slow down to match, preventing data loss.

Pipelines not visible on the overview page at app.tenzir.com drop data instead of slowing publishers. This prevents slow ad-hoc queries from blocking production pipelines.

Last updated: