diff --git a/kafka_actions/assets/configuration/spec.yaml b/kafka_actions/assets/configuration/spec.yaml index 55757d4c9db70..f37fb82582787 100644 --- a/kafka_actions/assets/configuration/spec.yaml +++ b/kafka_actions/assets/configuration/spec.yaml @@ -292,6 +292,18 @@ files: type: boolean description: Whether value uses Schema Registry format default: false + - name: value_skip_bytes + type: integer + description: | + Number of bytes to drop from the start of the message value before + deserialization. Use this to strip a producer-side prefix that + kafka_actions doesn't recognize natively — for example a 1-byte + version flag, a non-Confluent schema-registry envelope, or a + tenant id prepended to every record. Applied before raw / json / + bson / protobuf / avro decoding, and before Schema Registry + magic-byte detection. Default 0 (no bytes skipped). + default: 0 + example: 1 - name: key_format type: string description: | @@ -312,6 +324,13 @@ files: type: boolean description: Whether key uses Schema Registry format default: false + - name: key_skip_bytes + type: integer + description: | + Number of bytes to drop from the start of the message key before + deserialization. See value_skip_bytes for details. Default 0. + default: 0 + example: 1 - name: consumer_group_id type: string description: | diff --git a/kafka_actions/changelog.d/23556.added b/kafka_actions/changelog.d/23556.added new file mode 100644 index 0000000000000..9d0b0e585f075 --- /dev/null +++ b/kafka_actions/changelog.d/23556.added @@ -0,0 +1 @@ +Support skipping a fixed number of bytes before deserialization via `value_skip_bytes` and `key_skip_bytes` for stripping producer-side prefixes. 
\ No newline at end of file diff --git a/kafka_actions/datadog_checks/kafka_actions/check.py b/kafka_actions/datadog_checks/kafka_actions/check.py index 60b12fd8dc338..36749930be796 100644 --- a/kafka_actions/datadog_checks/kafka_actions/check.py +++ b/kafka_actions/datadog_checks/kafka_actions/check.py @@ -275,9 +275,11 @@ def _action_read_messages(self): 'value_format': config.get('value_format', 'json'), 'value_schema': config.get('value_schema'), 'value_uses_schema_registry': config.get('value_uses_schema_registry', False), + 'value_skip_bytes': config.get('value_skip_bytes', 0), 'key_format': config.get('key_format', 'string'), 'key_schema': config.get('key_schema'), 'key_uses_schema_registry': config.get('key_uses_schema_registry', False), + 'key_skip_bytes': config.get('key_skip_bytes', 0), } self.log.debug( diff --git a/kafka_actions/datadog_checks/kafka_actions/config_models/instance.py b/kafka_actions/datadog_checks/kafka_actions/config_models/instance.py index 4d0b7b2281417..2c4fa36ed500b 100644 --- a/kafka_actions/datadog_checks/kafka_actions/config_models/instance.py +++ b/kafka_actions/datadog_checks/kafka_actions/config_models/instance.py @@ -124,6 +124,11 @@ class ReadMessages(BaseModel): examples=['json'], ) key_schema: Optional[str] = Field(None, description='Schema definition for protobuf/avro key') + key_skip_bytes: Optional[int] = Field( + 0, + description='Number of bytes to drop from the start of the message key before\ndeserialization. See value_skip_bytes for details. 
Default 0.\n', + examples=[1], + ) key_uses_schema_registry: Optional[bool] = Field(False, description='Whether key uses Schema Registry format') max_scanned_messages: Optional[int] = Field( 1000, @@ -155,6 +160,11 @@ class ReadMessages(BaseModel): examples=['json'], ) value_schema: Optional[str] = Field(None, description='Schema definition for protobuf/avro value') + value_skip_bytes: Optional[int] = Field( + 0, + description="Number of bytes to drop from the start of the message value before\ndeserialization. Use this to strip a producer-side prefix that\nkafka_actions doesn't recognize natively — for example a 1-byte\nversion flag, a non-Confluent schema-registry envelope, or a\ntenant id prepended to every record. Applied before raw / json /\nbson / protobuf / avro decoding, and before Schema Registry\nmagic-byte detection. Default 0 (no bytes skipped).\n", + examples=[1], + ) value_uses_schema_registry: Optional[bool] = Field(False, description='Whether value uses Schema Registry format') diff --git a/kafka_actions/datadog_checks/kafka_actions/message_deserializer.py b/kafka_actions/datadog_checks/kafka_actions/message_deserializer.py index a3da090054139..fe09e8035745f 100644 --- a/kafka_actions/datadog_checks/kafka_actions/message_deserializer.py +++ b/kafka_actions/datadog_checks/kafka_actions/message_deserializer.py @@ -183,6 +183,7 @@ def deserialize_message( format_type: str = 'json', schema_str: str | None = None, uses_schema_registry: bool = False, + skip_bytes: int = 0, ) -> tuple[str | None, int | None]: """Deserialize a message (key or value). @@ -191,6 +192,13 @@ def deserialize_message( format_type: 'json', 'bson', 'protobuf', 'avro', or 'raw' schema_str: Schema definition (for protobuf/avro) uses_schema_registry: Whether to expect Schema Registry format + skip_bytes: Number of bytes to drop from the start of raw_bytes + before any further processing. Useful for stripping a custom + producer-side prefix (e.g. 
a 1-byte version flag, a 4-byte + tenant id, a non-Confluent schema registry envelope) so the + remaining bytes can be fed to one of the standard format + paths. Applied before raw/json/bson/protobuf/avro and before + Schema Registry magic-byte detection. Returns: Tuple of (deserialized_string, schema_id) @@ -200,6 +208,20 @@ if not raw_bytes: return None, None + if skip_bytes: + if skip_bytes < 0: + self.log.warning("skip_bytes must be non-negative, got %d", skip_bytes) + return f"<error: skip_bytes must be non-negative, got {skip_bytes}>", None + if skip_bytes > len(raw_bytes): + self.log.warning("skip_bytes=%d exceeds message length %d", skip_bytes, len(raw_bytes)) + return ( + f"<error: skip_bytes {skip_bytes} exceeds message length {len(raw_bytes)}>", + None, + ) + raw_bytes = raw_bytes[skip_bytes:] + if not raw_bytes: + return None, None + if format_type == 'raw': return json.dumps(base64.b64encode(raw_bytes).decode('ascii')), None @@ -614,9 +636,10 @@ def key(self) -> dict | str | None: key_format = self.config.get('key_format', 'json') key_schema = self.config.get('key_schema') key_uses_sr = self.config.get('key_uses_schema_registry', False) + key_skip_bytes = self.config.get('key_skip_bytes', 0) deserialized, schema_id = self.deserializer.deserialize_message( - self.kafka_msg.key(), key_format, key_schema, key_uses_sr + self.kafka_msg.key(), key_format, key_schema, key_uses_sr, skip_bytes=key_skip_bytes ) self._key_deserialized = self._parse_deserialized(deserialized) @@ -631,9 +654,10 @@ def value(self) -> dict | str | None: value_format = self.config.get('value_format', 'json') value_schema = self.config.get('value_schema') value_uses_sr = self.config.get('value_uses_schema_registry', False) + value_skip_bytes = self.config.get('value_skip_bytes', 0) deserialized, schema_id = self.deserializer.deserialize_message( - self.kafka_msg.value(), value_format, value_schema, value_uses_sr + self.kafka_msg.value(), value_format, value_schema, value_uses_sr, skip_bytes=value_skip_bytes ) self._value_deserialized = self._parse_deserialized(deserialized) diff --git
a/kafka_actions/tests/test_message_deserializer.py b/kafka_actions/tests/test_message_deserializer.py index 9932b9f44a8f8..af67af7d328ba 100644 --- a/kafka_actions/tests/test_message_deserializer.py +++ b/kafka_actions/tests/test_message_deserializer.py @@ -565,6 +565,113 @@ def test_deserialize_raw_no_coercion_for_json_like_base64(self): assert msg.value == '1234', "raw base64 '1234' must stay as string, not become int" assert isinstance(msg.value, str) + def test_skip_bytes_strips_prefix_before_json_decode(self): + """skip_bytes drops a producer-side prefix before deserialization.""" + log = MagicMock() + deserializer = MessageDeserializer(log) + + # 1-byte version flag prepended to a JSON payload — common shape for + # producers that route by version on the same topic. + prefixed = b'\x03' + b'{"order_id": "12345"}' + + result, schema_id = deserializer.deserialize_message(prefixed, 'json', skip_bytes=1) + + assert json.loads(result) == {'order_id': '12345'} + assert schema_id is None + + def test_skip_bytes_zero_is_default_behavior(self): + """skip_bytes=0 (the default) leaves the message untouched.""" + log = MagicMock() + deserializer = MessageDeserializer(log) + + message = b'{"order_id": "12345"}' + baseline, _ = deserializer.deserialize_message(message, 'json') + with_zero, _ = deserializer.deserialize_message(message, 'json', skip_bytes=0) + + assert baseline == with_zero + assert json.loads(with_zero) == {'order_id': '12345'} + + def test_skip_bytes_with_raw_format(self): + """skip_bytes is applied before raw base64 encoding.""" + log = MagicMock() + deserializer = MessageDeserializer(log) + + prefix = b'\xde\xad\xbe\xef' + payload = b'\x00\x01\x02' + result, schema_id = deserializer.deserialize_message(prefix + payload, 'raw', skip_bytes=4) + + expected = base64.b64encode(payload).decode('ascii') + assert json.loads(result) == expected + assert schema_id is None + + def test_skip_bytes_negative_returns_error(self): + """A negative skip_bytes is rejected 
with a clear error.""" + log = MagicMock() + deserializer = MessageDeserializer(log) + + result, schema_id = deserializer.deserialize_message(b'whatever', 'json', skip_bytes=-1) + + assert result.startswith('