# Split and merge streams

This guide shows you how to connect pipelines using [`publish`](https://preview.docs.tenzir.com/375/375/reference/operators/publish.md) and [`subscribe`](https://preview.docs.tenzir.com/375/375/reference/operators/subscribe.md) operators. You’ll learn to split event streams for parallel processing and merge multiple sources into a single pipeline.

## How publish/subscribe works

The [`publish`](https://preview.docs.tenzir.com/375/375/reference/operators/publish.md) operator sends events to a named channel (topic) on a node. The [`subscribe`](https://preview.docs.tenzir.com/375/375/reference/operators/subscribe.md) operator receives events from that channel. Multiple subscribers can receive the same events, and multiple publishers can write to the same topic.

```tql
// Publisher pipeline
from_file "events.json"
publish "events"
```

A separate pipeline subscribes to receive these events:

```tql
// Subscriber pipeline
subscribe "events"
to_splunk "https://splunk:8088", hec_token=secret("HEC_TOKEN")
```

## Fan-out: split streams

Send the same events to multiple destinations by having multiple subscribers:

### Multiple subscribers on one topic

```tql
// Pipeline 1: Ingest and publish
from_file "/var/log/*.json", watch=10s
publish "logs"
```

One subscriber archives events to storage:

```tql
// Pipeline 2: Archive to storage
subscribe "logs"
import
```

Another forwards only high-severity events to a SIEM:

```tql
// Pipeline 3: Forward to SIEM
subscribe "logs"
where severity in ["high", "critical"]
to_splunk "https://splunk:8088", hec_token=secret("HEC_TOKEN")
```

A third sends authentication failures to a dedicated alerting channel:

```tql
// Pipeline 4: Real-time alerting
subscribe "logs"
where event_type == "auth" and outcome == "failure"
to_kafka "alerts"
```

All subscriber pipelines receive the same events independently.

### Dynamic topics

Route events to different topics based on content:

```tql
from_file "eve.json" {
  read_suricata
}
publish f"suricata.{event_type}"
```

Subscribers can then listen to specific event types:

```tql
// Only DNS events
subscribe "suricata.dns"
```

Or subscribe to alerts only:

```tql
// Only alert events
subscribe "suricata.alert"
```

## Fan-in: merge streams

Combine multiple sources into a single stream by publishing to the same topic:

```tql
// Pipeline 1: Zeek logs
from_file "/var/log/zeek/*.log", watch=10s {
  read_zeek_tsv
}
publish "network"
```

A second pipeline publishes Suricata alerts to the same topic:

```tql
// Pipeline 2: Suricata alerts
from_file "/var/log/suricata/eve.json", watch=10s {
  read_suricata
}
publish "network"
```

A third pipeline consumes the merged stream:

```tql
// Pipeline 3: Consume merged stream
subscribe "network"
import
```

The subscriber receives events from both Zeek and Suricata in a single stream.

## Subscribe to multiple topics

A single subscriber can listen to multiple topics:

```tql
subscribe "alerts", "notices", "critical"
to_kafka "all-priority-events"
```

## Shutdown guarantees

On shutdown, Tenzir drains in-flight publish/subscribe data before it stops pipelines. This guarantee is strongest when you use fixed topic names and an acyclic pipeline graph.

Avoid cyclic publish/subscribe topologies. If a pipeline publishes back into a stream that eventually feeds the same pipeline again, Tenzir can’t guarantee a fully graceful drain and may drop data during shutdown.

There is also a small probability of data loss during shutdown when using dynamic topics. In practice, this is likely to happen only for newly started nodes or newly observed topics during shutdown.

## Combining with fork

Use [`fork`](https://preview.docs.tenzir.com/375/375/reference/operators/fork.md) with [`publish`](https://preview.docs.tenzir.com/375/375/reference/operators/publish.md) to send copies of events while continuing the main pipeline:

```tql
from_file "events.json"
fork {
  publish "raw-events"
}
// Continue processing in main pipeline
where severity >= "high"
import
```

## Back pressure behavior

Subscribers propagate back pressure to publishers. If a subscribing pipeline can’t keep up, publishers slow down to match, preventing data loss.

Pipelines not visible on the overview page at [app.tenzir.com](https://app.tenzir.com) drop data instead of slowing publishers. This prevents slow ad-hoc queries from blocking production pipelines.

## See also

* [Send to destinations](https://preview.docs.tenzir.com/375/375/guides/routing/send-to-destinations.md)
* [Fan out with subpipelines](https://preview.docs.tenzir.com/375/375/guides/routing/fan-out-with-subpipelines.md)
* [Load-balance pipelines](https://preview.docs.tenzir.com/375/375/guides/routing/load-balance-pipelines.md)
* [`publish`](https://preview.docs.tenzir.com/375/375/reference/operators/publish.md)
* [`subscribe`](https://preview.docs.tenzir.com/375/375/reference/operators/subscribe.md)
* [`fork`](https://preview.docs.tenzir.com/375/375/reference/operators/fork.md)