Skip to content

Consumes messages from a NATS JetStream subject.

from_nats subject:string, [url=secret, durable=string, count=int, tls=record,
auth=record, _batch_size=int, _queue_capacity=int] {}

The from_nats operator consumes messages from a NATS JetStream subject. The operator passes each message payload as bytes to the subpipeline, then acknowledges the message after the subpipeline finishes successfully.

The NATS server must have a JetStream stream that captures the subject you consume from. The operator uses the default URL nats://localhost:4222 unless you provide url or configure plugins.nats.url.

The NATS subject to consume from.

The NATS server URL.

If the URL has no scheme, Tenzir uses nats:// by default or tls:// when TLS is enabled. Use nats://, tls://, ws://, or wss:// to select a specific transport.

The durable consumer name to use for the JetStream subscription.

Exit successfully after consuming count messages.

Authentication settings for the NATS connection. Each value can be a string or a secret.

Supported authentication records are:

  • {token: secret("NATS_TOKEN")} for token authentication.
  • {user: "alice", password: secret("NATS_PASSWORD")} for user/password authentication.
  • {credentials: "/path/to/user.creds"} for NATS credentials files.
  • {credentials: "/path/to/user.creds", seed: "/path/to/user.nk"} for credentials files with a separate seed file.
  • {credentials_memory: secret("NATS_CREDS")} for credentials content stored in a secret.

TLS configuration. Provide an empty record (tls={}) to enable TLS with defaults or set fields to customize it.

{
skip_peer_verification: bool, // skip certificate verification.
cacert: string, // CA bundle to verify peers.
certfile: string, // client certificate to present.
keyfile: string, // private key for the client certificate.
min_version: string, // minimum TLS version (`"1.0"`, `"1.1"`, `"1.2"`, "1.3"`).
ciphers: string, // OpenSSL cipher list string.
client_ca: string, // CA to validate client certificates.
require_client_cert, // require clients to present a certificate.
}

The client_ca and require_client_cert options are only applied for operators that accept incoming client connections, and otherwise ignored.

Any value not specified in the record will either be picked up from the configuration or if not configured will not be used by the operator.

See the Node TLS Setup guide for more details.

NATS uses the standard Tenzir tls record. The nats.c library does not expose a minimum TLS version setting, so tls.min_version is accepted for record compatibility but ignored with a warning.

The parsing subpipeline that receives each NATS message payload as bytes.

Inside the subpipeline, from_nats exposes the following metadata as dollar-variables:

VariableTypeDescription
$subjectstringThe message subject.
$replystringThe reply subject, or null when absent.
$headersrecordNATS headers as list<string> values.
$streamstringThe JetStream stream name, or null when absent.
$consumerstringThe JetStream consumer name, or null when absent.
$stream_sequenceuint64The stream sequence number.
$consumer_sequenceuint64The consumer sequence number.
$num_delivereduint64The message delivery count.
$num_pendinguint64The number of pending messages for the consumer.
$timestamptimeThe JetStream message timestamp.
from_nats "alerts" {
read_json
}
from_nats "alerts", durable="tenzir-alerts" {
read_json
nats_subject = $subject
nats_stream = $stream
nats_sequence = $stream_sequence
}
from_nats "alerts",
url="tls://nats.example.com:4222",
auth={token: secret("NATS_TOKEN")},
tls={}
{
read_json
}

Last updated: