diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 2f5bdad50a20..242122bff5a2 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -80,6 +80,9 @@ pub struct GreptimePipelineParams { /// Max nested levels when flattening JSON object. Defaults to /// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided. pub max_nested_levels: OnceCell, + /// When true, type conflicts in identity pipeline are resolved by widening the column to String + /// instead of returning an error. Default false for backward compatibility. + pub coerce_on_conflict: OnceCell, } impl GreptimePipelineParams { @@ -93,6 +96,7 @@ impl GreptimePipelineParams { options, skip_error: OnceCell::new(), max_nested_levels: OnceCell::new(), + coerce_on_conflict: OnceCell::new(), } } @@ -101,6 +105,7 @@ impl GreptimePipelineParams { options, skip_error: OnceCell::new(), max_nested_levels: OnceCell::new(), + coerce_on_conflict: OnceCell::new(), } } @@ -138,6 +143,13 @@ impl GreptimePipelineParams { .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING) }) } + + /// When true, type conflicts are resolved by widening the column to String rather than erroring. + pub fn coerce_on_conflict(&self) -> bool { + *self + .coerce_on_conflict + .get_or_init(|| self.options.get("coerce_on_conflict").map(truthy).unwrap_or(false)) + } } impl GreptimeTransformer { @@ -348,6 +360,9 @@ pub struct SchemaInfo { pub index: HashMap, /// The pipeline's corresponding table (if already created). Useful to retrieve column schemas. table: Option>, + /// Column indices that were widened to String due to coerce_on_conflict. + /// Used to retroactively stringify values in earlier rows within the same batch. + pub coerced_to_string: std::collections::HashSet, } impl SchemaInfo { @@ -356,6 +371,7 @@ impl SchemaInfo { schema: Vec::with_capacity(capacity), index: HashMap::with_capacity(capacity), table: None, + coerced_to_string: std::collections::HashSet::new(), } } @@ -368,6 +384,7 @@ impl SchemaInfo { schema: schema_list.into_iter().map(Into::into).collect(), index, table: None, + coerced_to_string: std::collections::HashSet::new(), } } @@ -414,20 +431,34 @@ fn resolve_schema( schema_info: &mut SchemaInfo, ) -> Result<()> { if let Some(index) = index { - let column_type = &mut schema_info.schema[index].column_schema.data_type; - match (column_type, value_type) { - (column_type, value_type) if column_type == value_type => Ok(()), - (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type)) - if column_type.is_include(value_type) => - { - Ok(()) - } - (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu { - column, - expected: column_type.to_string(), - actual: value_type.to_string(), + // Clone to avoid holding &mut across the hashset mutation below. + let column_type = schema_info.schema[index].column_schema.data_type.clone(); + match (&column_type, value_type) { + (a, b) if a == b => Ok(()), + (ConcreteDataType::Json(a), ConcreteDataType::Json(b)) if a.is_include(b) => Ok(()), + (column_type, value_type) => { + if pipeline_context.pipeline_param.coerce_on_conflict() { + if !schema_info.coerced_to_string.contains(&index) { + warn!( + "coerce_on_conflict: widening column '{}' from {} to String (got {})", + column, + column_type, + value_type + ); + schema_info.coerced_to_string.insert(index); + } + schema_info.schema[index].column_schema.data_type = + ConcreteDataType::string_datatype(); + Ok(()) + } else { + IdentifyPipelineColumnTypeMismatchSnafu { + column, + expected: column_type.to_string(), + actual: value_type.to_string(), + } + .fail() + } } - .fail(), } } else { let column_schema = schema_info @@ -591,6 +622,29 @@ pub(crate) fn values_to_row( Ok(Row { values: row }) } +/// Converts any ValueData variant to a StringValue using its natural string representation. +/// Used when a column is retroactively or prospectively widened to String via coerce_on_conflict. +fn value_data_to_string(value: ValueData) -> ValueData { + match value { + ValueData::StringValue(_) => value, + ValueData::I64Value(v) => ValueData::StringValue(v.to_string()), + ValueData::U64Value(v) => ValueData::StringValue(v.to_string()), + ValueData::F64Value(v) => ValueData::StringValue(v.to_string()), + ValueData::BoolValue(v) => ValueData::StringValue(v.to_string()), + ValueData::TimestampNanosecondValue(v) => ValueData::StringValue(v.to_string()), + ValueData::TimestampMillisecondValue(v) => ValueData::StringValue(v.to_string()), + ValueData::TimestampSecondValue(v) => ValueData::StringValue(v.to_string()), + ValueData::TimestampMicrosecondValue(v) => ValueData::StringValue(v.to_string()), + ValueData::BinaryValue(v) | ValueData::JsonValue(v) => { + let s = jsonb::from_slice(&v) + .map(|jv| jv.to_string()) + .unwrap_or_else(|_| String::from_utf8_lossy(&v).into_owned()); + ValueData::StringValue(s) + } + other => ValueData::StringValue(format!("{other:?}")), + } +} + fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType { if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() { SemanticType::Tag @@ -735,6 +789,19 @@ fn resolve_value( } }; + // If this column was widened to String by coerce_on_conflict, stringify the value now. + // Newly added columns (index == None) can never be in coerced_to_string because coercion + // only triggers on existing columns where a mismatch is detected. + let value_data = if let Some(idx) = index { + if schema_info.coerced_to_string.contains(&idx) { + value_data.map(value_data_to_string) + } else { + value_data + } + } else { + value_data + }; + let value = GreptimeValue { value_data }; if let Some(index) = index { row[index] = value; @@ -829,6 +896,16 @@ fn identity_pipeline_inner( for _ in 0..diff { row.values.push(GreptimeValue { value_data: None }); } + + // Retroactively stringify values in rows that were written before a conflict was + // detected. Only non-null values need conversion; nulls stay null. + for &col_idx in &schema_info.coerced_to_string { + if let Some(cell) = row.values.get_mut(col_idx) { + if let Some(vd) = cell.value_data.take() { + cell.value_data = Some(value_data_to_string(vd)); + } + } + } } } @@ -1359,4 +1436,75 @@ mod tests { assert_eq!(ttl_1h_count, 2); // Alice and Bob with 1h TTL assert_eq!(ttl_24h_count, 1); // Charlie with 24h TTL } + + /// Verify that coerce_on_conflict=true widens a conflicting column to String and stringifies + /// both the conflicting value and all previously written values for that column in the batch. + #[test] + fn test_coerce_on_conflict() { + let params = GreptimePipelineParams::from_params(Some("coerce_on_conflict=true")); + let pipeline_ctx = PipelineContext::new( + &PipelineDefinition::GreptimeIdentityPipeline(None), + ¶ms, + Channel::Unknown, + ); + + // Row 1: user_id is an integer. Row 2: user_id is a string — conflict. + let array = vec![ + serde_json::json!({"user_id": 123, "event": "login"}), + serde_json::json!({"user_id": "unknown", "event": "auth_failure"}), + serde_json::json!({"user_id": 456, "event": "logout"}), + ]; + let array: Vec = array.iter().map(|v| v.into()).collect(); + let result = identity_pipeline(array, None, &pipeline_ctx); + assert!(result.is_ok(), "expected Ok but got: {:?}", result.err()); + + let mut rows_map = result.unwrap(); + assert_eq!(rows_map.len(), 1); + let rows = rows_map.drain().next().unwrap().1; + + // All three rows must survive. + assert_eq!(rows.rows.len(), 3); + + // Find the user_id column index from the schema. + let user_id_col = rows + .schema + .iter() + .position(|s| s.column_name == "user_id") + .expect("user_id column must be present"); + + // All three user_id values must be stored as strings. + let user_id_values: Vec<&str> = rows + .rows + .iter() + .map(|row| match row.values[user_id_col].value_data.as_ref() { + Some(ValueData::StringValue(s)) => s.as_str(), + other => panic!("expected StringValue, got {:?}", other), + }) + .collect(); + + assert_eq!(user_id_values, vec!["123", "unknown", "456"]); + } + + /// Without coerce_on_conflict, a type conflict must still return an error. + #[test] + fn test_no_coerce_on_conflict_still_errors() { + let params = GreptimePipelineParams::default(); + let pipeline_ctx = PipelineContext::new( + &PipelineDefinition::GreptimeIdentityPipeline(None), + ¶ms, + Channel::Unknown, + ); + + let array = vec![ + serde_json::json!({"user_id": 123}), + serde_json::json!({"user_id": "unknown"}), + ]; + let array: Vec = array.iter().map(|v| v.into()).collect(); + let result = identity_pipeline(array, None, &pipeline_ctx); + assert!(result.is_err()); + assert!( + result.unwrap_err().to_string().contains("Column datatype mismatch"), + "expected mismatch error" + ); + } }