This guide shows you how to receive events from message brokers using TQL. You’ll learn to subscribe to topics and queues from Apache Kafka (including Amazon MSK), NATS JetStream, AMQP-based brokers (like RabbitMQ), Amazon SQS, and Google Cloud Pub/Sub.

Apache Kafka is a distributed message broker commonly used for high-throughput event streaming. Use from_kafka to subscribe to topics.

from_kafka "security-events"

By default, from_kafka produces events with the raw message in a message field. Parse the message content to extract structured data:

from_kafka "security-events"
this = message.parse_json()

The offset option determines where to start reading:

| Value | Description |
| --- | --- |
| `"beginning"` | Start from the oldest available message |
| `"end"` | Start from the newest messages only |
| `"stored"` | Resume from the last committed offset |
| `1000` | Start from a specific offset |
| `-100` | Start 100 messages before the end |

from_kafka "events", offset="beginning"

Kafka uses consumer groups to distribute messages across multiple consumers. Specify a group ID for coordinated consumption:

from_kafka "events", group_id="tenzir-ingest"
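These options combine. A sketch that resumes a named consumer group from its last committed offset and parses each message, using only the options documented above:

```tql
from_kafka "events", group_id="tenzir-ingest", offset="stored"
this = message.parse_json()
```

Restarting a pipeline with the same group ID picks up where the group left off, rather than re-reading the topic from the beginning.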

Amazon MSK is a managed Kafka service. Use the aws_iam option for IAM authentication:

from_kafka "security-logs",
  options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"},
  aws_iam={region: "us-east-1"}

NATS is a messaging system for services, edge deployments, and cloud-native applications. Use from_nats to consume messages from JetStream subjects.

from_nats "alerts", durable="tenzir-alerts" {
  read_json
}

The NATS server must have a JetStream stream that captures the subject you consume from.

Use dollar-variables inside the parsing subpipeline to copy NATS metadata into events:

from_nats "alerts" {
  read_json
  nats_subject = $subject
  nats_stream = $stream
  nats_sequence = $stream_sequence
}

AMQP is the protocol spoken by brokers like RabbitMQ. Use from with an AMQP URL, or call load_amqp directly.

from "amqp://user:pass@broker:5672/vhost"

The URL structure is amqp://user:password@host:port/vhost. Configure additional options like exchange and routing key in the operator parameters.
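For example, to bind to a specific exchange and routing key, a sketch along these lines should work; the option names `exchange` and `routing_key` are assumptions here, so check the load_amqp operator reference for the exact parameters:

```tql
// Hypothetical exchange/routing-key binding; verify option names
// against the load_amqp reference before use.
load_amqp "amqp://user:pass@broker:5672/vhost", exchange="security", routing_key="alerts.#"
read_json
```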

Amazon SQS is a managed message queue. Use load_sqs or the sqs:// URL scheme.

from "sqs://my-queue" {
  read_json
}

Use long polling to reduce API calls and receive messages in batches:

from "sqs://my-queue", poll_interval=5s {
  read_json
}

The operator deletes messages from the queue after receiving them successfully, so each message is consumed only once.

Google Cloud Pub/Sub provides managed messaging for Google Cloud. Use from_google_cloud_pubsub to subscribe.

from_google_cloud_pubsub project_id="my-project",
  subscription_id="my-subscription"
parsed = message.parse_json()

The operator produces events with a message field containing the raw message content. Parse it to extract structured data.
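To replace the whole event with the parsed content instead of keeping it in a separate field, the same pattern as in the Kafka example above applies:

```tql
from_google_cloud_pubsub project_id="my-project",
  subscription_id="my-subscription"
this = message.parse_json()
```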
