Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 161 additions & 13 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@
/// Max nested levels when flattening JSON object. Defaults to
/// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided.
pub max_nested_levels: OnceCell<usize>,
/// When true, type conflicts in identity pipeline are resolved by widening the column to String
/// instead of returning an error. Default false for backward compatibility.
pub coerce_on_conflict: OnceCell<bool>,
}

impl GreptimePipelineParams {
Expand All @@ -93,6 +96,7 @@
options,
skip_error: OnceCell::new(),
max_nested_levels: OnceCell::new(),
coerce_on_conflict: OnceCell::new(),
}
}

Expand All @@ -101,6 +105,7 @@
options,
skip_error: OnceCell::new(),
max_nested_levels: OnceCell::new(),
coerce_on_conflict: OnceCell::new(),
}
}

Expand Down Expand Up @@ -138,6 +143,13 @@
.unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
})
}

Check warning on line 146 in src/pipeline/src/etl/transform/transformer/greptime.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/greptimedb/greptimedb/src/pipeline/src/etl/transform/transformer/greptime.rs
/// When true, type conflicts are resolved by widening the column to String rather than erroring.
pub fn coerce_on_conflict(&self) -> bool {
*self
.coerce_on_conflict
.get_or_init(|| self.options.get("coerce_on_conflict").map(truthy).unwrap_or(false))
}
}

impl GreptimeTransformer {
Expand Down Expand Up @@ -348,6 +360,9 @@
pub index: HashMap<String, usize>,
/// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
table: Option<Arc<Table>>,
/// Column indices that were widened to String due to coerce_on_conflict.
/// Used to retroactively stringify values in earlier rows within the same batch.
pub coerced_to_string: std::collections::HashSet<usize>,
}

impl SchemaInfo {
Expand All @@ -356,6 +371,7 @@
schema: Vec::with_capacity(capacity),
index: HashMap::with_capacity(capacity),
table: None,
coerced_to_string: std::collections::HashSet::new(),
}
}

Expand All @@ -368,6 +384,7 @@
schema: schema_list.into_iter().map(Into::into).collect(),
index,
table: None,
coerced_to_string: std::collections::HashSet::new(),
}
}

Expand Down Expand Up @@ -414,20 +431,34 @@
schema_info: &mut SchemaInfo,
) -> Result<()> {
if let Some(index) = index {
let column_type = &mut schema_info.schema[index].column_schema.data_type;
match (column_type, value_type) {
(column_type, value_type) if column_type == value_type => Ok(()),
(ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
if column_type.is_include(value_type) =>
{
Ok(())
}
(column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
column,
expected: column_type.to_string(),
actual: value_type.to_string(),
// Clone to avoid holding &mut across the hashset mutation below.
let column_type = schema_info.schema[index].column_schema.data_type.clone();
match (&column_type, value_type) {
(a, b) if a == b => Ok(()),
(ConcreteDataType::Json(a), ConcreteDataType::Json(b)) if a.is_include(b) => Ok(()),
(column_type, value_type) => {
if pipeline_context.pipeline_param.coerce_on_conflict() {
if !schema_info.coerced_to_string.contains(&index) {

Check warning on line 441 in src/pipeline/src/etl/transform/transformer/greptime.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/greptimedb/greptimedb/src/pipeline/src/etl/transform/transformer/greptime.rs
warn!(
"coerce_on_conflict: widening column '{}' from {} to String (got {})",
column,
column_type,
value_type
);
schema_info.coerced_to_string.insert(index);
}
schema_info.schema[index].column_schema.data_type =
ConcreteDataType::string_datatype();
Ok(())
} else {
IdentifyPipelineColumnTypeMismatchSnafu {
column,
expected: column_type.to_string(),
actual: value_type.to_string(),
}
.fail()
}
}
.fail(),
}
} else {
let column_schema = schema_info
Expand Down Expand Up @@ -591,6 +622,29 @@
Ok(Row { values: row })
}

/// Converts any ValueData variant to a StringValue using its natural string representation.
/// Used when a column is retroactively or prospectively widened to String via coerce_on_conflict.
fn value_data_to_string(value: ValueData) -> ValueData {
match value {
ValueData::StringValue(_) => value,
ValueData::I64Value(v) => ValueData::StringValue(v.to_string()),
ValueData::U64Value(v) => ValueData::StringValue(v.to_string()),
ValueData::F64Value(v) => ValueData::StringValue(v.to_string()),
ValueData::BoolValue(v) => ValueData::StringValue(v.to_string()),
ValueData::TimestampNanosecondValue(v) => ValueData::StringValue(v.to_string()),
ValueData::TimestampMillisecondValue(v) => ValueData::StringValue(v.to_string()),
ValueData::TimestampSecondValue(v) => ValueData::StringValue(v.to_string()),
ValueData::TimestampMicrosecondValue(v) => ValueData::StringValue(v.to_string()),
ValueData::BinaryValue(v) | ValueData::JsonValue(v) => {

Check failure on line 638 in src/pipeline/src/etl/transform/transformer/greptime.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest)

mismatched types

Check failure on line 638 in src/pipeline/src/etl/transform/transformer/greptime.rs

View workflow job for this annotation

GitHub Actions / Clippy

mismatched types

Check failure on line 638 in src/pipeline/src/etl/transform/transformer/greptime.rs

View workflow job for this annotation

GitHub Actions / Check Unused Dependencies

mismatched types
let s = jsonb::from_slice(&v)
.map(|jv| jv.to_string())
.unwrap_or_else(|_| String::from_utf8_lossy(&v).into_owned());
ValueData::StringValue(s)
}
other => ValueData::StringValue(format!("{other:?}")),
}
}

fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
SemanticType::Tag
Expand Down Expand Up @@ -735,6 +789,19 @@
}
};

// If this column was widened to String by coerce_on_conflict, stringify the value now.
// Newly added columns (index == None) can never be in coerced_to_string because coercion
// only triggers on existing columns where a mismatch is detected.
let value_data = if let Some(idx) = index {
if schema_info.coerced_to_string.contains(&idx) {
value_data.map(value_data_to_string)
} else {
value_data
}
} else {
value_data
};
Comment on lines +795 to +803
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The else block for newly added columns is redundant. If index was None at the start of resolve_value, it means the column was not present in the schema for the current batch. While resolve_schema might add the column to the schema, it only inserts into coerced_to_string when a type mismatch is detected on an existing column (where index is Some). Therefore, a newly added column can never be in coerced_to_string during the same resolve_value call, making the extra lookup and check unnecessary.

    let value_data = if let Some(idx) = index {
        if schema_info.coerced_to_string.contains(&idx) {
            value_data.map(value_data_to_string)
        } else {
            value_data
        }
    } else {
        value_data
    };


let value = GreptimeValue { value_data };
if let Some(index) = index {
row[index] = value;
Expand Down Expand Up @@ -829,6 +896,16 @@
for _ in 0..diff {
row.values.push(GreptimeValue { value_data: None });
}

// Retroactively stringify values in rows that were written before a conflict was
// detected. Only non-null values need conversion; nulls stay null.
for &col_idx in &schema_info.coerced_to_string {
if let Some(cell) = row.values.get_mut(col_idx) {
if let Some(vd) = cell.value_data.take() {
cell.value_data = Some(value_data_to_string(vd));
}
}
}
}
}

Expand Down Expand Up @@ -1359,4 +1436,75 @@
assert_eq!(ttl_1h_count, 2); // Alice and Bob with 1h TTL
assert_eq!(ttl_24h_count, 1); // Charlie with 24h TTL
}

/// Verify that coerce_on_conflict=true widens a conflicting column to String and stringifies
/// both the conflicting value and all previously written values for that column in the batch.
#[test]
fn test_coerce_on_conflict() {
    let params = GreptimePipelineParams::from_params(Some("coerce_on_conflict=true"));
    let pipeline_ctx = PipelineContext::new(
        &PipelineDefinition::GreptimeIdentityPipeline(None),
        &params,
        Channel::Unknown,
    );

    // The second record's user_id is a string while the first is an integer — a type conflict.
    let array = vec![
        serde_json::json!({"user_id": 123, "event": "login"}),
        serde_json::json!({"user_id": "unknown", "event": "auth_failure"}),
        serde_json::json!({"user_id": 456, "event": "logout"}),
    ];
    let array: Vec<VrlValue> = array.iter().map(|v| v.into()).collect();

    let result = identity_pipeline(array, None, &pipeline_ctx);
    assert!(result.is_ok(), "expected Ok but got: {:?}", result.err());

    let mut rows_map = result.unwrap();
    assert_eq!(rows_map.len(), 1);
    let rows = rows_map.drain().next().unwrap().1;

    // No row may be dropped by the conflict.
    assert_eq!(rows.rows.len(), 3);

    // Locate the user_id column in the emitted schema.
    let user_id_col = rows
        .schema
        .iter()
        .position(|s| s.column_name == "user_id")
        .expect("user_id column must be present");

    // Every user_id cell — including the ones written before the conflict was
    // detected — must have been stringified.
    let expected = ["123", "unknown", "456"];
    for (row, want) in rows.rows.iter().zip(expected) {
        match row.values[user_id_col].value_data.as_ref() {
            Some(ValueData::StringValue(s)) => assert_eq!(s.as_str(), want),
            other => panic!("expected StringValue, got {:?}", other),
        }
    }
}

/// Without coerce_on_conflict, a type conflict must still return an error.
#[test]
fn test_no_coerce_on_conflict_still_errors() {
    let params = GreptimePipelineParams::default();
    let pipeline_ctx = PipelineContext::new(
        &PipelineDefinition::GreptimeIdentityPipeline(None),
        &params,
        Channel::Unknown,
    );

    // Integer then string for the same column — the conflict that coercion would resolve.
    let array = vec![
        serde_json::json!({"user_id": 123}),
        serde_json::json!({"user_id": "unknown"}),
    ];
    let array: Vec<VrlValue> = array.iter().map(|v| v.into()).collect();
    let result = identity_pipeline(array, None, &pipeline_ctx);

    assert!(result.is_err());
    // Broken across lines per rustfmt (CI flagged the single-line chain).
    assert!(
        result
            .unwrap_err()
            .to_string()
            .contains("Column datatype mismatch"),
        "expected mismatch error"
    );
}
}
Loading