Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/content/docs/guides/collecting/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,20 @@ pagination patterns and advanced configurations.

### Message brokers

Subscribe to topics or queues from Apache Kafka, AMQP, Amazon SQS, and Google
Cloud Pub/Sub:
Subscribe to topics or queues from Apache Kafka, NATS, AMQP, Amazon SQS, and
Google Cloud Pub/Sub:

```tql
from_kafka "security-events", offset="end"
```

For NATS JetStream, consume from a subject and parse the message field:

```tql
from_nats "alerts"
this = string(message).parse_json()
```

See the [message broker guide](/guides/collecting/read-from-message-brokers)
for broker-specific configurations.

Expand Down
33 changes: 31 additions & 2 deletions src/content/docs/guides/collecting/read-from-message-brokers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ title: Read from message brokers

This guide shows you how to receive events from message brokers using TQL.
You'll learn to subscribe to topics and queues from Apache Kafka (including
Amazon MSK), AMQP-based brokers (like RabbitMQ), Amazon SQS, and Google Cloud
Pub/Sub.
Amazon MSK), NATS JetStream, AMQP-based brokers (like RabbitMQ), Amazon SQS,
and Google Cloud Pub/Sub.

## Apache Kafka

Expand Down Expand Up @@ -63,6 +63,34 @@ from_kafka "security-logs",
aws_iam={region: "us-east-1"}
```

## NATS JetStream

<Integration>nats</Integration> is a messaging system for services, edge
deployments, and cloud-native applications. Use <Op>from_nats</Op> to consume
messages from JetStream subjects.

### Consume from a subject

```tql
from_nats "alerts", durable="tenzir-alerts"
this = string(message).parse_json()
```

The NATS server must have a JetStream stream that captures the subject you
consume from.

### Preserve message metadata

Use `metadata_field` to copy NATS metadata into events:

```tql
from_nats "alerts", metadata_field=nats
this = string(message).parse_json()
Comment thread
mavam marked this conversation as resolved.
Outdated
nats_subject = nats.subject
nats_stream = nats.stream
nats_sequence = nats.stream_sequence
```

## AMQP (RabbitMQ)

<Integration>amqp</Integration> is supported by brokers like RabbitMQ. Use AMQP URLs with
Expand Down Expand Up @@ -125,6 +153,7 @@ content. Parse it to extract structured data.

- <Guide>parsing/parse-string-fields</Guide>
- <Integration>kafka</Integration>
- <Integration>nats</Integration>
- <Integration>amqp</Integration>
- <Integration>amazon/sqs</Integration>
- <Integration>google/cloud-pubsub</Integration>
13 changes: 12 additions & 1 deletion src/content/docs/guides/routing/send-to-destinations.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ operators accept expressions for flexible serialization.

### Message brokers

Send events to message brokers like <Integration>kafka</Integration>.
Send events to message brokers like <Integration>kafka</Integration> and
<Integration>nats</Integration>.

Send to Kafka with automatic JSON formatting:

Expand All @@ -32,6 +33,16 @@ to_kafka "events", message=this.print_json()
The `message` parameter accepts any expression that evaluates to a string or
blob.

Send to NATS JetStream:

```tql
subscribe "security-events"
to_nats "alerts", message=this.print_json()
```

The NATS server must have a JetStream stream that captures the subject you
publish to.

### Analytics platforms

Send data to platforms like <Integration>splunk</Integration>,
Expand Down
2 changes: 1 addition & 1 deletion src/content/docs/integrations/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Core integrations are native connectors to the ecosystem, enabling
communication over numerous protocols and APIs:

- **Cloud storage**: <Integration>amazon/s3</Integration>, <Integration>google/cloud-storage</Integration>, <Integration>microsoft/azure-blob-storage</Integration>
- **Message queues**: <Integration>kafka</Integration>, <Integration>amazon/sqs</Integration>, <Integration>amqp</Integration>
- **Message queues**: <Integration>kafka</Integration>, <Integration>nats</Integration>, <Integration>amazon/sqs</Integration>, <Integration>amqp</Integration>
- **Databases**: <Integration>snowflake</Integration>, <Integration>clickhouse</Integration>, <Integration>mysql</Integration>
- **Network protocols**: <Integration>tcp</Integration>, <Integration>udp</Integration>, <Integration>http</Integration>, <Integration>syslog</Integration>

Expand Down
46 changes: 46 additions & 0 deletions src/content/docs/integrations/nats.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
title: NATS
---

[NATS](https://nats.io/) is a messaging system for services, edge deployments,
and cloud-native applications. Tenzir integrates with NATS JetStream to consume
messages from subjects and publish events back to subjects.

Use <Op>from_nats</Op> to consume JetStream messages and <Op>to_nats</Op> to
publish messages.

## Examples

### Consume JSON messages from a subject

```tql
from_nats "alerts", durable="tenzir-alerts"
this = string(message).parse_json()
```

The NATS server must have a JetStream stream that captures the subject you
consume from.

### Publish events to a subject

```tql
subscribe "alerts"
to_nats "alerts", message=this.print_json()
```

### Connect to a secured NATS server

```tql
from_nats "alerts",
url="tls://nats.example.com:4222",
auth={token: secret("NATS_TOKEN")},
tls={}
this = string(message).parse_json()
```

## See Also

- <Op>from_nats</Op>
- <Op>to_nats</Op>
- <Guide>collecting/read-from-message-brokers</Guide>
- <Guide>routing/send-to-destinations</Guide>
24 changes: 24 additions & 0 deletions src/content/docs/reference/operators.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,10 @@ operators:
description: 'Reads events from a MySQL database.'
example: 'from_mysql table="users", host="db.example.com", database="mydb"'
path: 'reference/operators/from_mysql'
- name: 'from_nats'
description: 'Consumes messages from a NATS JetStream subject.'
example: 'from_nats "alerts"'
path: 'reference/operators/from_nats'
- name: 'from_nic'
description: 'Captures packets from a network interface and outputs events.'
example: 'from_nic "eth0"'
Expand Down Expand Up @@ -755,6 +759,10 @@ operators:
description: 'Sends messages to an Apache Kafka topic.'
example: 'to_kafka "topic", message=this.print_json()'
path: 'reference/operators/to_kafka'
- name: 'to_nats'
description: 'Publishes messages to a NATS JetStream subject.'
example: 'to_nats "alerts"'
path: 'reference/operators/to_nats'
- name: 'to_opensearch'
description: 'Sends events to an OpenSearch-compatible Bulk API.'
example: 'to_opensearch "localhost:9200", …'
Expand Down Expand Up @@ -2261,6 +2269,14 @@ from_mysql table="users", host="db.example.com", database="mydb"

</ReferenceCard>

<ReferenceCard title="from_nats" description="Consumes messages from a NATS JetStream subject." href="/reference/operators/from_nats">

```tql
from_nats "alerts"
```

</ReferenceCard>

<ReferenceCard title="from_nic" description="Captures packets from a network interface and outputs events." href="/reference/operators/from_nic">

```tql
Expand Down Expand Up @@ -2614,6 +2630,14 @@ to_kafka "topic", message=this.print_json()

</ReferenceCard>

<ReferenceCard title="to_nats" description="Publishes messages to a NATS JetStream subject." href="/reference/operators/to_nats">

```tql
to_nats "alerts"
```

</ReferenceCard>

<ReferenceCard title="to_opensearch" description="Sends events to an OpenSearch-compatible Bulk API." href="/reference/operators/to_opensearch">

```tql
Expand Down
143 changes: 143 additions & 0 deletions src/content/docs/reference/operators/from_nats.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
---
title: from_nats
category: Inputs/Events
example: 'from_nats "alerts"'
---

import TLSOptions from '@partials/operators/TLSOptions.mdx';

Consumes messages from a NATS JetStream subject.

```tql
from_nats subject:string, [url=secret, durable=string, count=int, tls=record,
auth=record, metadata_field=field, _batch_size=int,
_queue_capacity=int, _batch_timeout=duration]
```

## Description

The `from_nats` operator consumes messages from a NATS JetStream subject. The
operator produces one event per NATS message with a `message` field of type
`blob`. It acknowledges messages after the events were pushed downstream.

The NATS server must have a JetStream stream that captures the subject you
consume from. The operator uses the default URL `nats://localhost:4222` unless
you provide `url` or configure `plugins.nats.url`.

Durable consumers must use explicit acknowledgments. If `from_nats` creates the
durable consumer, it configures the consumer with explicit acknowledgments. If a
durable consumer already exists with a different acknowledgment policy, the
operator exits with an error.

### `subject: string`

The NATS subject to consume from.

### `url = secret (optional)`

The NATS server URL.

If the URL has no scheme, Tenzir uses `nats://` by default or `tls://` when TLS
is enabled. Use `nats://`, `tls://`, `ws://`, or `wss://` to select a specific
transport.

### `durable = string (optional)`

The durable consumer name to use for the JetStream subscription.

### `count = int (optional)`

Exit successfully after consuming `count` messages.

### `metadata_field = field (optional)`

The field that receives a record with NATS message metadata.

The metadata record contains these fields:

| Field | Type | Description |
| :------------------ | :------------- | :----------------------------------------------- |
| `subject` | `string` | The message subject. |
| `reply` | `string` | The reply subject, or `null` when absent. |
| `headers` | `record` | NATS headers as `list<string>` values. |
| `stream` | `string` | The JetStream stream name, or `null` when absent. |
| `consumer` | `string` | The JetStream consumer name, or `null` when absent. |
| `stream_sequence` | `uint64` | The stream sequence number. |
| `consumer_sequence` | `uint64` | The consumer sequence number. |
| `num_delivered` | `uint64` | The message delivery count. |
| `num_pending` | `uint64` | The number of pending messages for the consumer. |
| `timestamp` | `time` | The JetStream message timestamp. |

### `auth = record (optional)`

Authentication settings for the NATS connection. Each value can be a string or a
secret.

Supported authentication records are:

- `{token: secret("NATS_TOKEN")}` for token authentication.
- `{user: "alice", password: secret("NATS_PASSWORD")}` for user/password
authentication.
- `{credentials: "/path/to/user.creds"}` for NATS credentials files.
- `{credentials: "/path/to/user.creds", seed: "/path/to/user.nk"}` for
credentials files with a separate seed file.
- `{credentials_memory: secret("NATS_CREDS")}` for credentials content stored in
a secret.

<TLSOptions />

NATS uses the standard Tenzir `tls` record. The nats.c library doesn't expose a
minimum TLS version setting, so `tls.min_version` is accepted for record
compatibility but ignored with a warning.

### `_batch_size = int (optional)`

The maximum number of messages to emit in one batch.

### `_queue_capacity = int (optional)`

The maximum number of received messages to queue before applying backpressure.

### `_batch_timeout = duration (optional)`

The maximum time to wait before emitting a partial batch.

## Examples

### Consume messages

```tql
from_nats "alerts"
```

### Consume JSON messages

```tql
from_nats "alerts"
this = string(message).parse_json()
```

### Collect NATS metadata

```tql
from_nats "alerts", durable="tenzir-alerts", metadata_field=nats
parsed = string(message).parse_json()
nats_subject = nats.subject
nats_stream_sequence = nats.stream_sequence
```

### Connect with token authentication and TLS

```tql
from_nats "alerts",
url="tls://nats.example.com:4222",
auth={token: secret("NATS_TOKEN")},
tls={}
this = string(message).parse_json()
```

## See Also

- <Op>to_nats</Op>
- <Guide>collecting/read-from-message-brokers</Guide>
- <Integration>nats</Integration>
11 changes: 3 additions & 8 deletions src/content/docs/reference/operators/to_kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The Kafka topic to send messages to.

An expression that evaluates to the message content for each row.

Defaults to `this.print_json()` when not specified.
Defaults to `this.print_ndjson()` when not specified.
Comment thread
mavam marked this conversation as resolved.

### `key = string (optional)`

Expand Down Expand Up @@ -83,15 +83,10 @@ to_kafka "events"

This pipeline subscribes to security alerts, filters for high-severity events,
selects relevant fields, and sends them to Kafka as JSON. Each event is
automatically formatted using `this.print_json()`, producing messages like:
automatically formatted using `this.print_ndjson()`, producing messages like:

```json
{
"timestamp": "2024-03-15T10:30:00.000000",
"source_ip": "192.168.1.100",
"alert_type": "brute_force",
"details": "Multiple failed login attempts detected"
}
{"timestamp":"2024-03-15T10:30:00.000000","source_ip":"192.168.1.100","alert_type":"brute_force","details":"Multiple failed login attempts detected"}
```

### Send JSON-formatted events with explicit message
Expand Down
Loading
Loading