Skip to content

This guide shows you how to map events to Elastic Common Schema (ECS) in TQL. You’ll learn how to populate required ECS fields, choose event categorization values, map common network fieldsets, preserve source-specific details, and prepare records for Elasticsearch-compatible destinations.

The TQL examples in this guide use nested records that serialize to ECS field paths. For example, source.ip in ECS becomes source: {ip: ...} in TQL.

Install the tenzir-ecs skill when you want an agent to help with ECS schema decisions. See Use agent skills for installation and usage examples.

Ask the agent to choose event.kind, event.category, and event.type values before it maps fieldsets. ECS treats event.category and event.type as lists, and values such as allowed or denied belong in event.type or event.action, not automatically in event.outcome.

Start by setting the fields that define the document contract and search behavior:

  • @timestamp: The time when the event originated.
  • ecs.version: The ECS version used for the mapping.
  • event.kind: The high-level kind, such as event, alert, metric, or pipeline_error.
  • event.category: The broad category list, such as network.
  • event.type: The subtype list, such as connection, allowed, or denied.

For a firewall connection event, choose event.kind: event, event.category: [network], and an event.type list that includes connection plus the normalized disposition when the source records one.

The following example shows a package mapper that maps a parsed firewall connection event to ECS. It keeps all mapping work inside the event argument, puts the source data under fw, sets ECS categorization fields, maps network participants, normalizes the transport protocol, and preserves unmapped residue under a custom namespace.

---
args:
named:
- name: event
description: The field that holds the event to map.
type: field
default: this
---
$event = {...$event, fw: $event, ecs: {}}
let $event_types = {
allow: ["connection", "allowed"],
allowed: ["connection", "allowed"],
block: ["connection", "denied"],
blocked: ["connection", "denied"],
deny: ["connection", "denied"],
denied: ["connection", "denied"],
drop: ["connection", "denied"],
dropped: ["connection", "denied"],
}
$event.ecs["@timestamp"] = move $event.fw.ts
$event.ecs.ecs.version = "9.4.0"
$event.ecs.event.kind = "event"
$event.ecs.event.category = ["network"]
$event.ecs.event.type = $event_types[$event.fw.action.to_lower()]? else ["connection"]
$event.ecs.event.action = (move $event.fw.action).to_lower()
$event.ecs.source.ip = move $event.fw.src_ip
$event.ecs.source.port = move $event.fw.src_port
$event.ecs.source.bytes = move $event.fw.bytes_out
$event.ecs.destination.ip = move $event.fw.dst_ip
$event.ecs.destination.port = move $event.fw.dst_port
$event.ecs.destination.bytes = move $event.fw.bytes_in
$event.ecs.network.transport = (move $event.fw.proto).to_lower()
$event.ecs.network.bytes = $event.ecs.source.bytes + $event.ecs.destination.bytes
$event.ecs.observer.hostname = move $event.fw.device
$event.ecs.observer.vendor = "Example Networks"
$event.ecs.observer.product = "Example Firewall"
$event = {...$event.ecs, example: {unmapped: $event.fw}}

A call with a firewall event such as {ts: 2024-01-15T10:30:45Z, action: "allowed", src_ip: "10.0.0.5", src_port: 51544, dst_ip: "203.0.113.10", dst_port: 443, proto: "tcp", bytes_out: 1280, bytes_in: 8192, device: "edge-fw-01"} produces an ECS event like this:

{
"@timestamp": 2024-01-15T10:30:45Z,
ecs: {
version: "9.4.0",
},
event: {
kind: "event",
category: [
"network",
],
type: [
"connection",
"allowed",
],
action: "allowed",
},
source: {
ip: "10.0.0.5",
port: 51544,
bytes: 1280,
},
destination: {
ip: "203.0.113.10",
port: 443,
bytes: 8192,
},
network: {
transport: "tcp",
bytes: 9472,
},
observer: {
hostname: "edge-fw-01",
vendor: "Example Networks",
product: "Example Firewall",
},
example: {
unmapped: {},
},
}

Use the same structure for larger mappings:

  • Use the event scope: Package mappers accept a named event field argument with default: this, create source and target namespaces with an initial spread, and mutate only fields below $event.
  • Keep a source namespace: Keep the parsed event under a short namespace such as fw, dns, edr, or event before you create ECS fields.
  • Populate required fields first: Set @timestamp and ecs.version before you map event-specific fieldsets.
  • Choose categorization before fields: Let event.kind, event.category, and event.type drive which fieldsets you populate.
  • Use role-specific fieldsets: Use source and destination for packet or flow direction, and use client and server when the protocol role matters.
  • Normalize categorical values explicitly: Map source strings to ECS categorization values such as network, connection, allowed, or denied.
  • Preserve custom data deliberately: Put source-specific details that don’t have an ECS target into a custom namespace, not into a field that ECS already defines for another purpose.

Send ECS to Elastic-compatible destinations

Section titled “Send ECS to Elastic-compatible destinations”

After you map events to ECS, send them to Elasticsearch or OpenSearch with the Bulk API operators:

my_source::ecs::map
to_elasticsearch "https://elasticsearch.example.com:9200",
action="index",
index="ecs-events"

If the parsed source event lives in a nested field, pass that field explicitly and promote the mapped ECS document before indexing:

my_source::ecs::map event=parsed
this = parsed
to_elasticsearch "https://elasticsearch.example.com:9200",
action="index",
index="ecs-events"

Use to_opensearch instead when the destination is OpenSearch.

Last updated: