# from_kafka

Receives events from an Apache Kafka topic.

```tql
from_kafka topic:string, [count=int, exit=bool, offset=int|string, options=record,
           aws_iam=record, aws_region=string, batch_size=int]
```

## Description

The `from_kafka` operator consumes messages from a Kafka topic and produces events containing the message payload as a string field.

The implementation uses the official [librdkafka](https://github.com/confluentinc/librdkafka) from Confluent and supports all [configuration options](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). You can specify them via `options` parameter as `{key: value, ...}`.

The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:

* `bootstrap.servers`: `localhost`
* `client.id`: `tenzir`
* `group.id`: `tenzir`
* `enable.auto.commit`: `false` (This option cannot be changed)

Each consumed message is produced as an event with the following schema:

```tql
{
  message: string
}
```

### `topic: string`

The Kafka topic to consume from.

### `count = int (optional)`

Exit successfully after having consumed `count` messages.

### `exit = bool (optional)`

Exit successfully after having received the last message from all partitions.

Without this option, the operator waits for new messages after consuming the last one.

### `offset = int|string (optional)`

The offset to start consuming from. Possible values are:

* `"beginning"`: first offset
* `"end"`: last offset
* `"stored"`: stored offset
* `<value>`: absolute offset
* `-<value>`: relative offset from end

The default is `"stored"`.

### `options = record (optional)`

A record of key-value configuration options for [librdkafka](https://github.com/confluentinc/librdkafka), e.g., `{"auto.offset.reset" : "earliest", "enable.partition.eof": true}`.

The `from_kafka` operator passes the key-value pairs directly to [librdkafka](https://github.com/confluentinc/librdkafka). Consult the list of available [configuration options](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) to configure Kafka according to your needs.

We recommend factoring these options into the plugin-specific `kafka.yaml` so that they are independent of the `from_kafka` arguments.

### `batch_size = int (optional)`

The number of messages to accumulate before emitting a batch. The operator commits offsets after each batch to improve throughput.

Defaults to `10k`.

## Amazon MSK

The operator supports [Amazon MSK](https://preview.docs.tenzir.com/375/375/integrations/amazon/msk.md) with IAM authentication.

### `aws_iam = record (optional)`

Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the AWS SDK's default credential chain.

```tql
{
  region: string,            // AWS region for API requests.
  access_key_id: string,     // AWS access key ID.
  secret_access_key: string, // AWS secret access key.
  session_token: string,     // session token for temporary credentials.
  assume_role: string,       // ARN of IAM role to assume.
  session_name: string,      // session name for role assumption.
  external_id: string,       // external ID for role assumption.
  web_identity: record,      // OIDC web identity token configuration.
}
```

See [AWS Authentication](https://preview.docs.tenzir.com/375/375/reference/aws-authentication.md) for a description of every field, the default credential chain, web identity configuration, and local authentication with the AWS CLI.

### `aws_region = string (optional)`

The AWS region used to construct the MSK authentication URL. Required when connecting to MSK with IAM authentication.

### Consume from MSK using AWS IAM authentication

```tql
from_kafka "security-logs",
  options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"},
  aws_iam={region: "us-east-1"}
```

## Examples

### Consume JSON messages and parse them

```tql
from_kafka "logs"
message = message.parse_json()
```

### Consume 100 messages starting from the beginning

```tql
from_kafka "events", count=100, offset="beginning"
```

### Consume messages and exit when caught up

```tql
from_kafka "alerts", exit=true
```

## See Also

* [`to_kafka`](https://preview.docs.tenzir.com/375/375/reference/operators/to_kafka.md)
* [Tenzir v6 Migration](https://preview.docs.tenzir.com/375/375/guides/tenzir-v6-migration.md)
* [Kafka](https://preview.docs.tenzir.com/375/375/integrations/kafka.md)