Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions kafka_actions/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,18 @@ files:
type: boolean
description: Whether value uses Schema Registry format
default: false
- name: value_skip_bytes
type: integer
description: |
Number of bytes to drop from the start of the message value before
deserialization. Use this to strip a producer-side prefix that
kafka_actions doesn't recognize natively — for example a 1-byte
version flag, a non-Confluent schema-registry envelope, or a
tenant id prepended to every record. Applied before raw / json /
bson / protobuf / avro decoding, and before Schema Registry
magic-byte detection. Default 0 (no bytes skipped).
default: 0
example: 1
- name: key_format
type: string
description: |
Expand All @@ -312,6 +324,13 @@ files:
type: boolean
description: Whether key uses Schema Registry format
default: false
- name: key_skip_bytes
type: integer
description: |
Number of bytes to drop from the start of the message key before
deserialization. See value_skip_bytes for details. Default 0.
default: 0
example: 1
- name: consumer_group_id
type: string
description: |
Expand Down
1 change: 1 addition & 0 deletions kafka_actions/changelog.d/23556.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support skipping a fixed number of bytes before deserialization via `value_skip_bytes` and `key_skip_bytes` for stripping producer-side prefixes.
2 changes: 2 additions & 0 deletions kafka_actions/datadog_checks/kafka_actions/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,11 @@ def _action_read_messages(self):
'value_format': config.get('value_format', 'json'),
'value_schema': config.get('value_schema'),
'value_uses_schema_registry': config.get('value_uses_schema_registry', False),
'value_skip_bytes': config.get('value_skip_bytes', 0),
'key_format': config.get('key_format', 'string'),
'key_schema': config.get('key_schema'),
'key_uses_schema_registry': config.get('key_uses_schema_registry', False),
'key_skip_bytes': config.get('key_skip_bytes', 0),
}

self.log.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ class ReadMessages(BaseModel):
examples=['json'],
)
key_schema: Optional[str] = Field(None, description='Schema definition for protobuf/avro key')
key_skip_bytes: Optional[int] = Field(
0,
description='Number of bytes to drop from the start of the message key before\ndeserialization. See value_skip_bytes for details. Default 0.\n',
examples=[1],
)
key_uses_schema_registry: Optional[bool] = Field(False, description='Whether key uses Schema Registry format')
max_scanned_messages: Optional[int] = Field(
1000,
Expand Down Expand Up @@ -155,6 +160,11 @@ class ReadMessages(BaseModel):
examples=['json'],
)
value_schema: Optional[str] = Field(None, description='Schema definition for protobuf/avro value')
value_skip_bytes: Optional[int] = Field(
0,
description="Number of bytes to drop from the start of the message value before\ndeserialization. Use this to strip a producer-side prefix that\nkafka_actions doesn't recognize natively — for example a 1-byte\nversion flag, a non-Confluent schema-registry envelope, or a\ntenant id prepended to every record. Applied before raw / json /\nbson / protobuf / avro decoding, and before Schema Registry\nmagic-byte detection. Default 0 (no bytes skipped).\n",
examples=[1],
)
value_uses_schema_registry: Optional[bool] = Field(False, description='Whether value uses Schema Registry format')


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def deserialize_message(
format_type: str = 'json',
schema_str: str | None = None,
uses_schema_registry: bool = False,
skip_bytes: int = 0,
) -> tuple[str | None, int | None]:
"""Deserialize a message (key or value).

Expand All @@ -191,6 +192,13 @@ def deserialize_message(
format_type: 'json', 'bson', 'protobuf', 'avro', or 'raw'
schema_str: Schema definition (for protobuf/avro)
uses_schema_registry: Whether to expect Schema Registry format
skip_bytes: Number of bytes to drop from the start of raw_bytes
before any further processing. Useful for stripping a custom
producer-side prefix (e.g. a 1-byte version flag, a 4-byte
tenant id, a non-Confluent schema registry envelope) so the
remaining bytes can be fed to one of the standard format
paths. Applied before raw/json/bson/protobuf/avro and before
Schema Registry magic-byte detection.

Returns:
Tuple of (deserialized_string, schema_id)
Expand All @@ -200,6 +208,20 @@ def deserialize_message(
if not raw_bytes:
return None, None

if skip_bytes:
if skip_bytes < 0:
self.log.warning("skip_bytes must be non-negative, got %d", skip_bytes)
return f"<deserialization error: skip_bytes must be non-negative, got {skip_bytes}>", None
if skip_bytes > len(raw_bytes):
self.log.warning("skip_bytes=%d exceeds message length %d", skip_bytes, len(raw_bytes))
return (
f"<deserialization error: skip_bytes={skip_bytes} exceeds message length {len(raw_bytes)}>",
None,
)
raw_bytes = raw_bytes[skip_bytes:]
if not raw_bytes:
return None, None

if format_type == 'raw':
return json.dumps(base64.b64encode(raw_bytes).decode('ascii')), None

Expand Down Expand Up @@ -614,9 +636,10 @@ def key(self) -> dict | str | None:
key_format = self.config.get('key_format', 'json')
key_schema = self.config.get('key_schema')
key_uses_sr = self.config.get('key_uses_schema_registry', False)
key_skip_bytes = self.config.get('key_skip_bytes', 0)

deserialized, schema_id = self.deserializer.deserialize_message(
self.kafka_msg.key(), key_format, key_schema, key_uses_sr
self.kafka_msg.key(), key_format, key_schema, key_uses_sr, skip_bytes=key_skip_bytes
)

self._key_deserialized = self._parse_deserialized(deserialized)
Expand All @@ -631,9 +654,10 @@ def value(self) -> dict | str | None:
value_format = self.config.get('value_format', 'json')
value_schema = self.config.get('value_schema')
value_uses_sr = self.config.get('value_uses_schema_registry', False)
value_skip_bytes = self.config.get('value_skip_bytes', 0)

deserialized, schema_id = self.deserializer.deserialize_message(
self.kafka_msg.value(), value_format, value_schema, value_uses_sr
self.kafka_msg.value(), value_format, value_schema, value_uses_sr, skip_bytes=value_skip_bytes
)

self._value_deserialized = self._parse_deserialized(deserialized)
Expand Down
107 changes: 107 additions & 0 deletions kafka_actions/tests/test_message_deserializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,113 @@ def test_deserialize_raw_no_coercion_for_json_like_base64(self):
assert msg.value == '1234', "raw base64 '1234' must stay as string, not become int"
assert isinstance(msg.value, str)

def test_skip_bytes_strips_prefix_before_json_decode(self):
"""skip_bytes drops a producer-side prefix before deserialization."""
log = MagicMock()
deserializer = MessageDeserializer(log)

# 1-byte version flag prepended to a JSON payload — common shape for
# producers that route by version on the same topic.
prefixed = b'\x03' + b'{"order_id": "12345"}'

result, schema_id = deserializer.deserialize_message(prefixed, 'json', skip_bytes=1)

assert json.loads(result) == {'order_id': '12345'}
assert schema_id is None

def test_skip_bytes_zero_is_default_behavior(self):
"""skip_bytes=0 (the default) leaves the message untouched."""
log = MagicMock()
deserializer = MessageDeserializer(log)

message = b'{"order_id": "12345"}'
baseline, _ = deserializer.deserialize_message(message, 'json')
with_zero, _ = deserializer.deserialize_message(message, 'json', skip_bytes=0)

assert baseline == with_zero
assert json.loads(with_zero) == {'order_id': '12345'}

def test_skip_bytes_with_raw_format(self):
"""skip_bytes is applied before raw base64 encoding."""
log = MagicMock()
deserializer = MessageDeserializer(log)

prefix = b'\xde\xad\xbe\xef'
payload = b'\x00\x01\x02'
result, schema_id = deserializer.deserialize_message(prefix + payload, 'raw', skip_bytes=4)

expected = base64.b64encode(payload).decode('ascii')
assert json.loads(result) == expected
assert schema_id is None

def test_skip_bytes_negative_returns_error(self):
"""A negative skip_bytes is rejected with a clear error."""
log = MagicMock()
deserializer = MessageDeserializer(log)

result, schema_id = deserializer.deserialize_message(b'whatever', 'json', skip_bytes=-1)

assert result.startswith('<deserialization error: skip_bytes must be non-negative')
assert schema_id is None

def test_skip_bytes_exceeds_message_length_returns_error(self):
"""skip_bytes greater than the message length returns a clear error."""
log = MagicMock()
deserializer = MessageDeserializer(log)

result, schema_id = deserializer.deserialize_message(b'abc', 'json', skip_bytes=10)

assert result.startswith('<deserialization error: skip_bytes=10 exceeds message length 3')
assert schema_id is None

def test_skip_bytes_equal_to_message_length_returns_none(self):
"""Skipping exactly the whole message yields an empty result, not an error."""
log = MagicMock()
deserializer = MessageDeserializer(log)

result, schema_id = deserializer.deserialize_message(b'abc', 'json', skip_bytes=3)

assert result is None
assert schema_id is None

def test_value_skip_bytes_via_deserialized_message(self):
"""DeserializedMessage routes value_skip_bytes / key_skip_bytes through to the deserializer."""
log = MagicMock()
deserializer = MessageDeserializer(log)

# 4-byte tenant id prefix on the value, no prefix on the key.
value_with_prefix = b'\x00\x00\x00\x07' + b'{"price": 99.95}'
key = b'order-42'

kafka_msg = MockKafkaMessage(key=key, value=value_with_prefix)
config = {
'key_format': 'string',
'key_skip_bytes': 0,
'value_format': 'json',
'value_skip_bytes': 4,
}

msg = DeserializedMessage(kafka_msg, deserializer, config)
assert msg.value == {'price': 99.95}
assert msg.key == 'order-42'

def test_skip_bytes_runs_before_schema_registry_detection(self):
"""skip_bytes is applied before the Confluent SR magic-byte check.

Lets a caller strip a custom outer envelope and then have the rest
decoded as a normal Confluent SR-framed message.
"""
log = MagicMock()
deserializer = MessageDeserializer(log)

# Outer 1-byte env flag, then Confluent SR framing: 0x00 + schema_id=42 + JSON body.
sr_framed = b'\x00' + (42).to_bytes(4, 'big') + b'{"v": 1}'
prefixed = b'\xfe' + sr_framed

result, schema_id = deserializer.deserialize_message(prefixed, 'json', uses_schema_registry=True, skip_bytes=1)
assert json.loads(result) == {'v': 1}
assert schema_id == 42


class TestSchemaRegistryIntegration:
"""Test MessageDeserializer with schema registry client."""
Expand Down
Loading