Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 38 additions & 18 deletions src/content/docs/integrations/zeromq.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,61 @@ title: ZeroMQ
[ZeroMQ](https://zeromq.org/) (0mq) is a light-weight messaging framework with
various socket types. Tenzir supports writing to [PUB
sockets](https://zeromq.org/socket-api/#pub-socket) and reading from [SUB
sockets](https://zeromq.org/socket-api/#sub-socket), both in server (listening)
and client (connect) mode.
sockets](https://zeromq.org/socket-api/#sub-socket), both in bind mode and
connect mode.

![ZeroMQ](zeromq.svg)

Use the IP address `0.0.0.0` to listen on all available network interfaces.

The new executor provides event-oriented ZeroMQ operators:
Copy link
Copy Markdown
Member

@mavam mavam Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"new executor" should not be mentioned in the primary docs. Rather, we should have an explicit migration guide that transitioning users will ultimately resort to.

Otherwise you have to do a content pass in another month to rephrase all of this again.


- <Op>from_zmq</Op>: Connects as a `SUB` socket and receives events.
- <Op>accept_zmq</Op>: Binds a `SUB` socket and receives events.
- <Op>to_zmq</Op>: Connects as a `PUB` socket and sends events.
- <Op>serve_zmq</Op>: Binds a `PUB` socket and sends events.

Tenzir documents these operators for PUB/SUB-style use. ZeroMQ itself does not
have a first-class topic abstraction. Instead, Tenzir uses an optional `prefix`
that is prepended to outgoing messages and matched by subscribers with ZeroMQ's
native byte-prefix filtering. Receivers strip the matched prefix before running
their nested `read_*` pipeline unless `keep_prefix=true`.

Because ZeroMQ is entirely asynchronous, publishers send messages even when no
subscriber is present. This can lead to lost messages when the publisher begins
operating before the subscriber. To avoid data loss due to such races, pass
`monitor=true` to activate message buffering until at least one remote peer has
connected.

:::tip[URL Support]
The URL scheme `zmq://` dispatches to
<Op>load_zmq</Op> and
<Op>save_zmq</Op> for seamless URL-style use via
<Op>from</Op> and <Op>to</Op>.
:::
`monitor=true` on <Op>to_zmq</Op>, <Op>serve_zmq</Op>, or the legacy
<Op>save_zmq</Op> operator to wait until at least one remote peer has connected
on TCP transports.

## Examples

### Accept Syslog messages over ZeroMQ
### Connect to a remote publisher and parse JSON

```tql
from "zmq://127.0.0.1:541" {
read_syslog
from_zmq "tcp://collector.example.com:5555" {
read_json
}
```

### Send events to a ZeroMQ socket
### Receive a prefixed stream

```tql
from {message: "Tenzir"}
to "zmq://1.2.3.4:8080" {
write_ndjson
accept_zmq "tcp://0.0.0.0:5555", prefix="alerts/" {
read_ndjson
}
```

### Publish events with a dynamic prefix

```tql
export
serve_zmq "tcp://0.0.0.0:5555", encoding="json", prefix=kind + "/"
```

### Connect and publish JSON

```tql
export
to_zmq "tcp://collector.example.com:5555", encoding="json"
```
48 changes: 48 additions & 0 deletions src/content/docs/reference/operators.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ operators:
description: 'Sends and receives HTTP/1.1 requests.'
example: 'from_http "0.0.0.0:8080"'
path: 'reference/operators/from_http'
- name: 'from_zmq'
description: 'Connects to a remote ZeroMQ publisher and receives events.'
example: 'from_zmq "tcp://collector.example.com:5555", prefix="alerts/" { read_json }'
path: 'reference/operators/from_zmq'
- name: 'from_stdin'
description: 'Reads and parses events from standard input.'
example: 'from_stdin { read_json }'
Expand Down Expand Up @@ -631,6 +635,10 @@ operators:
description: 'Parses an incoming `Zeek TSV` stream into events.'
example: 'read_zeek_tsv'
path: 'reference/operators/read_zeek_tsv'
- name: 'accept_zmq'
description: 'Listens on a ZeroMQ endpoint and receives events.'
example: 'accept_zmq "tcp://0.0.0.0:5555", prefix="alerts/" { read_json }'
path: 'reference/operators/accept_zmq'
- name: 'save_amqp'
description: 'Saves a byte stream via AMQP messages.'
example: 'save_amqp'
Expand Down Expand Up @@ -711,6 +719,10 @@ operators:
description: 'Sends events to the Microsoft Azure Logs Ingestion API.'
example: 'to_azure_log_analytics tenant_id="...", workspace_id="..."'
path: 'reference/operators/to_azure_log_analytics'
- name: 'to_zmq'
description: 'Connects to a remote ZeroMQ subscriber endpoint and sends events.'
example: 'to_zmq "tcp://collector.example.com:5555", encoding="json", prefix=kind + "/"'
path: 'reference/operators/to_zmq'
- name: 'to_clickhouse'
description: 'Sends events to a ClickHouse table.'
example: 'to_clickhouse table="my_table"'
Expand All @@ -731,6 +743,10 @@ operators:
description: 'Sends unstructured events to a Google SecOps Chronicle instance.'
example: 'to_google_secops …'
path: 'reference/operators/to_google_secops'
- name: 'serve_zmq'
description: 'Listens on a ZeroMQ endpoint and sends events.'
example: 'serve_zmq "tcp://0.0.0.0:5555", encoding="json", prefix=kind + "/"'
path: 'reference/operators/serve_zmq'
- name: 'to_hive'
description: 'Writes events to a URI using hive partitioning.'
example: 'to_hive "s3://…", partition_by=[x]'
Expand Down Expand Up @@ -2195,6 +2211,14 @@ from_http "0.0.0.0:8080"

</ReferenceCard>

<ReferenceCard title="from_zmq" description="Connects to a remote ZeroMQ publisher and receives events." href="/reference/operators/from_zmq">

```tql
from_zmq "tcp://collector.example.com:5555", prefix="alerts/" { read_json }
```

</ReferenceCard>

<ReferenceCard title="from_stdin" description="Reads and parses events from standard input." href="/reference/operators/from_stdin">

```tql
Expand Down Expand Up @@ -2259,6 +2283,14 @@ from_velociraptor subscribe="Windows"

</ReferenceCard>

<ReferenceCard title="accept_zmq" description="Listens on a ZeroMQ endpoint and receives events." href="/reference/operators/accept_zmq">

```tql
accept_zmq "tcp://0.0.0.0:5555", prefix="alerts/" { read_json }
```

</ReferenceCard>

</CardGrid>

## Node
Expand Down Expand Up @@ -2509,6 +2541,14 @@ to_azure_log_analytics tenant_id="...", workspace_id="..."

</ReferenceCard>

<ReferenceCard title="to_zmq" description="Connects to a remote ZeroMQ subscriber endpoint and sends events." href="/reference/operators/to_zmq">

```tql
to_zmq "tcp://collector.example.com:5555", encoding="json", prefix=kind + "/"
```

</ReferenceCard>

<ReferenceCard title="to_clickhouse" description="Sends events to a ClickHouse table." href="/reference/operators/to_clickhouse">

```tql
Expand Down Expand Up @@ -2549,6 +2589,14 @@ to_google_secops …

</ReferenceCard>

<ReferenceCard title="serve_zmq" description="Listens on a ZeroMQ endpoint and sends events." href="/reference/operators/serve_zmq">

```tql
serve_zmq "tcp://0.0.0.0:5555", encoding="json", prefix=kind + "/"
```

</ReferenceCard>

<ReferenceCard title="to_hive" description="Writes events to a URI using hive partitioning." href="/reference/operators/to_hive">

```tql
Expand Down
82 changes: 82 additions & 0 deletions src/content/docs/reference/operators/accept_zmq.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
---
title: accept_zmq
category: Inputs/Events
example: 'accept_zmq "tcp://0.0.0.0:5555", prefix="alerts/" { read_json }'
---

import Op from '@components/see-also/Op.astro';
import Integration from '@components/see-also/Integration.astro';

Listens on a ZeroMQ endpoint and receives events.

```tql
accept_zmq endpoint:string, [prefix=string, keep_prefix=bool, { … }]
```

## Description

Binds a ZeroMQ `SUB` socket to the specified endpoint and receives messages that
match the configured subscription prefix.

Use `accept_zmq` when Tenzir should own the listening endpoint. This matches the
naming used by other transport operators such as <Op>accept_tcp</Op>, even
though ZeroMQ itself calls this binding rather than accepting.

As with <Op>from_zmq</Op>, the `prefix` option uses ZeroMQ's raw subscription
filtering. When `keep_prefix=false`, the operator strips the matched prefix
before handing the remaining bytes to the nested pipeline.

### `endpoint: string`

The endpoint to listen on, for example `tcp://0.0.0.0:5555`, `ipc://path`, or
`inproc://name`.

### `prefix = string (optional)`

A constant subscription prefix to install on the `SUB` socket.

The expression must evaluate to a string before the operator starts receiving
messages. It cannot depend on event fields.

Defaults to the empty string, which subscribes to all messages.

### `keep_prefix = bool (optional)`

Whether to keep the matched prefix in the bytes that are passed to the nested
pipeline.

Defaults to `false`.

### `{ … } (optional)`

The pipeline to run for incoming message payloads. It receives bytes and must
produce events, for example `{ read_json }` or `{ read_syslog }`.

If you omit the nested pipeline, the operator emits one event per message with a
single field `message` containing the message payload as a `blob`.

## Examples

### Listen for JSON messages

```tql
accept_zmq "tcp://0.0.0.0:5555" {
read_json
}
```

### Listen with a subscription prefix

```tql
accept_zmq "tcp://0.0.0.0:5555", prefix="suricata/" {
read_suricata
}
```

## See Also

- <Op>from_zmq</Op>
- <Op>to_zmq</Op>
- <Op>serve_zmq</Op>
- <Op>load_zmq</Op>
- <Integration>zeromq</Integration>
4 changes: 2 additions & 2 deletions src/content/docs/reference/operators/from.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,14 @@ load_tcp "tcp://0.0.0.0:12345", parallel=10 {
| `ftp`, `ftps` | <Op>load_ftp</Op> | `from "ftp://example.com/file.json"` |
| `gs` | <Op>load_gcs</Op> | `from "gs://bucket/object.json"` |
| `http`, `https` | <Op>load_http</Op> | `from "http://example.com/file.json"` |
| `inproc` | <Op>load_zmq</Op> | `from "inproc://127.0.0.1:56789" { read_json }` |
| `inproc` | <Op>load_zmq</Op> | `from "inproc://worker" { read_json }` |
| `kafka` | <Op>load_kafka</Op> | `from "kafka://topic" { read_json }` |
| `opensearch` | <Op>from_opensearch</Op> | `from "opensearch://1.2.3.4:9200` |
| `s3` | <Op>load_s3</Op> | `from "s3://bucket/file.json"` |
| `sqs` | <Op>load_sqs</Op> | `from "sqs://my-queue" { read_json }` |
| `tcp` | <Op>load_tcp</Op> | `from "tcp://127.0.0.1:13245" { read_json }` |
| `udp` | <Op>load_udp</Op> | `from "udp://127.0.0.1:56789" { read_json }` |
| `zmq` | <Op>load_zmq</Op> | `from "zmq://127.0.0.1:56789" { read_json }` |
| `zmq` | <Op>load_zmq</Op> | `from "zmq://127.0.0.1:56789" { read_json }` |

Please see the respective operator pages for details on the URI's locator format.

Expand Down
93 changes: 93 additions & 0 deletions src/content/docs/reference/operators/from_zmq.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
---
title: from_zmq
category: Inputs/Events
example: 'from_zmq "tcp://collector.example.com:5555", prefix="alerts/" { read_json }'
---

import Op from '@components/see-also/Op.astro';
import Integration from '@components/see-also/Integration.astro';

Connects to a remote ZeroMQ publisher and receives events.

```tql
from_zmq endpoint:string, [prefix=string, keep_prefix=bool, { … }]
```

## Description

Connects to the specified ZeroMQ endpoint as a `SUB` socket and receives
messages that match the configured subscription prefix.

Tenzir documents these operators for PUB/SUB-style use. ZeroMQ itself does not
have a first-class notion of a topic. Instead, the `prefix` option performs a
raw prefix match on the incoming message bytes, using ZeroMQ subscription
filtering at the socket.

When `keep_prefix=false`, the operator strips the matched prefix from the
message before running the nested pipeline. This lets you combine prefix-based
routing at the transport layer with regular `read_*` operators inside TQL.

If the connection fails, the operator retries with exponential backoff.

### `endpoint: string`

The remote endpoint to connect to. This is typically a ZeroMQ endpoint such as
`tcp://host:port`, `ipc://path`, or `inproc://name`.

### `prefix = string (optional)`

A constant subscription prefix to install on the `SUB` socket.

The expression must evaluate to a string before the operator starts receiving
messages. It cannot depend on event fields.

Defaults to the empty string, which subscribes to all messages.

### `keep_prefix = bool (optional)`

Whether to keep the matched prefix in the bytes that are passed to the nested
pipeline.

Defaults to `false`.

### `{ … } (optional)`

The pipeline to run for incoming message payloads. It receives bytes and must
produce events, for example `{ read_json }` or `{ read_syslog }`.

If you omit the nested pipeline, the operator emits one event per message with a
single field `message` containing the message payload as a `blob`.

## Examples

### Connect to a publisher and parse JSON

```tql
from_zmq "tcp://collector.example.com:5555" {
read_json
}
```

### Subscribe to a prefixed stream

```tql
from_zmq "tcp://collector.example.com:5555", prefix="alerts/" {
read_ndjson
}
```

### Keep the matched prefix

```tql
from_zmq "tcp://collector.example.com:5555", prefix="syslog ", keep_prefix=true {
read_syslog
}
```

## See Also

- <Op>accept_zmq</Op>
- <Op>to_zmq</Op>
- <Op>serve_zmq</Op>
- <Op>load_zmq</Op>
- <Integration>zeromq</Integration>
Loading
Loading