# from_kafka

Receives events from an Apache Kafka topic.

```tql
from_kafka topic:string, [count=int, exit=bool, offset=int|string, options=record,
           aws_iam=record, aws_region=string, batch_size=int]
```

## Description

The `from_kafka` operator consumes messages from a Kafka topic and produces events containing the message payload as a string field.

The implementation uses the official [librdkafka](https://github.com/confluentinc/librdkafka) from Confluent and supports all [configuration options](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). You can specify them via `options` parameter as `{key: value, ...}`.

The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:

* `bootstrap.servers`: `localhost`
* `client.id`: `tenzir`
* `group.id`: `tenzir`
* `enable.auto.commit`: `false` (This option cannot be changed)

Each consumed message is produced as an event with the following schema:

```tql
{
  message: string
}
```

### `topic: string`

The Kafka topic to consume from.

If the topic starts with `^`, `from_kafka` treats it as a regular expression and subscribes to all matching topics. The regular expression must match the complete topic name, so use patterns like `^tenant-.*\.alerts$`.

### `count = int (optional)`

Exit successfully after having consumed `count` messages.

### `exit = bool (optional)`

Exit successfully after having received the last message from all partitions. This option isn’t supported with regular expression topic subscriptions.

Without this option, the operator waits for new messages after consuming the last one.

### `offset = int|string (optional)`

The offset to start consuming from. Possible values are:

* `"beginning"`: first offset
* `"end"`: last offset
* `"stored"`: stored offset
* `<value>`: absolute offset
* `-<value>`: relative offset from end

The default is `"stored"`.

### `options = record (optional)`

A record of key-value configuration options for [librdkafka](https://github.com/confluentinc/librdkafka), e.g., `{"auto.offset.reset" : "earliest", "enable.partition.eof": true}`.

The `from_kafka` operator passes the key-value pairs directly to [librdkafka](https://github.com/confluentinc/librdkafka). Consult the list of available [configuration options](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) to configure Kafka according to your needs.

We recommend factoring these options into the plugin-specific `kafka.yaml` so that they are independent of the `from_kafka` arguments.

### `batch_size = int (optional)`

The number of messages to accumulate before emitting a batch. The operator commits offsets after each batch to improve throughput.

Defaults to `10k`.

## Amazon MSK

The operator supports [Amazon MSK](https://preview.docs.tenzir.com/407/integrations/amazon/msk.md) with IAM authentication.

### `aws_iam = record (optional)`

Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the AWS SDK's default credential chain.

```tql
{
  region: string,            // AWS region for API requests.
  access_key_id: string,     // AWS access key ID.
  secret_access_key: string, // AWS secret access key.
  session_token: string,     // session token for temporary credentials.
  assume_role: string,       // ARN of IAM role to assume.
  session_name: string,      // session name for role assumption.
  external_id: string,       // external ID for role assumption.
  web_identity: record,      // OIDC web identity token configuration.
}
```

See [AWS Authentication](https://preview.docs.tenzir.com/407/reference/aws-authentication.md) for a description of every field, the default credential chain, web identity configuration, and local authentication with the AWS CLI.

### `aws_region = string (optional)`

The AWS region used to construct the MSK authentication URL. Required when connecting to MSK with IAM authentication.

### Consume from MSK using AWS IAM authentication

```tql
from_kafka "security-logs",
  options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"},
  aws_iam={region: "us-east-1"}
```

## Examples

### Consume JSON messages and parse them

```tql
from_kafka "logs"
message = message.parse_json()
```

### Consume 100 messages starting from the beginning

```tql
from_kafka "events", count=100, offset="beginning"
```

### Consume messages and exit when caught up

```tql
from_kafka "alerts", exit=true
```

### Consume from topics that match a regular expression

```tql
from_kafka "^tenant-.*\\.alerts$", offset="beginning"
```

## See Also

* [`to_kafka`](https://preview.docs.tenzir.com/407/reference/operators/to_kafka.md)
* [Tenzir v6 Migration](https://preview.docs.tenzir.com/407/guides/tenzir-v6-migration.md)
* [Kafka](https://preview.docs.tenzir.com/407/integrations/kafka.md)