
Runs a subpipeline across multiple parallel workers.

parallel [jobs:int] [, route_by=any] {}

The parallel operator distributes incoming events across multiple parallel instances of a subpipeline. Each event is processed by exactly one worker.

Use this operator to parallelize CPU-intensive transformations or I/O-bound operations that would otherwise bottleneck on a single thread.

By default, events are distributed across workers using an adaptive round-robin strategy that keeps worker loads balanced. To route events deterministically by key instead, use route_by, which ensures that all events with the same key value go to the same worker.

This operator may reorder the event stream since workers process events concurrently.
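
The reordering can be illustrated with a small Python sketch. It is a conceptual model only, not how the operator is implemented: the `process` function, the `cost` field, and the thread pool are all hypothetical stand-ins for a subpipeline with uneven per-event work.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def process(event: dict) -> dict:
    """Hypothetical stand-in for a subpipeline; 'cost' simulates uneven work."""
    time.sleep(event["cost"])
    return {"id": event["id"], "done": True}


def run_parallel(events: list, jobs: int) -> list:
    """Process events on `jobs` threads and collect results as they finish."""
    with ThreadPoolExecutor(max_workers=jobs) as pool:
        futures = [pool.submit(process, e) for e in events]
        # as_completed yields in completion order, not submission order.
        return [f.result() for f in as_completed(futures)]


events = [
    {"id": 0, "cost": 0.2},
    {"id": 1, "cost": 0.0},
    {"id": 2, "cost": 0.1},
]
results = run_parallel(events, jobs=3)

# Every event appears exactly once, but completion order is not guaranteed.
# Sorting on a field carried through the processing restores the input order.
restored = sorted(results, key=lambda r: r["id"])
```

If downstream logic depends on ordering, one option is to carry a timestamp or sequence field through the subpipeline so the order can be restored afterwards.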

When used as a source operator (without upstream input), parallel spawns multiple independent instances of the subpipeline. This is useful for running the same source pipeline with concurrent connections.

jobs sets the number of parallel workers to spawn. It must be greater than zero and defaults to the number of available CPU cores.

route_by is an expression evaluated per event to determine which worker processes it. Events with the same route_by value are always sent to the same worker. This guarantees that related events are grouped together, which is required for stateful subpipelines such as deduplicate or summarize.

route_by cannot be used when parallel is used as a source operator.
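
To see why key-based routing keeps related events together, consider the following Python sketch. It assumes a simple hash-modulo assignment; the actual mapping used by the operator is not documented here, and `pick_worker` and `route` are hypothetical helper names chosen for illustration.

```python
import zlib
from collections import defaultdict


def pick_worker(key: str, jobs: int) -> int:
    """Map a routing key to a worker index (assumed hash-modulo scheme)."""
    if jobs <= 0:
        raise ValueError("jobs must be greater than zero")
    # crc32 is stable across runs, unlike Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % jobs


def route(events: list, jobs: int, key_field: str) -> dict:
    """Group events by the index of the worker that would receive them."""
    workers = defaultdict(list)
    for event in events:
        workers[pick_worker(str(event[key_field]), jobs)].append(event)
    return dict(workers)


events = [
    {"src_ip": "10.0.0.1", "dst_port": 443},
    {"src_ip": "10.0.0.2", "dst_port": 22},
    {"src_ip": "10.0.0.1", "dst_port": 443},
]
assignment = route(events, jobs=4, key_field="src_ip")
```

Because both events from 10.0.0.1 land on the same worker, that worker's local state sees every event for the key. Under load-balanced distribution the two events could reach different workers, and a per-worker deduplication would let the duplicate through.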

The braces contain the subpipeline to run in parallel. The subpipeline may either:

  • Produce events as output (transformation)
  • End with a sink (void output)

When parallel is used as a source operator, the subpipeline runs as an independent source producing events or as a full pipeline ending with a sink.

The subpipeline must not produce bytes as output.

Parse raw JSON strings across multiple workers:

subscribe "raw"
parallel 4 {
  parsed = data.parse_json()
}

Ensure events from the same source IP are always handled by the same worker, enabling per-source deduplication:

subscribe "events"
parallel route_by=src_ip {
  deduplicate src_ip, dst_ip, dst_port
}

Send events to Google SecOps with 4 concurrent connections:

subscribe "events"
parallel 4 {
  to_google_secops customer_id="…", private_key=secret("secops_key"),
    client_email="…", log_type="…", log_text=raw
}
