Skip to content

This guide shows you how to write OCSF mapping operators in TQL. You’ll learn to organize mappings by attribute groups, handle unmapped fields, and validate your output. The guide assumes you’ve already identified your target OCSF event class and profiles.

Install the tenzir-ocsf skill when you want an agent to help with OCSF schema decisions. See Use agent skills for installation and usage examples.

Ask the agent to choose the OCSF version, event class, and profiles before it maps attributes. OCSF mappings should produce the required classification and occurrence attributes first, then add primary, context, and profile-specific attributes.

This section uses Palo Alto NGFW traffic logs as a running example. The namespace follows the vendor::product:: convention, resulting in paloalto::ngfw::ocsf::map.

Use this annotated template as a starting point for creating a user-defined operator as part of a TQL package.

We recommend organizing fields by OCSF attribute groups: classification, occurrence, context, primary.

---
args:
named:
- name: event
description: The field that holds the event to map.
type: field
default: this
---
// --- Preamble ---------------------------------
// Keep all mapping work inside the explicit event scope.
$event = {...$event, panos: $event, ocsf: {}}
// --- OCSF: classification attributes ----------
@name = "ocsf.network_activity"
$event.ocsf.category_uid = 4
$event.ocsf.class_uid = 4001
$event.ocsf.activity_id = 6
$event.ocsf.severity_id = 1
$event.ocsf.type_uid = $event.ocsf.class_uid * 100 + $event.ocsf.activity_id
// --- OCSF: occurrence attributes --------------
$event.ocsf.time = move $event.panos.time_generated
$event.ocsf.start_time = move $event.panos.start
if $event.panos.elapsed != null {
$event.ocsf.end_time = $event.ocsf.start_time + $event.panos.elapsed
$event.ocsf.duration = count_milliseconds(move $event.panos.elapsed).round()
}
// --- OCSF: context attributes -----------------
// Metadata about the event source.
$event.ocsf.metadata = {
log_name: "traffic",
product: {
cpe_name: "cpe:/a:paloaltonetworks:pan-os",
name: "NGFW",
vendor_name: "Palo Alto Networks",
},
original_event_uid: move $event.panos.sessionid,
version: "1.8.0",
}
$event.ocsf.app_name = move $event.panos.app
// --- OCSF: primary attributes -----------------
// Primary attributes reflect the core semantic meaning of the event.
$event.ocsf.src_endpoint = {
ip: move $event.panos.src,
port: move $event.panos.sport,
}
$event.ocsf.dst_endpoint = {
ip: move $event.panos.dst,
port: move $event.panos.dport,
}
let $proto_nums = {tcp: 6, udp: 17, icmp: 1}
$event.ocsf.connection_info = {
protocol_name: (move $event.panos.proto).to_lower(),
}
$event.ocsf.connection_info.protocol_num = $proto_nums[$event.ocsf.connection_info.protocol_name]? else -1
// --- OCSF: profile-specific attributes --------
// Add fields for declared profiles (host, network_proxy, etc.)
// ocsf.device = {...}
// ocsf.proxy = {...}
// --- Epilogue ---------------------------------
// Return the mapped OCSF event and preserve mapping residue.
$event = {...$event.ocsf, unmapped: $event.panos}
  • Use the event scope: Mapping UDOs accept a named event field argument that defaults to this. Start with a spread such as $event = {...$event, panos: $event, ocsf: {}}, then mutate only fields under $event.
  • Use a source namespace: Keep the input under a short descriptor such as panos, zeek, okta, or the generic event. This prevents name clashes with OCSF fields and keeps the remaining source fields ready for unmapped.
  • Use move: Transfer fields with move to simultaneously assign and remove from source, for example $event.ocsf.time = move $event.panos.time_generated.
  • Only use drop for multi-use fields: When a field appears in multiple mappings, drop it after the last use. Prefer move and single assignments.
  • Keep unmapped residue: Fields left under the source namespace still need review or an intentional decision to preserve source-specific data.
  • Produce minimal OCSF: Map required identifiers, required attributes, and source-specific semantics. Don’t hand-write derived sibling fields such as activity_name, category_name, or severity; let ocsf::derive expand the minimal event before validation.
  • Validate the result: Run ocsf::derive and ocsf::cast after the mapper returns the OCSF event.

Organize OCSF mappings as a package with a dispatcher and per-event-type operators:

Put the target schema first in mapping namespaces. Source-to-OCSF packages use paloalto::ngfw::ocsf::map. Cross-schema mappings keep the same rule, for example paloalto::ngfw::asim::ocsf::map for OCSF-to-ASIM mapping.

  • Directorypaloalto/
    • Directoryoperators/
      • Directoryngfw/
        • Directoryocsf/
          • map.tql Main dispatcher
          • base.tql Fallback to OCSF base event
          • Directoryevents/
            • network.tql Traffic logs → Network Activity
            • dns.tql DNS logs → DNS Activity
            • threat.tql Threat logs → Security Finding
    • Directorytests/
      • Directoryinputs/
        • traffic.json Input shared across multiple tests
      • Directoryngfw/
        • Directoryocsf/
          • map.tql
          • map.txt
          • base.tql
          • base.txt
          • Directoryevents/
            • network.input Log sample(s)
            • network.tql Mapping for network event type
            • network.txt Mapped OCSF event(s)
            • dns.input
            • dns.tql
            • dns.txt
    • package.yaml

Your package should include one main mapping operator. This operator performs source-specific cleanup and shared OCSF setup, dispatches events based on the event type, and returns the mapped OCSF event:

---
args:
named:
- name: event
description: The field that holds the event to map.
type: field
default: this
---
$event = {...$event, panos: $event, ocsf: {}}
$event.ocsf.metadata = {
product: {
cpe_name: "cpe:/a:paloaltonetworks:pan-os",
name: "NGFW",
vendor_name: "Palo Alto Networks",
},
version: "1.8.0",
}
$event.ocsf.severity_id = 1
match $event.panos.type {
"TRAFFIC" => {
paloalto::ngfw::ocsf::events::network event=$event
}
"DNS" => {
paloalto::ngfw::ocsf::events::dns event=$event
}
"THREAT" => {
paloalto::ngfw::ocsf::events::threat event=$event
}
_ => {
paloalto::ngfw::ocsf::base event=$event
}
}
$event = {...$event.ocsf, unmapped: $event.panos}

If the parser package does not set a type field, dispatch on a different field in the log that differentiates the event types.

Create one test file per event type:

from_file env("TENZIR_INPUT") {
read_json
}
paloalto::ngfw::ocsf::map
ocsf::derive
ocsf::cast

This requires that your test file has a sibling .input file that the test framework exposes through TENZIR_INPUT. Use the reader that matches your fixture format.

When a caller parses into a nested field or wants to add raw data after mapping, pass the mapping scope explicitly and add the additional attributes after the mapper returns:

from_file env("TENZIR_INPUT") {
read_lines
}
panos = line.parse_json()
paloalto::ngfw::ocsf::map event=panos
panos.raw_data = move line
panos.raw_data_size = panos.raw_data.length_bytes()
this = panos
ocsf::derive
ocsf::cast

The ocsf::cast operator is the primary schema validation gate. It ensures that your mapping produces schema-compliant output.

Your mapping is complete once ocsf::cast no longer emits warnings.

Last updated: