diff --git a/src/content/docs/guides/collecting/index.mdx b/src/content/docs/guides/collecting/index.mdx
index c2b683232..6af9ec146 100644
--- a/src/content/docs/guides/collecting/index.mdx
+++ b/src/content/docs/guides/collecting/index.mdx
@@ -61,13 +61,20 @@ pagination patterns and advanced configurations.
### Message brokers
-Subscribe to topics or queues from Apache Kafka, AMQP, Amazon SQS, and Google
-Cloud Pub/Sub:
+Subscribe to topics or queues from Apache Kafka, NATS, AMQP, Amazon SQS, and
+Google Cloud Pub/Sub:
```tql
from_kafka "security-events", offset="end"
```
+For NATS JetStream, consume from a subject and parse the message field:
+
+```tql
+from_nats "alerts"
+this = string(message).parse_json()
+```
+
See the [message broker guide](/guides/collecting/read-from-message-brokers)
for broker-specific configurations.
diff --git a/src/content/docs/guides/collecting/read-from-message-brokers.mdx b/src/content/docs/guides/collecting/read-from-message-brokers.mdx
index a39d2717e..6695c6e6d 100644
--- a/src/content/docs/guides/collecting/read-from-message-brokers.mdx
+++ b/src/content/docs/guides/collecting/read-from-message-brokers.mdx
@@ -4,8 +4,8 @@ title: Read from message brokers
This guide shows you how to receive events from message brokers using TQL.
You'll learn to subscribe to topics and queues from Apache Kafka (including
-Amazon MSK), AMQP-based brokers (like RabbitMQ), Amazon SQS, and Google Cloud
-Pub/Sub.
+Amazon MSK), NATS JetStream, AMQP-based brokers (like RabbitMQ), Amazon SQS,
+and Google Cloud Pub/Sub.
## Apache Kafka
@@ -63,6 +63,34 @@ from_kafka "security-logs",
aws_iam={region: "us-east-1"}
```
+## NATS JetStream
+
+nats is a messaging system for services, edge
+deployments, and cloud-native applications. Use from_nats to consume
+messages from JetStream subjects.
+
+### Consume from a subject
+
+```tql
+from_nats "alerts", durable="tenzir-alerts"
+this = string(message).parse_json()
+```
+
+The NATS server must have a JetStream stream that captures the subject you
+consume from.
+
+### Preserve message metadata
+
+Use `metadata_field` to copy NATS metadata into events:
+
+```tql
+from_nats "alerts", metadata_field=nats
+parsed = string(message).parse_json()
+nats_subject = nats.subject
+nats_stream = nats.stream
+nats_sequence = nats.stream_sequence
+```
+
## AMQP (RabbitMQ)
amqp is supported by brokers like RabbitMQ. Use AMQP URLs with
@@ -125,6 +153,7 @@ content. Parse it to extract structured data.
- parsing/parse-string-fields
- kafka
+- nats
- amqp
- amazon/sqs
- google/cloud-pubsub
diff --git a/src/content/docs/guides/routing/send-to-destinations.mdx b/src/content/docs/guides/routing/send-to-destinations.mdx
index 5e0deb9f4..49fd73ba9 100644
--- a/src/content/docs/guides/routing/send-to-destinations.mdx
+++ b/src/content/docs/guides/routing/send-to-destinations.mdx
@@ -13,7 +13,8 @@ operators accept expressions for flexible serialization.
### Message brokers
-Send events to message brokers like kafka.
+Send events to message brokers like kafka and
+nats.
Send to Kafka with automatic JSON formatting:
@@ -32,6 +33,16 @@ to_kafka "events", message=this.print_json()
The `message` parameter accepts any expression that evaluates to a string or
blob.
+Send to NATS JetStream:
+
+```tql
+subscribe "security-events"
+to_nats "alerts"
+```
+
+The NATS server must have a JetStream stream that captures the subject you
+publish to.
+
### Analytics platforms
Send data to platforms like splunk,
@@ -114,10 +125,10 @@ Destination operators use expressions for flexible message formatting:
### Serialize the entire event
-Serialize as JSON (the default for most operators):
+Use the default event serialization:
```tql
-to_kafka "events", message=this.print_json()
+to_kafka "events"
```
Serialize as compact JSON without nulls:
@@ -145,7 +156,7 @@ to_kafka "metrics", message=f"{host}: {metric_name}={value}"
Route events to different destinations based on content:
```tql
-to_kafka f"events.{event_type}", message=this.print_json()
+to_kafka f"events.{event_type}"
```
## See also
diff --git a/src/content/docs/integrations/index.mdx b/src/content/docs/integrations/index.mdx
index 251fc4867..a4c91f354 100644
--- a/src/content/docs/integrations/index.mdx
+++ b/src/content/docs/integrations/index.mdx
@@ -22,7 +22,7 @@ Core integrations are native connectors to the ecosystem, enabling
communication over numerous protocols and APIs:
- **Cloud storage**: amazon/s3, google/cloud-storage, microsoft/azure-blob-storage
-- **Message queues**: kafka, amazon/sqs, amqp
+- **Message queues**: kafka, nats, amazon/sqs, amqp
- **Databases**: snowflake, clickhouse, mysql
- **Network protocols**: tcp, udp, http, syslog
diff --git a/src/content/docs/integrations/kafka.mdx b/src/content/docs/integrations/kafka.mdx
index bb05b1359..088650004 100644
--- a/src/content/docs/integrations/kafka.mdx
+++ b/src/content/docs/integrations/kafka.mdx
@@ -44,4 +44,4 @@ to_kafka "topic"
You can control the message encoding with the `message` argument in
to_kafka that defaults to
-`this.print_json()`.
+`this.print_ndjson()`.
diff --git a/src/content/docs/integrations/nats.mdx b/src/content/docs/integrations/nats.mdx
new file mode 100644
index 000000000..f45dff6ec
--- /dev/null
+++ b/src/content/docs/integrations/nats.mdx
@@ -0,0 +1,49 @@
+---
+title: NATS
+---
+
+[NATS](https://nats.io/) is a messaging system for services, edge deployments,
+and cloud-native applications. Tenzir integrates with NATS JetStream to consume
+messages from subjects and publish events back to subjects.
+
+Use from_nats to consume JetStream messages and to_nats to
+publish messages.
+
+## Examples
+
+### Consume JSON messages from a subject
+
+```tql
+from_nats "alerts", durable="tenzir-alerts"
+this = string(message).parse_json()
+```
+
+The NATS server must have a JetStream stream that captures the subject you
+consume from.
+
+### Publish events to a subject
+
+```tql
+subscribe "alerts"
+to_nats "alerts"
+```
+
+By default, to_nats serializes each event with
+`this.print_ndjson()`.
+
+### Connect to a secured NATS server
+
+```tql
+from_nats "alerts",
+ url="tls://nats.example.com:4222",
+ auth={token: secret("NATS_TOKEN")},
+ tls={}
+this = string(message).parse_json()
+```
+
+## See Also
+
+- from_nats
+- to_nats
+- collecting/read-from-message-brokers
+- routing/send-to-destinations
diff --git a/src/content/docs/reference/operators.mdx b/src/content/docs/reference/operators.mdx
index 6f821338f..85a933cce 100644
--- a/src/content/docs/reference/operators.mdx
+++ b/src/content/docs/reference/operators.mdx
@@ -383,6 +383,10 @@ operators:
description: 'Reads events from a MySQL database.'
example: 'from_mysql table="users", host="db.example.com", database="mydb"'
path: 'reference/operators/from_mysql'
+ - name: 'from_nats'
+ description: 'Consumes messages from a NATS JetStream subject.'
+ example: 'from_nats "alerts"'
+ path: 'reference/operators/from_nats'
- name: 'from_nic'
description: 'Captures packets from a network interface and outputs events.'
example: 'from_nic "eth0"'
@@ -753,8 +757,12 @@ operators:
path: 'reference/operators/to_hive'
- name: 'to_kafka'
description: 'Sends messages to an Apache Kafka topic.'
- example: 'to_kafka "topic", message=this.print_json()'
+ example: 'to_kafka "topic"'
path: 'reference/operators/to_kafka'
+ - name: 'to_nats'
+ description: 'Publishes messages to a NATS JetStream subject.'
+ example: 'to_nats "alerts"'
+ path: 'reference/operators/to_nats'
- name: 'to_opensearch'
description: 'Sends events to an OpenSearch-compatible Bulk API.'
example: 'to_opensearch "localhost:9200", …'
@@ -2261,6 +2269,14 @@ from_mysql table="users", host="db.example.com", database="mydb"
+
+
+```tql
+from_nats "alerts"
+```
+
+
+
```tql
@@ -2609,7 +2625,15 @@ to_hive "s3://…", partition_by=[x]
```tql
-to_kafka "topic", message=this.print_json()
+to_kafka "topic"
+```
+
+
+
+
+
+```tql
+to_nats "alerts"
```
diff --git a/src/content/docs/reference/operators/from_nats.mdx b/src/content/docs/reference/operators/from_nats.mdx
new file mode 100644
index 000000000..bde0079a8
--- /dev/null
+++ b/src/content/docs/reference/operators/from_nats.mdx
@@ -0,0 +1,143 @@
+---
+title: from_nats
+category: Inputs/Events
+example: 'from_nats "alerts"'
+---
+
+import TLSOptions from '@partials/operators/TLSOptions.mdx';
+
+Consumes messages from a NATS JetStream subject.
+
+```tql
+from_nats subject:string, [url=secret, durable=string, count=int, tls=record,
+ auth=record, metadata_field=field, _batch_size=int,
+ _queue_capacity=int, _batch_timeout=duration]
+```
+
+## Description
+
+The `from_nats` operator consumes messages from a NATS JetStream subject. The
+operator produces one event per NATS message with a `message` field of type
+`blob`. It acknowledges messages after the events were pushed downstream.
+
+The NATS server must have a JetStream stream that captures the subject you
+consume from. The operator uses the default URL `nats://localhost:4222` unless
+you provide `url` or configure `plugins.nats.url`.
+
+Durable consumers must use explicit acknowledgments. If `from_nats` creates the
+durable consumer, it configures the consumer with explicit acknowledgments. If a
+durable consumer already exists with a different acknowledgment policy, the
+operator exits with an error.
+
+### `subject: string`
+
+The NATS subject to consume from.
+
+### `url = secret (optional)`
+
+The NATS server URL.
+
+If the URL has no scheme, Tenzir uses `nats://` by default or `tls://` when TLS
+is enabled. Use `nats://`, `tls://`, `ws://`, or `wss://` to select a specific
+transport.
+
+### `durable = string (optional)`
+
+The durable consumer name to use for the JetStream subscription.
+
+### `count = int (optional)`
+
+Exit successfully after consuming `count` messages.
+
+### `metadata_field = field (optional)`
+
+The field that receives a record with NATS message metadata.
+
+The metadata record contains these fields:
+
+| Field | Type | Description |
+| :------------------ | :------------- | :----------------------------------------------- |
+| `subject` | `string` | The message subject. |
+| `reply` | `string` | The reply subject, or `null` when absent. |
+| `headers` | `record` | NATS headers as `list` values. |
+| `stream` | `string` | The JetStream stream name, or `null` when absent. |
+| `consumer` | `string` | The JetStream consumer name, or `null` when absent. |
+| `stream_sequence` | `uint64` | The stream sequence number. |
+| `consumer_sequence` | `uint64` | The consumer sequence number. |
+| `num_delivered` | `uint64` | The message delivery count. |
+| `num_pending` | `uint64` | The number of pending messages for the consumer. |
+| `timestamp` | `time` | The JetStream message timestamp. |
+
+### `auth = record (optional)`
+
+Authentication settings for the NATS connection. Each value can be a string or a
+secret.
+
+Supported authentication records are:
+
+- `{token: secret("NATS_TOKEN")}` for token authentication.
+- `{user: "alice", password: secret("NATS_PASSWORD")}` for user/password
+ authentication.
+- `{credentials: "/path/to/user.creds"}` for NATS credentials files.
+- `{credentials: "/path/to/user.creds", seed: "/path/to/user.nk"}` for
+ credentials files with a separate seed file.
+- `{credentials_memory: secret("NATS_CREDS")}` for credentials content stored in
+ a secret.
+
+
+
+NATS uses the standard Tenzir `tls` record. The nats.c library doesn't expose a
+minimum TLS version setting, so `tls.min_version` is accepted for record
+compatibility but ignored with a warning.
+
+### `_batch_size = int (optional)`
+
+The maximum number of messages to emit in one batch.
+
+### `_queue_capacity = int (optional)`
+
+The maximum number of received messages to queue before applying backpressure.
+
+### `_batch_timeout = duration (optional)`
+
+The maximum time to wait before emitting a partial batch.
+
+## Examples
+
+### Consume messages
+
+```tql
+from_nats "alerts"
+```
+
+### Consume JSON messages
+
+```tql
+from_nats "alerts"
+this = string(message).parse_json()
+```
+
+### Collect NATS metadata
+
+```tql
+from_nats "alerts", durable="tenzir-alerts", metadata_field=nats
+parsed = string(message).parse_json()
+nats_subject = nats.subject
+nats_stream_sequence = nats.stream_sequence
+```
+
+### Connect with token authentication and TLS
+
+```tql
+from_nats "alerts",
+ url="tls://nats.example.com:4222",
+ auth={token: secret("NATS_TOKEN")},
+ tls={}
+this = string(message).parse_json()
+```
+
+## See Also
+
+- to_nats
+- collecting/read-from-message-brokers
+- nats
diff --git a/src/content/docs/reference/operators/to_kafka.mdx b/src/content/docs/reference/operators/to_kafka.mdx
index 1ef8936dd..cdac04ef8 100644
--- a/src/content/docs/reference/operators/to_kafka.mdx
+++ b/src/content/docs/reference/operators/to_kafka.mdx
@@ -1,7 +1,7 @@
---
title: to_kafka
category: Outputs/Events
-example: 'to_kafka "topic", message=this.print_json()'
+example: 'to_kafka "topic"'
---
import AWSIAMOptions from '@partials/operators/AWSIAMOptions.mdx';
@@ -39,7 +39,7 @@ The Kafka topic to send messages to.
An expression that evaluates to the message content for each row.
-Defaults to `this.print_json()` when not specified.
+Defaults to `this.print_ndjson()` when not specified.
### `key = string (optional)`
@@ -83,15 +83,10 @@ to_kafka "events"
This pipeline subscribes to security alerts, filters for high-severity events,
selects relevant fields, and sends them to Kafka as JSON. Each event is
-automatically formatted using `this.print_json()`, producing messages like:
+automatically formatted using `this.print_ndjson()`, producing messages like:
```json
-{
- "timestamp": "2024-03-15T10:30:00.000000",
- "source_ip": "192.168.1.100",
- "alert_type": "brute_force",
- "details": "Multiple failed login attempts detected"
-}
+{"timestamp":"2024-03-15T10:30:00.000000","source_ip":"192.168.1.100","alert_type":"brute_force","details":"Multiple failed login attempts detected"}
```
### Send JSON-formatted events with explicit message
diff --git a/src/content/docs/reference/operators/to_nats.mdx b/src/content/docs/reference/operators/to_nats.mdx
new file mode 100644
index 000000000..233235eb2
--- /dev/null
+++ b/src/content/docs/reference/operators/to_nats.mdx
@@ -0,0 +1,108 @@
+---
+title: to_nats
+category: Outputs/Events
+example: 'to_nats "alerts"'
+---
+
+import TLSOptions from '@partials/operators/TLSOptions.mdx';
+
+Publishes messages to a NATS JetStream subject.
+
+```tql
+to_nats subject:string, [message=blob|string, headers=record, url=secret,
+ tls=record, auth=record, _max_pending=int]
+```
+
+## Description
+
+The `to_nats` operator publishes one NATS JetStream message per input event. The
+NATS server must have a JetStream stream that captures the target subject.
+
+The operator uses the default URL `nats://localhost:4222` unless you provide
+`url` or configure `plugins.nats.url`.
+
+### `subject: string`
+
+The NATS subject to publish to.
+
+### `message = blob|string (optional)`
+
+An expression that evaluates to the message payload for each row.
+
+Defaults to `this.print_ndjson()` when not specified.
+
+### `headers = record (optional)`
+
+An expression that evaluates to a record of NATS headers for each row. Header
+values must be strings or lists of strings.
+
+### `url = secret (optional)`
+
+The NATS server URL.
+
+If the URL has no scheme, Tenzir uses `nats://` by default or `tls://` when TLS
+is enabled. Use `nats://`, `tls://`, `ws://`, or `wss://` to select a specific
+transport.
+
+### `auth = record (optional)`
+
+Authentication settings for the NATS connection. Each value can be a string or a
+secret.
+
+Supported authentication records are:
+
+- `{token: secret("NATS_TOKEN")}` for token authentication.
+- `{user: "alice", password: secret("NATS_PASSWORD")}` for user/password
+ authentication.
+- `{credentials: "/path/to/user.creds"}` for NATS credentials files.
+- `{credentials: "/path/to/user.creds", seed: "/path/to/user.nk"}` for
+ credentials files with a separate seed file.
+- `{credentials_memory: secret("NATS_CREDS")}` for credentials content stored in
+ a secret.
+
+
+
+NATS uses the standard Tenzir `tls` record. The nats.c library does not expose a
+minimum TLS version setting, so `tls.min_version` is accepted for record
+compatibility but ignored with a warning.
+
+## Examples
+
+### Publish a JSON event
+
+```tql
+from {
+ severity: "high",
+ alert_type: "suspicious-login",
+}
+to_nats "alerts"
+```
+
+### Publish headers
+
+```tql
+from {
+ message: "hello",
+ headers: {
+ source: "tenzir",
+ tags: ["demo", "nats"],
+ },
+}
+to_nats "alerts", message=message, headers=headers
+```
+
+### Connect with token authentication and TLS
+
+```tql
+subscribe "alerts"
+to_nats "alerts",
+ url="tls://nats.example.com:4222",
+ auth={token: secret("NATS_TOKEN")},
+ tls={}
+```
+
+## See Also
+
+- from_nats
+- routing/send-to-destinations
+- nats
diff --git a/src/sidebar.ts b/src/sidebar.ts
index 5bbaefa7e..ac1718484 100644
--- a/src/sidebar.ts
+++ b/src/sidebar.ts
@@ -365,6 +365,7 @@ export const integrations = [
"integrations/amqp",
"integrations/fluent-bit",
"integrations/kafka",
+ "integrations/nats",
"integrations/zeromq",
],
"integration",