# from_amazon_kinesis

Receives records from an [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/) stream.

```tql
from_amazon_kinesis stream:string, [start=string|time, count=int, exit=bool,
                     records_per_call=int, poll_idle=duration,
                     aws_region=string, aws_iam=record, endpoint=string]
```

## Description

The `from_amazon_kinesis` operator reads records from the existing shards of a Kinesis data stream and emits one event per Kinesis record. All shards are read concurrently, so throughput scales with the shard count. Record order is preserved within a shard but interleaved across shards, matching the ordering guarantees of Kinesis itself.

The emitted events use the `tenzir.amazon_kinesis` schema with these fields:

| Field             | Type       | Description                                      |
| ----------------- | ---------- | ------------------------------------------------ |
| `message`         | `blob`     | The raw Kinesis record payload.                  |
| `stream`          | `string`   | The stream name.                                 |
| `shard_id`        | `string`   | The shard that contained the record.             |
| `sequence_number` | `string`   | The Kinesis sequence number.                     |
| `partition_key`   | `string`   | The record partition key.                        |
| `arrival_time`    | `time`     | The approximate time when Kinesis received it.   |
| `encryption_type` | `string`   | The server-side encryption type, when available. |
| `behind_latest`   | `duration` | The shard lag reported by Kinesis.               |

The `arrival_time` and `encryption_type` fields are optional because Kinesis only returns them when they are present on the record.

The operator lists shards during startup and discovers new shards when an existing shard closes, such as after a resharding operation. Pipeline snapshots store the next sequence number per shard, and restarts resume with `AFTER_SEQUENCE_NUMBER`. This gives at-least-once restart behavior.

The operator requires these AWS permissions:

* `kinesis:ListShards`
* `kinesis:GetShardIterator`
* `kinesis:GetRecords`

### `stream: string`

The name of the Kinesis data stream to receive records from.

### `start = string | time (optional)`

The position for the initial shard iterator when no snapshot is available.

The value can be one of:

* `"latest"`: start after the latest record
* `"trim_horizon"`: start at the oldest available record
* `<timestamp>`: start at the given timestamp

Defaults to `"latest"`.

### `count = int (optional)`

Exit successfully after emitting `count` records.

The value must be greater than zero.

### `exit = bool (optional)`

Exit successfully after the operator is caught up on all shards.

Without this option, the operator waits for new records after consuming the currently available records.

Defaults to `false`.

### `records_per_call = int (optional)`

The maximum number of records to fetch in one `GetRecords` request.

The value must be between `1` and `10000`.

Defaults to `1000`.

### `poll_idle = duration (optional)`

How long to wait after a `GetRecords` request returns no records.

The value must be non-negative and less than `5min`.

Defaults to `1s`.

### `aws_region = string (optional)`

The AWS region for reading from the stream.

If omitted, the operator uses the region from `aws_iam` when present. Otherwise, it uses the default AWS SDK region resolution.

### `endpoint = string (optional)`

A custom Kinesis endpoint URL.

If omitted, the operator uses `AWS_ENDPOINT_URL_KINESIS` when set, then `AWS_ENDPOINT_URL`, then the default AWS SDK endpoint for the region.

### `aws_iam = record (optional)`

Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the AWS SDK's default credential chain.

```tql
{
  region: string,            // AWS region for API requests.
  access_key_id: string,     // AWS access key ID.
  secret_access_key: string, // AWS secret access key.
  session_token: string,     // session token for temporary credentials.
  assume_role: string,       // ARN of IAM role to assume.
  session_name: string,      // session name for role assumption.
  external_id: string,       // external ID for role assumption.
  web_identity: record,      // OIDC web identity token configuration.
}
```

See [AWS Authentication](https://preview.docs.tenzir.com/407/reference/aws-authentication.md) for a description of every field, the default credential chain, web identity configuration, and local authentication with the AWS CLI.

## Examples

### Read from the latest position

```tql
from_amazon_kinesis "security-events"
```

### Read from the oldest available record

```tql
from_amazon_kinesis "security-events", start="trim_horizon"
```

### Parse JSON payloads

```tql
from_amazon_kinesis "security-events", start="trim_horizon"
this = string(message).parse_json()
```

### Read a bounded number of records

```tql
from_amazon_kinesis "security-events",
  start="trim_horizon",
  count=100,
  exit=true
```

### Use explicit credentials

```tql
from_amazon_kinesis "security-events", aws_iam={
  region: "us-east-1",
  access_key_id: secret("aws-key"),
  secret_access_key: secret("aws-secret")
}
```

### Use a LocalStack endpoint

```tql
from_amazon_kinesis "security-events",
  aws_region="us-east-1",
  endpoint="http://127.0.0.1:4566"
```

## See Also

* [`to_amazon_kinesis`](https://preview.docs.tenzir.com/407/reference/operators/to_amazon_kinesis.md)
* [Read from message brokers](https://preview.docs.tenzir.com/407/guides/collecting/read-from-message-brokers.md)
* [Kinesis](https://preview.docs.tenzir.com/407/integrations/amazon/kinesis.md)