diff --git a/src/index/src/target.rs b/src/index/src/target.rs index cca0a2819232..1f2d45276b22 100644 --- a/src/index/src/target.rs +++ b/src/index/src/target.rs @@ -14,24 +14,94 @@ use std::any::Any; use std::fmt::{self, Display}; +use std::str::FromStr; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use serde::{Deserialize, Serialize}; -use snafu::{Snafu, ensure}; +use snafu::{OptionExt, Snafu, ensure}; use store_api::storage::ColumnId; /// Describes an index target. Column ids are the only supported variant for now. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum IndexTarget { ColumnId(ColumnId), + SubField { + column_id: ColumnId, + path: Vec<String>, + value_type: IndexValueType, + }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum IndexValueType { + #[serde(rename = "string")] + String, + #[serde(rename = "i64")] + I64, + #[serde(rename = "u64")] + U64, + #[serde(rename = "f64")] + F64, + #[serde(rename = "bool")] + Bool, + #[serde(rename = "binary")] + Binary, + #[serde(rename = "ts_ms")] + TimestampMs, + #[serde(rename = "date32")] + Date32, +} + +impl Display for IndexValueType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + IndexValueType::String => "string", + IndexValueType::I64 => "i64", + IndexValueType::U64 => "u64", + IndexValueType::F64 => "f64", + IndexValueType::Bool => "bool", + IndexValueType::Binary => "binary", + IndexValueType::TimestampMs => "ts_ms", + IndexValueType::Date32 => "date32", + }; + write!(f, "{s}") + } +} + +impl FromStr for IndexValueType { + type Err = TargetKeyError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "string" => Ok(IndexValueType::String), + "i64" => Ok(IndexValueType::I64), + "u64" => Ok(IndexValueType::U64), + "f64" => Ok(IndexValueType::F64), + "bool" => Ok(IndexValueType::Bool), + "binary" => Ok(IndexValueType::Binary), + "ts_ms" => 
Ok(IndexValueType::TimestampMs), + "date32" => Ok(IndexValueType::Date32), + _ => InvalidIndexValueTypeSnafu { + value_type: s.to_string(), + } + .fail(), + } + } } impl Display for IndexTarget { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { IndexTarget::ColumnId(id) => write!(f, "{}", id), + IndexTarget::SubField { + column_id, + path, + value_type, + } => { + write!(f, "sub:{}:{}:{}", column_id, value_type, encode_path(path)) + } } } } @@ -39,6 +109,10 @@ impl Display for IndexTarget { impl IndexTarget { /// Parse a target key string back into an index target description. pub fn decode(key: &str) -> Result<Self, TargetKeyError> { + if let Some(rest) = key.strip_prefix("sub:") { + return decode_subfield_target(rest); + } + validate_column_key(key)?; let id = key .parse::<ColumnId>() @@ -59,6 +133,15 @@ pub enum TargetKeyError { #[snafu(display("failed to parse column id from '{value}'"))] InvalidColumnId { value: String }, + + #[snafu(display("invalid subfield target key: {key}"))] + InvalidSubfieldTargetKey { key: String }, + + #[snafu(display("invalid index value type: {value_type}"))] + InvalidIndexValueType { value_type: String }, + + #[snafu(display("invalid subfield path segment: {segment}"))] + InvalidSubfieldPathSegment { segment: String }, } impl ErrorExt for TargetKeyError { @@ -80,6 +163,85 @@ fn validate_column_key(key: &str) -> Result<(), TargetKeyError> { Ok(()) } +fn decode_subfield_target(rest: &str) -> Result<IndexTarget, TargetKeyError> { + let mut parts = rest.splitn(3, ':'); + let col = parts + .next() + .context(InvalidSubfieldTargetKeySnafu { key: rest })?; + let value_type = parts + .next() + .context(InvalidSubfieldTargetKeySnafu { key: rest })?; + let encoded_path = parts + .next() + .context(InvalidSubfieldTargetKeySnafu { key: rest })?; + + validate_column_key(col)?; + let column_id = col + .parse::<ColumnId>() + .map_err(|_| InvalidColumnIdSnafu { value: col }.build())?; + let value_type = IndexValueType::from_str(value_type)?; + let path = decode_path(encoded_path)?; + 
Ok(IndexTarget::SubField { + column_id, + path, + value_type, + }) +} + +fn encode_path(path: &[String]) -> String { + path.iter() + .map(|seg| seg.replace('\\', "\\\\").replace('.', "\\.")) + .collect::<Vec<_>>() + .join(".") +} + +fn decode_path(encoded: &str) -> Result<Vec<String>, TargetKeyError> { + if encoded.is_empty() { + return Ok(Vec::new()); + } + + let mut parts = Vec::new(); + let mut current = String::new(); + let mut escaped = false; + for ch in encoded.chars() { + if escaped { + current.push(ch); + escaped = false; + continue; + } + if ch == '\\' { + escaped = true; + continue; + } + if ch == '.' { + if current.is_empty() { + return InvalidSubfieldPathSegmentSnafu { + segment: encoded.to_string(), + } + .fail(); + } + parts.push(std::mem::take(&mut current)); + continue; + } + current.push(ch); + } + if escaped { + return InvalidSubfieldPathSegmentSnafu { + segment: encoded.to_string(), + } + .fail(); + } + if current.is_empty() { + return InvalidSubfieldPathSegmentSnafu { + segment: encoded.to_string(), + } + .fail(); + } + parts.push(current); + Ok(parts) +} + #[cfg(test)] mod tests { use super::*; @@ -104,4 +266,17 @@ mod tests { let err = IndexTarget::decode("1a2").unwrap_err(); assert!(matches!(err, TargetKeyError::InvalidCharacters { .. 
})); } + + #[test] + fn encode_decode_subfield() { + let target = IndexTarget::SubField { + column_id: 42, + path: vec!["a".to_string(), "b.c".to_string()], + value_type: IndexValueType::String, + }; + let key = format!("{}", target); + assert_eq!(key, "sub:42:string:a.b\\.c"); + let decoded = IndexTarget::decode(&key).unwrap(); + assert_eq!(decoded, target); + } } diff --git a/src/mito2/src/engine/puffin_index.rs b/src/mito2/src/engine/puffin_index.rs index 281b619bc519..50d1a60f53e0 100644 --- a/src/mito2/src/engine/puffin_index.rs +++ b/src/mito2/src/engine/puffin_index.rs @@ -370,6 +370,14 @@ fn decode_target_info(target_key: &str) -> (String, String) { TARGET_TYPE_COLUMN.to_string(), json!({ "column": id }).to_string(), ), + Ok(IndexTarget::SubField { + column_id, + path, + value_type, + }) => ( + "subfield".to_string(), + json!({ "column": column_id, "path": path, "type": value_type }).to_string(), + ), _ => ( TARGET_TYPE_UNKNOWN.to_string(), json!({ "error": "failed_to_decode" }).to_string(), @@ -380,6 +388,7 @@ fn decode_column_id(target_key: &str) -> Option<ColumnId> { match IndexTarget::decode(target_key) { Ok(IndexTarget::ColumnId(id)) => Some(id), + Ok(IndexTarget::SubField { column_id, .. }) => Some(column_id), _ => None, } } diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index fcf68a921664..6f7f033bcea8 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -23,6 +23,7 @@ use common_base::readable_size::ReadableSize; use common_stat::get_total_memory_readable; use common_time::TimeToLive; use common_wal::options::{WAL_OPTIONS_KEY, WalOptions}; +use index::target::IndexValueType; use serde::de::Error as _; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; @@ -317,6 +318,13 @@ pub struct InvertedIndexOptions { /// The number of rows in a segment. 
#[serde_as(as = "DisplayFromStr")] pub segment_row_count: usize, + + /// Nested sub-field index targets encoded as JSON string. + /// Example: + /// `[{"column_id":1,"path":["a","b"],"value_type":"string"}]` + #[serde(deserialize_with = "deserialize_sub_fields")] + #[serde(serialize_with = "serialize_sub_fields")] + pub sub_fields: Vec<InvertedSubfieldIndexOption>, } impl Default for InvertedIndexOptions { @@ -324,10 +332,18 @@ impl Default for InvertedIndexOptions { Self { ignore_column_ids: Vec::new(), segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT, + sub_fields: Vec::new(), } } } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct InvertedSubfieldIndexOption { + pub column_id: ColumnId, + pub path: Vec<String>, + pub value_type: IndexValueType, +} + /// Options for region level memtable. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "memtable.type", rename_all = "snake_case")] @@ -415,6 +431,30 @@ where serializer.serialize_str(&s) } +fn deserialize_sub_fields<'de, D>( + deserializer: D, +) -> Result<Vec<InvertedSubfieldIndexOption>, D::Error> +where + D: Deserializer<'de>, +{ + let s: String = Deserialize::deserialize(deserializer)?; + if s.is_empty() { + return Ok(Vec::new()); + } + serde_json::from_str(&s).map_err(D::Error::custom) +} + +fn serialize_sub_fields<S>( + sub_fields: &[InvertedSubfieldIndexOption], + serializer: S, +) -> Result<S::Ok, S::Error> +where + S: serde::Serializer, +{ + let s = serde_json::to_string(sub_fields).map_err(serde::ser::Error::custom)?; + serializer.serialize_str(&s) +} + /// Converts the `options` map to a json object. /// /// Replaces "null" strings by `null` json values. 
@@ -560,6 +600,7 @@ mod tests { inverted_index: InvertedIndexOptions { ignore_column_ids: vec![1, 2, 3], segment_row_count: 512, + sub_fields: vec![], }, }, ..Default::default() @@ -567,6 +608,21 @@ mod tests { assert_eq!(expect, options); } + #[test] + fn test_with_inverted_index_sub_fields() { + let sub_fields = r#"[{"column_id":1,"path":["a","b"],"value_type":"string"}]"#; + let map = make_map(&[("index.inverted_index.sub_fields", sub_fields)]); + let options = RegionOptions::try_from(&map).unwrap(); + assert_eq!( + vec![InvertedSubfieldIndexOption { + column_id: 1, + path: vec!["a".to_string(), "b".to_string()], + value_type: IndexValueType::String, + }], + options.index_options.inverted_index.sub_fields + ); + } + // No need to add compatible tests for RegionOptions since the above tests already check for compatibility. #[test] fn test_with_any_wal_options() { @@ -667,6 +723,7 @@ mod tests { inverted_index: InvertedIndexOptions { ignore_column_ids: vec![1, 2, 3], segment_row_count: 512, + sub_fields: vec![], }, }, memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions { @@ -702,6 +759,7 @@ mod tests { inverted_index: InvertedIndexOptions { ignore_column_ids: vec![1, 2, 3], segment_row_count: 512, + sub_fields: vec![], }, }, memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions { @@ -767,6 +825,7 @@ mod tests { inverted_index: InvertedIndexOptions { ignore_column_ids: vec![], segment_row_count: 512, + sub_fields: vec![], }, }, memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions { diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 31a96eecea11..c76c7a999aa1 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -32,6 +32,7 @@ use bloom_filter::creator::BloomFilterIndexer; use common_telemetry::{debug, error, info, warn}; use datatypes::arrow::array::BinaryArray; use datatypes::arrow::record_batch::RecordBatch; +use index::target::IndexTarget; use 
mito_codec::index::IndexValuesCodec; use mito_codec::row_converter::CompositeValues; use object_store::ObjectStore; @@ -383,9 +384,34 @@ impl IndexerBuilderImpl { let indexed_column_ids = self.metadata.inverted_indexed_column_ids( self.index_options.inverted_index.ignore_column_ids.iter(), ); - if indexed_column_ids.is_empty() { + let ignored = self + .index_options + .inverted_index + .ignore_column_ids + .iter() + .copied() + .collect::<HashSet<_>>(); + let subfield_targets = self + .index_options + .inverted_index + .sub_fields + .iter() + .filter(|sub| !ignored.contains(&sub.column_id)) + .map(|sub| IndexTarget::SubField { + column_id: sub.column_id, + path: sub.path.clone(), + value_type: sub.value_type, + }) + .collect::<Vec<_>>(); + let mut indexed_targets = indexed_column_ids + .iter() + .copied() + .map(IndexTarget::ColumnId) + .collect::<Vec<_>>(); + indexed_targets.extend(subfield_targets.iter().cloned()); + if indexed_targets.is_empty() { debug!( - "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}", + "No targets to be indexed, skip creating inverted index, region_id: {}, file_id: {}", self.metadata.region_id, file_id, ); return None; @@ -414,14 +440,25 @@ impl IndexerBuilderImpl { segment_row_count = row_group_size; } - let indexer = InvertedIndexer::new( - file_id, - &self.metadata, - self.intermediate_manager.clone(), - self.inverted_index_config.mem_threshold_on_create(), - segment_row_count, - indexed_column_ids, - ); + let indexer = if subfield_targets.is_empty() { + InvertedIndexer::new( + file_id, + &self.metadata, + self.intermediate_manager.clone(), + self.inverted_index_config.mem_threshold_on_create(), + segment_row_count, + indexed_column_ids, + ) + } else { + InvertedIndexer::new_with_targets( + file_id, + &self.metadata, + self.intermediate_manager.clone(), + self.inverted_index_config.mem_threshold_on_create(), + segment_row_count, + indexed_targets, + ) + }; Some(indexer) } diff --git 
a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 386ee11b9b29..15da770ce8b5 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -19,13 +19,16 @@ use std::sync::atomic::AtomicUsize; use api::v1::SemanticType; use common_telemetry::{debug, warn}; +use common_time::{Date, Timestamp}; use datatypes::arrow::record_batch::RecordBatch; +use datatypes::data_type::ConcreteDataType; +use datatypes::value::{Value, ValueRef}; use datatypes::vectors::Helper; use index::inverted_index::create::InvertedIndexCreator; use index::inverted_index::create::sort::external_sort::ExternalSorter; use index::inverted_index::create::sort_create::SortIndexCreator; use index::inverted_index::format::writer::InvertedIndexBlobWriter; -use index::target::IndexTarget; +use index::target::{IndexTarget, IndexValueType}; use mito_codec::index::{IndexValueCodec, IndexValuesCodec}; use mito_codec::row_converter::{CompositeValues, SortField}; use puffin::puffin_manager::{PuffinWriter, PutOptions}; @@ -75,13 +78,19 @@ pub struct InvertedIndexer { /// The memory usage of the index creator. memory_usage: Arc<AtomicUsize>, - /// Ids of indexed columns and their encoded target keys. - indexed_column_ids: Vec<(ColumnId, String)>, + /// Indexed targets and their encoded target keys. + indexed_targets: Vec<IndexedTarget>, /// Region metadata for column lookups. metadata: RegionMetadataRef, } +#[derive(Clone)] +struct IndexedTarget { + target: IndexTarget, + target_key: String, +} + impl InvertedIndexer { /// Creates a new `InvertedIndexer`. /// Should ensure that the number of tag columns is greater than 0. 
@@ -92,6 +101,28 @@ impl InvertedIndexer { memory_usage_threshold: Option<usize>, segment_row_count: NonZeroUsize, indexed_column_ids: HashSet<ColumnId>, + ) -> Self { + let indexed_targets = indexed_column_ids + .into_iter() + .map(IndexTarget::ColumnId) + .collect::<Vec<_>>(); + Self::new_with_targets( + sst_file_id, + metadata, + intermediate_manager, + memory_usage_threshold, + segment_row_count, + indexed_targets, + ) + } + + pub fn new_with_targets( + sst_file_id: FileId, + metadata: &RegionMetadataRef, + intermediate_manager: IntermediateManager, + memory_usage_threshold: Option<usize>, + segment_row_count: NonZeroUsize, + indexed_targets: Vec<IndexTarget>, + ) -> Self { let temp_file_provider = Arc::new(TempFileProvider::new( IntermediateLocation::new(&metadata.region_id, &sst_file_id), @@ -112,11 +143,11 @@ impl InvertedIndexer { metadata.primary_key_encoding, metadata.primary_key_columns(), ); - let indexed_column_ids = indexed_column_ids + let indexed_targets = indexed_targets .into_iter() - .map(|col_id| { - let target_key = format!("{}", IndexTarget::ColumnId(col_id)); - (col_id, target_key) + .map(|target| { + let target_key = format!("{}", target); + IndexedTarget { target, target_key } }) .collect(); Self { @@ -127,7 +158,7 @@ impl InvertedIndexer { stats: Statistics::new(TYPE_INVERTED_INDEX), aborted: false, memory_usage, - indexed_column_ids, + indexed_targets, metadata: metadata.clone(), } } @@ -175,8 +206,12 @@ impl InvertedIndexer { let is_sparse = self.metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse; let mut decoded_pks: Option<Vec<(CompositeValues, usize)>> = None; - for (col_id, target_key) in &self.indexed_column_ids { - let Some(column_meta) = self.metadata.column_by_id(*col_id) else { + for indexed_target in &self.indexed_targets { + let (col_id, target_key) = match &indexed_target.target { + IndexTarget::ColumnId(col_id) => (*col_id, &indexed_target.target_key), + IndexTarget::SubField { column_id, .. 
} => (*column_id, &indexed_target.target_key), + }; + let Some(column_meta) = self.metadata.column_by_id(col_id) else { debug!( "Column {} not found in the metadata during building inverted index", col_id @@ -188,28 +223,63 @@ impl InvertedIndexer { // Convert Arrow array to VectorRef using Helper let vector = Helper::try_into_vector(column_array.clone()) .context(crate::error::ConvertVectorSnafu)?; + let target_sort_field = match &indexed_target.target { + IndexTarget::ColumnId(_) => None, + IndexTarget::SubField { value_type, .. } => { + Some(SortField::new(index_value_type_to_datatype(*value_type))) + } + }; let sort_field = SortField::new(vector.data_type()); for row in 0..batch.num_rows() { self.value_buf.clear(); let value_ref = vector.get_ref(row); - - if value_ref.is_null() { - self.index_creator - .push_with_name(target_key, None) - .await - .context(PushIndexValueSnafu)?; - } else { - IndexValueCodec::encode_nonnull_value( - value_ref, - &sort_field, - &mut self.value_buf, - ) - .context(EncodeSnafu)?; - self.index_creator - .push_with_name(target_key, Some(&self.value_buf)) - .await - .context(PushIndexValueSnafu)?; + match &indexed_target.target { + IndexTarget::ColumnId(_) => { + if value_ref.is_null() { + self.index_creator + .push_with_name(target_key, None) + .await + .context(PushIndexValueSnafu)?; + } else { + IndexValueCodec::encode_nonnull_value( + value_ref, + &sort_field, + &mut self.value_buf, + ) + .context(EncodeSnafu)?; + self.index_creator + .push_with_name(target_key, Some(&self.value_buf)) + .await + .context(PushIndexValueSnafu)?; + } + } + IndexTarget::SubField { + path, value_type, .. 
+ } => { + let field = target_sort_field + .as_ref() + .expect("subfield should have target sort field"); + let Some(value) = extract_subfield_value(value_ref, path) + .and_then(|value| cast_value_for_index_type(value, *value_type)) + else { + self.index_creator + .push_with_name(target_key, None) + .await + .context(PushIndexValueSnafu)?; + continue; + }; + IndexValueCodec::encode_nonnull_value( + value.as_value_ref(), + field, + &mut self.value_buf, + ) + .context(EncodeSnafu)?; + self.index_creator + .push_with_name(target_key, Some(&self.value_buf)) + .await + .context(PushIndexValueSnafu)?; + } } } } else if is_sparse && column_meta.semantic_type == SemanticType::Tag { @@ -219,7 +289,7 @@ impl InvertedIndexer { } let pk_values_with_counts = decoded_pks.as_ref().unwrap(); - let Some(col_info) = self.codec.pk_col_info(*col_id) else { + let Some(col_info) = self.codec.pk_col_info(col_id) else { debug!( "Column {} not found in primary key during building bloom filter index", column_name @@ -231,7 +301,7 @@ impl InvertedIndexer { for (decoded, count) in pk_values_with_counts { let value = match decoded { CompositeValues::Dense(dense) => dense.get(pk_index).map(|v| &v.1), - CompositeValues::Sparse(sparse) => sparse.get(col_id), + CompositeValues::Sparse(sparse) => sparse.get(&col_id), }; let elem = value @@ -306,64 +376,114 @@ impl InvertedIndexer { let n = batch.num_rows(); guard.inc_row_count(n); - for (col_id, target_key) in &self.indexed_column_ids { - match self.codec.pk_col_info(*col_id) { - // pk - Some(col_info) => { - let pk_idx = col_info.idx; - let field = &col_info.field; - let value = batch - .pk_col_value(self.codec.decoder(), pk_idx, *col_id)? - .filter(|v| !v.is_null()) - .map(|v| { + for indexed_target in &self.indexed_targets { + if matches!(indexed_target.target, IndexTarget::ColumnId(_)) { + let col_id = match &indexed_target.target { + IndexTarget::ColumnId(col_id) => *col_id, + IndexTarget::SubField { .. 
} => unreachable!(), + }; + let target_key = &indexed_target.target_key; + match self.codec.pk_col_info(col_id) { + // pk + Some(col_info) => { + let pk_idx = col_info.idx; + let field = &col_info.field; + let value = batch + .pk_col_value(self.codec.decoder(), pk_idx, col_id)? + .filter(|v| !v.is_null()) + .map(|v| { + self.value_buf.clear(); + IndexValueCodec::encode_nonnull_value( + v.as_value_ref(), + field, + &mut self.value_buf, + ) + .context(EncodeSnafu)?; + Ok(self.value_buf.as_slice()) + }) + .transpose()?; + + self.index_creator + .push_with_name_n(target_key, value, n) + .await + .context(PushIndexValueSnafu)?; + } + // fields + None => { + let Some(values) = batch.field_col_value(col_id) else { + debug!( + "Column {} not found in the batch during building inverted index", + col_id + ); + continue; + }; + let sort_field = SortField::new(values.data.data_type()); + for i in 0..n { self.value_buf.clear(); - IndexValueCodec::encode_nonnull_value( - v.as_value_ref(), - field, - &mut self.value_buf, - ) - .context(EncodeSnafu)?; - Ok(self.value_buf.as_slice()) - }) - .transpose()?; + let value = values.data.get_ref(i); + if value.is_null() { + self.index_creator + .push_with_name(target_key, None) + .await + .context(PushIndexValueSnafu)?; + } else { + IndexValueCodec::encode_nonnull_value( + value, + &sort_field, + &mut self.value_buf, + ) + .context(EncodeSnafu)?; + self.index_creator + .push_with_name(target_key, Some(&self.value_buf)) + .await + .context(PushIndexValueSnafu)?; + } + } + } + } + } else { + let (col_id, path, value_type) = match &indexed_target.target { + IndexTarget::ColumnId(_) => unreachable!(), + IndexTarget::SubField { + column_id, + path, + value_type, + } => (*column_id, path.as_slice(), *value_type), + }; + let target_key = &indexed_target.target_key; + let Some(values) = batch.field_col_value(col_id) else { + debug!( + "Column {} not found in the batch during building inverted index", + col_id + ); + continue; + }; + let 
sort_field = SortField::new(index_value_type_to_datatype(value_type)); + + for i in 0..n { + self.value_buf.clear(); + let value = values.data.get_ref(i); + let Some(value) = extract_subfield_value(value, path) + .and_then(|value| cast_value_for_index_type(value, value_type)) + else { + self.index_creator + .push_with_name(target_key, None) + .await + .context(PushIndexValueSnafu)?; + continue; + }; + IndexValueCodec::encode_nonnull_value( + value.as_value_ref(), + &sort_field, + &mut self.value_buf, + ) + .context(EncodeSnafu)?; self.index_creator - .push_with_name_n(target_key, value, n) + .push_with_name(target_key, Some(&self.value_buf)) .await .context(PushIndexValueSnafu)?; } - // fields - None => { - let Some(values) = batch.field_col_value(*col_id) else { - debug!( - "Column {} not found in the batch during building inverted index", - col_id - ); - continue; - }; - let sort_field = SortField::new(values.data.data_type()); - for i in 0..n { - self.value_buf.clear(); - let value = values.data.get_ref(i); - if value.is_null() { - self.index_creator - .push_with_name(target_key, None) - .await - .context(PushIndexValueSnafu)?; - } else { - IndexValueCodec::encode_nonnull_value( - value, - &sort_field, - &mut self.value_buf, - ) - .context(EncodeSnafu)?; - self.index_creator - .push_with_name(target_key, Some(&self.value_buf)) - .await - .context(PushIndexValueSnafu)?; - } - } - } } } @@ -433,7 +553,12 @@ impl InvertedIndexer { } pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ { - self.indexed_column_ids.iter().map(|(col_id, _)| *col_id) + self.indexed_targets + .iter() + .map(|indexed_target| match &indexed_target.target { + IndexTarget::ColumnId(col_id) => *col_id, + IndexTarget::SubField { column_id, .. 
} => *column_id, + }) } pub fn memory_usage(&self) -> usize { @@ -441,6 +566,81 @@ impl InvertedIndexer { } } +fn index_value_type_to_datatype(value_type: IndexValueType) -> ConcreteDataType { + match value_type { + IndexValueType::String => ConcreteDataType::string_datatype(), + IndexValueType::I64 => ConcreteDataType::int64_datatype(), + IndexValueType::U64 => ConcreteDataType::uint64_datatype(), + IndexValueType::F64 => ConcreteDataType::float64_datatype(), + IndexValueType::Bool => ConcreteDataType::boolean_datatype(), + IndexValueType::Binary => ConcreteDataType::binary_datatype(), + IndexValueType::TimestampMs => ConcreteDataType::timestamp_millisecond_datatype(), + IndexValueType::Date32 => ConcreteDataType::date_datatype(), + } +} + +fn extract_subfield_value(root: ValueRef<'_>, path: &[String]) -> Option<Value> { + let mut current = Value::from(root); + for segment in path { + current = match current { + Value::Struct(struct_value) => { + let fields = struct_value.struct_type().fields(); + let index = fields.iter().position(|field| field.name() == segment)?; + struct_value.items().get(index).cloned()? + } + Value::Json(_) => { + let mut json: serde_json::Value = current.try_into().ok()?; + let obj = json.as_object_mut()?; + json_value_to_value(obj.remove(segment)?)? 
+ } + _ => return None, + }; + } + Some(current) +} + +fn json_value_to_value(json: serde_json::Value) -> Option<Value> { + match json { + serde_json::Value::Null => Some(Value::Null), + serde_json::Value::Bool(v) => Some(Value::Boolean(v)), + serde_json::Value::Number(v) => { + if let Some(i) = v.as_i64() { + Some(Value::Int64(i)) + } else if let Some(u) = v.as_u64() { + Some(Value::UInt64(u)) + } else { + v.as_f64().map(|f| Value::Float64(f.into())) + } + } + serde_json::Value::String(v) => Some(Value::String(v.into())), + serde_json::Value::Array(_) | serde_json::Value::Object(_) => None, + } +} + +fn cast_value_for_index_type(value: Value, value_type: IndexValueType) -> Option<Value> { + match value_type { + IndexValueType::String => value.as_string().map(|v| Value::String(v.into())), + IndexValueType::I64 => value.as_i64().map(Value::Int64), + IndexValueType::U64 => value.as_u64().map(Value::UInt64), + IndexValueType::F64 => value.as_f64_lossy().map(|v| Value::Float64(v.into())), + IndexValueType::Bool => value.as_bool().map(Value::Boolean), + IndexValueType::Binary => match value { + Value::Binary(v) => Some(Value::Binary(v)), + _ => None, + }, + IndexValueType::TimestampMs => match value { + Value::Timestamp(v) => Some(Value::Timestamp(v)), + Value::Int64(v) => Some(Value::Timestamp(Timestamp::new_millisecond(v))), + _ => None, + }, + IndexValueType::Date32 => match value { + Value::Date(v) => Some(Value::Date(v)), + Value::Int32(v) => Some(Value::Date(Date::new(v))), + _ => None, + }, + } +} + #[cfg(test)] mod tests { use std::collections::BTreeSet; @@ -448,15 +648,19 @@ use api::v1::SemanticType; use datafusion_expr::{Expr as DfExpr, Operator, binary_expr, col, lit}; use datatypes::data_type::ConcreteDataType; + use datatypes::prelude::ScalarVectorBuilder; use datatypes::schema::ColumnSchema; - use datatypes::value::ValueRef; - use datatypes::vectors::{UInt8Vector, UInt64Vector}; + use datatypes::types::{StructField, StructType}; + use 
datatypes::value::{StructValueRef, ValueRef}; + use datatypes::vectors::{StructVectorBuilder, UInt8Vector, UInt64Vector}; use futures::future::BoxFuture; + use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; + use index::target::{IndexTarget, IndexValueType}; use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use object_store::ObjectStore; use object_store::services::Memory; - use puffin::puffin_manager::PuffinManager; use puffin::puffin_manager::cache::PuffinMetadataCache; + use puffin::puffin_manager::{PuffinManager, PuffinReader}; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::region_request::PathType; use store_api::storage::RegionId; @@ -517,6 +721,23 @@ mod tests { semantic_type: SemanticType::Field, column_id: 4, }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_obj", + ConcreteDataType::struct_datatype(StructType::new(Arc::new(vec![ + StructField::new( + "a", + ConcreteDataType::struct_datatype(StructType::new(Arc::new(vec![ + StructField::new("b", ConcreteDataType::string_datatype(), true), + ]))), + true, + ), + ]))), + true, + ), + semantic_type: SemanticType::Field, + column_id: 5, + }) .primary_key(vec![1, 2]); Arc::new(builder.build().unwrap()) @@ -557,6 +778,58 @@ mod tests { .unwrap() } + fn new_batch_with_struct( + str_tag: impl AsRef<str>, + i32_tag: impl Into<i32>, + obj_values: impl IntoIterator<Item = &'static str>, + ) -> Batch { + let fields = vec![ + (0, SortField::new(ConcreteDataType::string_datatype())), + (1, SortField::new(ConcreteDataType::int32_datatype())), + ]; + let codec = DensePrimaryKeyCodec::with_fields(fields); + let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()]; + let primary_key = codec.encode(row.into_iter()).unwrap(); + + let inner_type = StructType::new(Arc::new(vec![StructField::new( + "b", + ConcreteDataType::string_datatype(), + true, + )])); + let root_type = 
StructType::new(Arc::new(vec![StructField::new( + "a", + ConcreteDataType::struct_datatype(inner_type.clone()), + true, + )])); + + let mut builder = StructVectorBuilder::with_type_and_capacity(root_type.clone(), 8); + let mut rows = 0; + for v in obj_values { + rows += 1; + builder.push(Some(StructValueRef::RefList { + val: vec![ValueRef::Struct(StructValueRef::RefList { + val: vec![ValueRef::String(v)], + fields: inner_type.clone(), + })], + fields: root_type.clone(), + })); + } + + let obj_field = BatchColumn { + column_id: 5, + data: Arc::new(builder.finish()), + }; + + Batch::new( + primary_key, + Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(0, rows))), + Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(0, rows))), + Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(1, rows))), + vec![obj_field], + ) + .unwrap() + } + async fn build_applier_factory( prefix: &str, rows: BTreeSet<(&'static str, i32, [u64; 2])>, @@ -828,4 +1101,53 @@ mod tests { let res = applier_factory(expr).await; assert_eq!(res, vec![0, 1, 2]); } + + #[tokio::test] + async fn test_create_subfield_index_target_only() { + let (d, factory) = + PuffinManagerFactory::new_for_test_async("test_create_subfield_index_target_only_") + .await; + let table_dir = "table0".to_string(); + let sst_file_id = FileId::random(); + let object_store = mock_object_store(); + let region_metadata = mock_region_metadata(); + let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await; + let segment_row_count = 2; + let target = IndexTarget::SubField { + column_id: 5, + path: vec!["a".to_string(), "b".to_string()], + value_type: IndexValueType::String, + }; + let target_key = format!("{target}"); + + let mut creator = InvertedIndexer::new_with_targets( + sst_file_id, + ®ion_metadata, + intm_mgr, + None, + NonZeroUsize::new(segment_row_count).unwrap(), + vec![target], + ); + + let mut batch = new_batch_with_struct("aaa", 1, ["x", "y"]); + creator.update(&mut batch).await.unwrap(); + 
+ let puffin_manager = factory.build( + object_store.clone(), + RegionFilePathFactory::new(table_dir, PathType::Bare), + ); + let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id); + let index_id = RegionIndexId::new(sst_file_id, 0); + let mut writer = puffin_manager.writer(&index_id).await.unwrap(); + let (row_count, _) = creator.finish(&mut writer).await.unwrap(); + assert_eq!(row_count, 2); + writer.finish().await.unwrap(); + + let reader = puffin_manager.reader(&index_id).await.unwrap(); + let blob_guard = reader.blob(INDEX_BLOB_TYPE).await.unwrap(); + let blob_reader = blob_guard.reader().await.unwrap(); + let inverted_blob_reader = InvertedIndexBlobReader::new(blob_reader); + let metas = inverted_blob_reader.metadata(None).await.unwrap(); + assert!(metas.metas.contains_key(&target_key)); + } }