Receives messages from an AMQP queue.
from_amqp url:secret, [channel=int, exchange=str, routing_key=str, queue=str, options=record, passive=bool, durable=bool, exclusive=bool, no_auto_delete=bool, no_local=bool, ack=bool]
Description
The from_amqp operator is an AMQP 0-9-1 client that
receives messages from a queue. It emits one event per AMQP message with the
schema tenzir.amqp and a message: blob field containing the message body.
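As a rough illustration of the event shape, the sketch below converts the binary message body into text. The broker URL, the queue name, and the field name text are placeholders, and the sketch assumes the message bodies contain text rather than arbitrary binary data.

```tql
// Placeholder broker URL and queue name.
from_amqp "amqp://guest:guest@broker.example.org:5672/", queue="events"
// message is a blob; store its string form in a new field named text.
text = string(message)
```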
url: secret
The URL of the AMQP server. The URL must have the following format:
    amqp://[USERNAME[:PASSWORD]@]HOSTNAME[:PORT]/[VHOST]
The URL is required. Values in the URL override corresponding entries from
options.
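The override rule can be sketched as follows. The host, credentials, and vhost are hypothetical. Under the rule stated above, the connection would use port 5673 and the vhost prod from the URL, while heartbeat has no URL counterpart and would be taken from options.

```tql
// Hypothetical broker. The URL supplies the port (5673) and vhost (prod),
// which take precedence over the port and vhost entries in options.
from_amqp "amqp://tenzir:pass@broker.example.org:5673/prod", options={
  port: 5672,
  vhost: "/",
  heartbeat: 30,
}
```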
channel = int (optional)
The channel number to use.
The value must fit into an unsigned 16-bit integer.
Defaults to 1.
exchange = str (optional)
The exchange to interact with.
Defaults to "amq.direct".
routing_key = str (optional)
The routing key to bind the queue to the exchange.
Defaults to the empty string.
queue = str (optional)
The name of the queue to declare and bind.
Defaults to the empty string, which lets the broker generate a queue name, such
as "amq.gen-XNTLF0FwabIn9FFKKtQHzg".
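The exchange, routing_key, and queue arguments work together. A minimal sketch, assuming a broker at a placeholder address and the hypothetical name alerts for both the queue and the routing key: with a direct exchange, the queue receives messages published with exactly that routing key.

```tql
// Placeholder URL and names. Declares the queue "alerts" and binds it to the
// exchange "amq.direct" using the routing key "alerts".
from_amqp "amqp://broker.example.org/", exchange="amq.direct", routing_key="alerts", queue="alerts"
```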
options = record (optional)
An option record for the AMQP connection. Values must be numbers, booleans, strings, or secrets.
Available options, shown with their default values:
    hostname: 127.0.0.1
    port: 5672
    ssl: false
    vhost: /
    max_channels: 2047
    frame_size: 131072
    heartbeat: 0
    sasl_method: plain
    username: guest
    password: guest
passive = bool (optional)
If true, the broker replies with OK if an exchange already exists with the
same name and raises an error otherwise.
Defaults to false.
durable = bool (optional)
If true, a newly created exchange is durable and remains active when the
broker restarts.
Defaults to false.
exclusive = bool (optional)
If true, the queue is exclusive to the current connection and is deleted when
the connection closes.
Defaults to false.
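One possible use of exclusive is a temporary, private queue. The sketch below uses a placeholder URL and a hypothetical routing key. It leaves queue at its default so that the broker generates the name, and it relies on the queue being deleted when the connection closes.

```tql
// Placeholder URL. Omitting queue keeps it at the empty string, so the broker
// generates a name; exclusive=true ties the queue to this connection.
from_amqp "amqp://broker.example.org/", routing_key="alerts", exclusive=true
```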
no_auto_delete = bool (optional)
If true, the exchange is not deleted when all queues have finished using it.
Defaults to false.
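For a long-lived setup, durable and no_auto_delete can be combined. This is a sketch with a placeholder URL and queue name, not a recommended configuration; whether these settings suit a deployment depends on how the broker and its publishers are set up.

```tql
// Placeholder URL and queue name. Requests a durable exchange that is kept
// even when no queue uses it anymore.
from_amqp "amqp://broker.example.org/", queue="events", durable=true, no_auto_delete=true
```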
no_local = bool (optional)
If true, the broker doesn’t send messages to the connection that published
them.
Defaults to false.
ack = bool (optional)
If true, the broker expects acknowledgements for messages. If false, the
broker assumes delivery will succeed and immediately dequeues the message.
Defaults to false.
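When the broker should not drop a message right after delivery, acknowledgements can be enabled. A minimal sketch with a placeholder URL and queue name; the exact point at which the operator sends the acknowledgement is not described here, so treat this as an illustration of the flag only.

```tql
// Placeholder URL and queue name. With ack=true, the broker expects an
// acknowledgement instead of dequeuing each message immediately.
from_amqp "amqp://broker.example.org/", queue="events", ack=true
this = string(message).parse_json()
```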
Examples
Receive messages from an AMQP queue
    from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events"
Parse JSON messages
    from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events"
    this = string(message).parse_json()
Configure connection options
    from_amqp "amqp://broker/vhost", options={
      username: "tenzir",
      password: secret("amqp-password"),
      heartbeat: 30,
    }