feat(pipeline): widen conflicting columns to String instead of erroring in identity pipeline #7991

teochenglim wants to merge 1 commit into GreptimeTeam:main
Conversation
Code Review
This pull request introduces a coerce_on_conflict configuration for the Greptime identity pipeline, which resolves type mismatches by widening columns to String and retroactively converting existing values in the batch. Review feedback highlights a bug in the string conversion of jsonb data (stored in BinaryValue and JsonValue) and identifies redundant logic in the value resolution process for newly added columns.
ValueData::BinaryValue(v) => {
    ValueData::StringValue(String::from_utf8_lossy(&v).into_owned())
}
ValueData::JsonValue(v) => {
    ValueData::StringValue(String::from_utf8_lossy(&v).into_owned())
}
The BinaryValue and JsonValue variants in this pipeline are used to store JSON data encoded in the jsonb binary format (as seen in lines 775 and 786). Using String::from_utf8_lossy directly on these bytes will not produce a valid JSON string and will instead result in a string containing the raw binary representation (likely garbage characters). You should decode the jsonb data using the jsonb crate before converting it to a string.
Suggested change:

- ValueData::BinaryValue(v) => {
-     ValueData::StringValue(String::from_utf8_lossy(&v).into_owned())
- }
- ValueData::JsonValue(v) => {
-     ValueData::StringValue(String::from_utf8_lossy(&v).into_owned())
- }
+ ValueData::BinaryValue(v) | ValueData::JsonValue(v) => {
+     let s = jsonb::from_slice(&v)
+         .map(|jv| jv.to_string())
+         .unwrap_or_else(|_| String::from_utf8_lossy(&v).into_owned());
+     ValueData::StringValue(s)
+ }
let value_data = if let Some(idx) = index {
    if schema_info.coerced_to_string.contains(&idx) {
        value_data.map(value_data_to_string)
    } else {
        value_data
    }
} else {
    // Newly added column: check by name in case it was just inserted.
    let new_idx = schema_info.index.get(&column_name).copied();
    if new_idx.is_some_and(|i| schema_info.coerced_to_string.contains(&i)) {
        value_data.map(value_data_to_string)
    } else {
        value_data
    }
};
The else block for newly added columns is redundant. If index was None at the start of resolve_value, it means the column was not present in the schema for the current batch. While resolve_schema might add the column to the schema, it only inserts into coerced_to_string when a type mismatch is detected on an existing column (where index is Some). Therefore, a newly added column can never be in coerced_to_string during the same resolve_value call, making the extra lookup and check unnecessary.
let value_data = if let Some(idx) = index {
    if schema_info.coerced_to_string.contains(&idx) {
        value_data.map(value_data_to_string)
    } else {
        value_data
    }
} else {
    value_data
};

feat(pipeline): widen conflicting columns to String instead of erroring in identity pipeline

Signed-off-by: Teo Cheng Lim <teochenglim@gmail.com>
Addressing Gemini code review

Gemini Code Review — Response & Patches

Review Summary

Gemini flagged two issues, both in src/pipeline/src/etl/transform/transformer/greptime.rs:
Patch 1 — Fix jsonb binary → String conversion (High)

File: src/pipeline/src/etl/transform/transformer/greptime.rs

Root cause: BinaryValue and JsonValue carry jsonb-encoded bytes, so String::from_utf8_lossy yields the raw binary representation instead of JSON text. Decode with the jsonb crate first, falling back to the lossy conversion only if decoding fails.

- ValueData::BinaryValue(v) => {
- ValueData::StringValue(String::from_utf8_lossy(&v).into_owned())
- }
- ValueData::JsonValue(v) => {
- ValueData::StringValue(String::from_utf8_lossy(&v).into_owned())
- }
+ ValueData::BinaryValue(v) | ValueData::JsonValue(v) => {
+ let s = jsonb::from_slice(&v)
+ .map(|jv| jv.to_string())
+ .unwrap_or_else(|_| String::from_utf8_lossy(&v).into_owned());
+ ValueData::StringValue(s)
+ }

Patch 2 — Remove redundant else branch in resolve_value() (Medium)

File: src/pipeline/src/etl/transform/transformer/greptime.rs

Root cause: when index is None, the column was not yet in the schema for this batch, and resolve_schema() only inserts into coerced_to_string for existing columns, so the by-name re-check can never match.

- } else {
- // Newly added column: check by name in case it was just inserted.
- let new_idx = schema_info.index.get(&column_name).copied();
- if new_idx.is_some_and(|i| schema_info.coerced_to_string.contains(&i)) {
- value_data.map(value_data_to_string)
- } else {
- value_data
- }
- };
+ } else {
+ value_data
+ };
feat(pipeline): add coerce_on_conflict option to gracefully handle type conflicts in identity pipeline
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
When ingesting JSON logs via the greptime_identity pipeline, a type conflict on any field (e.g. user_id: 123 in one document, user_id: "unknown" in another) causes the entire batch to be rejected with IdentifyPipelineColumnTypeMismatch.

For semi-structured or externally-sourced logs with evolving schemas, this makes zero-config ingestion unusable without writing a full custom pipeline that explicitly enumerates every potentially-conflicting field.
This PR adds an opt-in coerce_on_conflict flag (default false) to GreptimePipelineParams, passed via the existing x-greptime-pipeline-params HTTP header, for example:
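A request might look like this (endpoint, database, and table names are illustrative; the param follows the same key=value form as existing x-greptime-pipeline-params options such as flatten_json_object):

curl -X POST "http://localhost:4000/v1/events/logs?db=public&table=app_logs&pipeline_name=greptime_identity" \
  -H "Content-Type: application/json" \
  -H "x-greptime-pipeline-params: coerce_on_conflict=true" \
  -d '[{"user_id": 123}, {"user_id": "unknown"}]'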
How it works:

- In resolve_schema(), when a type mismatch is detected and the flag is set, the conflicting column is widened to ConcreteDataType::String instead of returning an error. A WARN is emitted once per widened column.
- The index of each widened column is recorded in the SchemaInfo.coerced_to_string set.
- In resolve_value(), after constructing the ValueData, any value whose column is in coerced_to_string is stringified via a new value_data_to_string() helper (see the sketch of this helper after this list).
- In identity_pipeline_inner(), after all rows in the batch are processed, a fixup pass retroactively stringifies values that were written in earlier rows before the conflict was first detected.
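A minimal sketch of the stringify helper, assuming the greptime-proto ValueData enum (variant list abridged; the actual helper in this PR may differ):

use api::v1::value::ValueData;

// Sketch: convert a typed ValueData into its StringValue representation.
fn value_data_to_string(value_data: ValueData) -> ValueData {
    match value_data {
        s @ ValueData::StringValue(_) => s,
        ValueData::BoolValue(b) => ValueData::StringValue(b.to_string()),
        ValueData::I64Value(i) => ValueData::StringValue(i.to_string()),
        ValueData::U64Value(u) => ValueData::StringValue(u.to_string()),
        ValueData::F64Value(f) => ValueData::StringValue(f.to_string()),
        // jsonb-encoded payloads are decoded rather than lossy-converted
        // (see the review thread above).
        ValueData::BinaryValue(v) | ValueData::JsonValue(v) => {
            let s = jsonb::from_slice(&v)
                .map(|jv| jv.to_string())
                .unwrap_or_else(|_| String::from_utf8_lossy(&v).into_owned());
            ValueData::StringValue(s)
        }
        // Remaining variants elided in this sketch.
        other => other,
    }
}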
Backward compatibility:
The flag defaults to false. Existing pipelines are completely unaffected. No API or schema changes are made to the default ingestion path.
Limitations:
- Once a column is widened to String, the column stays String for all subsequent rows in that batch.
- Rows already written by earlier batches are not rewritten; they retain their original typed values. Schema widening across batches depends on the underlying table's ALTER TABLE behaviour.
PR Checklist
Please convert it to a draft if some of the following conditions are not met.
feat(pipeline): Add global coerce_on_conflict flag for identity pipeline type conflict resolution

Context
GreptimeDB's greptime_identity pipeline is a zero-config ingestion path that automatically flattens JSON and infers column schemas from the first document seen. It is the default pipeline for log ingestion and requires no user-defined schema.

The schema-merging logic lives in resolve_schema() (greptime.rs:426). When a field appears with a different type than what was previously recorded, the pipeline raises IdentifyPipelineColumnTypeMismatch and rejects the entire batch.

This strict behavior is intentional for structured, schema-stable data. However, it creates a brittle experience for semi-structured or evolving log sources where field types are not guaranteed to be consistent across documents.
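Conceptually, the strict merge amounts to something like this (a hypothetical sketch, not the actual resolve_schema() body; the error constructor is a placeholder):

// Hypothetical sketch of the strict schema merge described above.
match schema_info.index.get(column_name) {
    Some(&idx) if schema_info.schema[idx].datatype != incoming_type => {
        // Same column name, different type: the whole batch is rejected.
        return Err(identify_pipeline_column_type_mismatch(column_name));
    }
    Some(_) => { /* same type as before: nothing to do */ }
    None => { /* first sighting: record the inferred type */ }
}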
Problem
When ingesting JSON logs from external sources (e.g., application traces, third-party webhooks, or heterogeneous systems), the same field may appear with different types across documents:
{ "user_id": 123, "status": "ok" } { "user_id": "unknown", "status": "degraded" }The second document fails with:
This means the entire batch is rejected, and the only workaround is a custom pipeline that declares every potentially-conflicting field with on_failure: default — impractical for unknown or highly dynamic schemas.

The existing on_failure mechanism only handles value coercion failures (e.g., parsing "-" as an integer within a declared transform). It does not apply to schema-level type conflicts detected in resolve_schema().

How Elasticsearch Handles This
Elasticsearch faces the same problem — dynamic mapping locks a field's type on first write, and subsequent type conflicts reject the document. It exposes several mechanisms to work around this. Two concrete cases:
ES Case 1: ignore_malformed — accept the document, drop the conflicting field

Set at the index level, this tells Elasticsearch to skip indexing a field that doesn't match its mapped type, but still accept the rest of the document.
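For instance (index and field names are placeholders):

PUT logs
{
  "mappings": {
    "properties": {
      "user_id": { "type": "integer", "ignore_malformed": true }
    }
  }
}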
Behavior with the conflicting input above: document 2 is accepted, but user_id: "unknown" is silently excluded from the index. It still appears in _source (the raw JSON), but you cannot search or filter on it. It is invisible to queries.

Why this doesn't solve our problem: Data is lost for query purposes. WHERE user_id = 'unknown' returns nothing. For a time-series log database where every field should be queryable, this is unacceptable.

ES Case 2: dynamic_templates — pre-empt conflicts by pattern-matching field names
Dynamic templates let you override type detection before conflicts occur. You define rules keyed on field name patterns or detected JSON types, for example:
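An illustrative template (index, template, and pattern names are placeholders) that pins every field matching *_id to keyword before numeric detection can lock in a conflicting type:

PUT logs
{
  "mappings": {
    "dynamic_templates": [
      {
        "ids_as_keyword": {
          "match": "*_id",
          "mapping": { "type": "keyword" }
        }
      }
    ]
  }
}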
Behavior: any field matching the template is mapped according to the template's rule from the first write, so the type conflict never arises.

This works — but only if you know the field names (or name patterns) in advance. You must author the template with explicit match rules before ingestion begins.

Why this doesn't solve our problem: It requires anticipating which fields will conflict. For completely unknown schemas (third-party logs, forwarded events from external systems), you cannot enumerate fields you have never seen. This is the exact scenario we are trying to handle.
Options I Considered
Option 1: Per-field on_failure: default in a custom pipeline

Write a custom pipeline that declares each field with type: string and on_failure: default. This is GreptimeDB's existing escape hatch for type conflicts.

Why it falls short: Same limitation as ES dynamic templates — you must enumerate every field that might conflict. Completely unworkable for unknown or evolving schemas. You are essentially writing a schema for a system whose value is being schema-free.
Option 2: Pre-ingestion transformation
Stringify all values before they reach GreptimeDB — using Vector, Logstash, or a custom Rust adapter that converts every serde_json::Value to its string representation before sending to the write API (sketch below).

Why it falls short: Adds an external component to the ingestion path, increases operational complexity, and destroys type information permanently — even for fields that are consistently typed. You lose WHERE score > 90 forever just because score occasionally arrives as a string.
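What such an adapter boils down to (illustrative Rust, not part of this PR):

use serde_json::Value;

// Recursively replace every non-string scalar with its string form.
fn stringify_all(v: &mut Value) {
    match v {
        Value::Object(map) => map.values_mut().for_each(stringify_all),
        Value::Array(arr) => arr.iter_mut().for_each(stringify_all),
        Value::String(_) => {}
        other => {
            let s = other.to_string();
            *other = Value::String(s);
        }
    }
}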
Option 3: ignore_malformed-style — accept document, drop conflicting field
Why it falls short: For a time-series query database, silent data loss is worse than a visible error. A user querying
user_id = "unknown"would get zero results with no indication that data was dropped. ES accepts this trade-off because_sourcepreserves the raw JSON; GreptimeDB has no equivalent raw storage layer.Option 4: Global
coerce_on_conflict— this PR ✓Add a pipeline-level opt-in flag. When a type conflict is detected, widen the column to
Stringand coerce both the incoming value and all previously written values in the current batch to their string representations. The field remains stored and fully queryable. A warning is logged once per widened column. No document is rejected.This is the gap that neither Elasticsearch nor any comparable system fills natively:
(Comparison of ignore_malformed, _source, dynamic_templates, on_failure, and coerce_on_conflict — this PR.)

Solution
What this PR does
Adds coerce_on_conflict: bool to GreptimePipelineParams, passed via the existing x-greptime-pipeline-params HTTP header.

When coerce_on_conflict = false (default): behaviour is unchanged — a type conflict still rejects the batch with IdentifyPipelineColumnTypeMismatch.

When coerce_on_conflict = true:

- In resolve_schema(), when a type mismatch is detected:
  - The column is widened to ConcreteDataType::String.
  - Its index is recorded in SchemaInfo.coerced_to_string.
  - A WARN is emitted once per widened column: coerce_on_conflict: widening column 'user_id' from Int64 to String.
- In resolve_value(), after constructing the ValueData, any column tracked in coerced_to_string has its value stringified via value_data_to_string().
- In identity_pipeline_inner(), after all rows are processed, a fixup pass retroactively stringifies values written in earlier rows before the conflict was detected (sketch below).
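A sketch of that fixup pass, assuming the greptime-proto row types (api::v1::Row holding values: Vec<Value>); the real implementation may differ:

// After the batch loop: re-stringify earlier rows for every widened column.
for &col_idx in &schema_info.coerced_to_string {
    for row in rows.iter_mut() {
        if let Some(cell) = row.values.get_mut(col_idx) {
            if let Some(data) = cell.value_data.take() {
                cell.value_data = Some(value_data_to_string(data));
            }
        }
    }
}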
End-to-end example

Input batch (3 documents):
[ { "user_id": 123, "event": "login" }, { "user_id": "unknown", "event": "auth_failure" }, { "user_id": 456, "event": "logout" } ]Without this PR (default behaviour):
With coerce_on_conflict=true: all three rows land. user_id is queryable as a string column. WHERE user_id = 'unknown' returns row 2 as expected.

Code changes
src/pipeline/src/etl/transform/transformer/greptime.rs: coerce_on_conflict field + accessor on GreptimePipelineParams; coerced_to_string: HashSet<usize> on SchemaInfo; coercion logic in resolve_schema(); value_data_to_string() helper; post-construction coercion in resolve_value(); retroactive fixup pass in identity_pipeline_inner()
false)true)Because
coerce_on_conflictdefaults tofalse, this PR has zero impact on existing pipelines.