Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 176 additions & 1 deletion src/index/src/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,105 @@

use std::any::Any;
use std::fmt::{self, Display};
use std::str::FromStr;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use serde::{Deserialize, Serialize};
use snafu::{Snafu, ensure};
use snafu::{OptionExt, Snafu, ensure};
use store_api::storage::ColumnId;

/// Describes what an index is built on.
///
/// A target is either a whole column or a typed sub-field nested inside a column.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexTarget {
/// A whole column, identified by its column id.
ColumnId(ColumnId),
/// A nested field inside a column.
SubField {
/// Id of the column that contains the field.
column_id: ColumnId,
/// Path segments leading to the field; the target key joins them with `.`.
path: Vec<String>,
/// Value type recorded for the field in the target key.
value_type: IndexValueType,
},
}

/// Value type attached to a sub-field index target.
///
/// Each variant is serialized under the short name given by its `serde(rename)`
/// attribute, e.g. `TimestampMs` is written as `"ts_ms"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexValueType {
/// Serialized as `"string"`.
#[serde(rename = "string")]
String,
/// Serialized as `"i64"`.
#[serde(rename = "i64")]
I64,
/// Serialized as `"u64"`.
#[serde(rename = "u64")]
U64,
/// Serialized as `"f64"`.
#[serde(rename = "f64")]
F64,
/// Serialized as `"bool"`.
#[serde(rename = "bool")]
Bool,
/// Serialized as `"binary"`.
#[serde(rename = "binary")]
Binary,
/// Serialized as `"ts_ms"` (presumably a millisecond timestamp; confirm with the indexer).
#[serde(rename = "ts_ms")]
TimestampMs,
/// Serialized as `"date32"`.
#[serde(rename = "date32")]
Date32,
}

impl Display for IndexValueType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
IndexValueType::String => "string",
IndexValueType::I64 => "i64",
IndexValueType::U64 => "u64",
IndexValueType::F64 => "f64",
IndexValueType::Bool => "bool",
IndexValueType::Binary => "binary",
IndexValueType::TimestampMs => "ts_ms",
IndexValueType::Date32 => "date32",
};
write!(f, "{s}")
}
}

impl FromStr for IndexValueType {
    type Err = TargetKeyError;

    /// Parses the short textual name of a value type.
    ///
    /// # Errors
    ///
    /// Returns `InvalidIndexValueType` carrying the input when the name is not one of
    /// `string`, `i64`, `u64`, `f64`, `bool`, `binary`, `ts_ms` or `date32`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "string" => Self::String,
            "i64" => Self::I64,
            "u64" => Self::U64,
            "f64" => Self::F64,
            "bool" => Self::Bool,
            "binary" => Self::Binary,
            "ts_ms" => Self::TimestampMs,
            "date32" => Self::Date32,
            _ => {
                // Matching is exact: no trimming or case folding is applied.
                return InvalidIndexValueTypeSnafu {
                    value_type: s.to_string(),
                }
                .fail();
            }
        };
        Ok(parsed)
    }
}

impl Display for IndexTarget {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
IndexTarget::ColumnId(id) => write!(f, "{}", id),
IndexTarget::SubField {
column_id,
path,
value_type,
} => {
write!(f, "sub:{}:{}:{}", column_id, value_type, encode_path(path))
}
}
}
}

impl IndexTarget {
/// Parse a target key string back into an index target description.
pub fn decode(key: &str) -> Result<Self, TargetKeyError> {
if let Some(rest) = key.strip_prefix("sub:") {
return decode_subfield_target(rest);
}

validate_column_key(key)?;
let id = key
.parse::<ColumnId>()
Expand All @@ -59,6 +133,15 @@ pub enum TargetKeyError {

#[snafu(display("failed to parse column id from '{value}'"))]
InvalidColumnId { value: String },

#[snafu(display("invalid subfield target key: {key}"))]
InvalidSubfieldTargetKey { key: String },

#[snafu(display("invalid index value type: {value_type}"))]
InvalidIndexValueType { value_type: String },

#[snafu(display("invalid subfield path segment: {segment}"))]
InvalidSubfieldPathSegment { segment: String },
}

impl ErrorExt for TargetKeyError {
Expand All @@ -80,6 +163,85 @@ fn validate_column_key(key: &str) -> Result<(), TargetKeyError> {
Ok(())
}

fn decode_subfield_target(rest: &str) -> Result<IndexTarget, TargetKeyError> {
let mut parts = rest.splitn(3, ':');
let col = parts
.next()
.context(InvalidSubfieldTargetKeySnafu { key: rest })?;
let value_type = parts
.next()
.context(InvalidSubfieldTargetKeySnafu { key: rest })?;
let encoded_path = parts
.next()
.context(InvalidSubfieldTargetKeySnafu { key: rest })?;

validate_column_key(col)?;
let column_id = col
.parse::<ColumnId>()
.map_err(|_| InvalidColumnIdSnafu { value: col }.build())?;
let value_type = IndexValueType::from_str(value_type)?;
let path = decode_path(encoded_path)?;

Ok(IndexTarget::SubField {
column_id,
path,
value_type,
})
}

/// Encodes path segments into the dotted form used inside a target key.
///
/// Segments are joined with `.`. Inside a segment every `\` and every `.` is prefixed
/// with a `\`, so that a literal dot can be told apart from a separator.
fn encode_path(path: &[String]) -> String {
    let mut encoded = String::new();
    for (idx, segment) in path.iter().enumerate() {
        if idx > 0 {
            encoded.push('.');
        }
        for ch in segment.chars() {
            // Both the escape character itself and the separator need escaping.
            if matches!(ch, '\\' | '.') {
                encoded.push('\\');
            }
            encoded.push(ch);
        }
    }
    encoded
}

/// Decodes a dotted, backslash-escaped path back into its segments.
///
/// An unescaped `.` separates segments. A `\` makes the following character literal,
/// whatever that character is. An empty input decodes to an empty path.
///
/// # Errors
///
/// Returns `InvalidSubfieldPathSegment` carrying the whole input when a segment is empty
/// (leading, trailing or doubled `.`) or when the input ends with a dangling `\`.
fn decode_path(encoded: &str) -> Result<Vec<String>, TargetKeyError> {
    if encoded.is_empty() {
        return Ok(Vec::new());
    }

    let mut segments = Vec::new();
    let mut segment = String::new();
    let mut pending_escape = false;
    let mut empty_segment = false;

    for ch in encoded.chars() {
        match (pending_escape, ch) {
            // The previous character was `\`: take this one verbatim.
            (true, _) => {
                segment.push(ch);
                pending_escape = false;
            }
            (false, '\\') => pending_escape = true,
            (false, '.') => {
                if segment.is_empty() {
                    empty_segment = true;
                    break;
                }
                segments.push(std::mem::take(&mut segment));
            }
            (false, _) => segment.push(ch),
        }
    }

    // A dangling escape or an empty final segment is as malformed as an empty inner one.
    if empty_segment || pending_escape || segment.is_empty() {
        return InvalidSubfieldPathSegmentSnafu {
            segment: encoded.to_string(),
        }
        .fail();
    }

    segments.push(segment);
    Ok(segments)
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -104,4 +266,17 @@ mod tests {
let err = IndexTarget::decode("1a2").unwrap_err();
assert!(matches!(err, TargetKeyError::InvalidCharacters { .. }));
}

#[test]
fn encode_decode_subfield() {
    // The second segment contains a dot, which must come out escaped in the key.
    let path = vec!["a".to_string(), "b.c".to_string()];
    let original = IndexTarget::SubField {
        column_id: 42,
        path,
        value_type: IndexValueType::String,
    };

    let key = original.to_string();
    assert_eq!(key, "sub:42:string:a.b\\.c");

    // Decoding the key must give back exactly the target that produced it.
    let round_tripped = IndexTarget::decode(&key).unwrap();
    assert_eq!(round_tripped, original);
}
}
9 changes: 9 additions & 0 deletions src/mito2/src/engine/puffin_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ fn decode_target_info(target_key: &str) -> (String, String) {
TARGET_TYPE_COLUMN.to_string(),
json!({ "column": id }).to_string(),
),
Ok(IndexTarget::SubField {
column_id,
path,
value_type,
}) => (
"subfield".to_string(),
json!({ "column": column_id, "path": path, "type": value_type }).to_string(),
),
_ => (
TARGET_TYPE_UNKNOWN.to_string(),
json!({ "error": "failed_to_decode" }).to_string(),
Expand All @@ -380,6 +388,7 @@ fn decode_target_info(target_key: &str) -> (String, String) {
/// Extracts the column id that a target key refers to.
///
/// For a sub-field target this is the id of the enclosing column. Returns `None` when the
/// key cannot be decoded.
fn decode_column_id(target_key: &str) -> Option<ColumnId> {
    let target = IndexTarget::decode(target_key).ok()?;
    match target {
        IndexTarget::ColumnId(id) => Some(id),
        IndexTarget::SubField { column_id, .. } => Some(column_id),
    }
}
Expand Down
59 changes: 59 additions & 0 deletions src/mito2/src/region/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_base::readable_size::ReadableSize;
use common_stat::get_total_memory_readable;
use common_time::TimeToLive;
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use index::target::IndexValueType;
use serde::de::Error as _;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
Expand Down Expand Up @@ -317,17 +318,32 @@ pub struct InvertedIndexOptions {
/// The number of rows in a segment.
#[serde_as(as = "DisplayFromStr")]
pub segment_row_count: usize,

/// Nested sub-field index targets encoded as JSON string.
/// Example:
/// `[{"column_id":1,"path":["a","b"],"value_type":"string"}]`
#[serde(deserialize_with = "deserialize_sub_fields")]
#[serde(serialize_with = "serialize_sub_fields")]
pub sub_fields: Vec<InvertedSubfieldIndexOption>,
}

impl Default for InvertedIndexOptions {
    /// Builds options with no ignored columns, the default segment row count and no
    /// sub-field targets.
    fn default() -> Self {
        let ignore_column_ids = Vec::new();
        let sub_fields = Vec::new();
        Self {
            ignore_column_ids,
            segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
            sub_fields,
        }
    }
}

/// One nested sub-field index target, as listed in `InvertedIndexOptions::sub_fields`.
///
/// In JSON form it looks like `{"column_id":1,"path":["a","b"],"value_type":"string"}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InvertedSubfieldIndexOption {
/// Id of the column that contains the sub-field.
pub column_id: ColumnId,
/// Path segments of the sub-field, e.g. `["a", "b"]`.
pub path: Vec<String>,
/// Value type of the sub-field.
pub value_type: IndexValueType,
}

/// Options for region level memtable.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "memtable.type", rename_all = "snake_case")]
Expand Down Expand Up @@ -415,6 +431,30 @@ where
serializer.serialize_str(&s)
}

fn deserialize_sub_fields<'de, D>(
deserializer: D,
) -> Result<Vec<InvertedSubfieldIndexOption>, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
if s.is_empty() {
return Ok(Vec::new());
}
serde_json::from_str(&s).map_err(D::Error::custom)
}

fn serialize_sub_fields<S>(
sub_fields: &[InvertedSubfieldIndexOption],
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = serde_json::to_string(sub_fields).map_err(serde::ser::Error::custom)?;
serializer.serialize_str(&s)
}

/// Converts the `options` map to a json object.
///
/// Replaces "null" strings by `null` json values.
Expand Down Expand Up @@ -560,13 +600,29 @@ mod tests {
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
segment_row_count: 512,
sub_fields: vec![],
},
},
..Default::default()
};
assert_eq!(expect, options);
}

#[test]
fn test_with_inverted_index_sub_fields() {
    // The option value is itself a JSON document stored as a string.
    let raw_sub_fields = r#"[{"column_id":1,"path":["a","b"],"value_type":"string"}]"#;
    let options_map = make_map(&[("index.inverted_index.sub_fields", raw_sub_fields)]);
    let parsed = RegionOptions::try_from(&options_map).unwrap();

    let expected = vec![InvertedSubfieldIndexOption {
        column_id: 1,
        path: vec!["a".to_string(), "b".to_string()],
        value_type: IndexValueType::String,
    }];
    assert_eq!(expected, parsed.index_options.inverted_index.sub_fields);
}

// No need to add compatibility tests for RegionOptions since the tests above already check compatibility.
#[test]
fn test_with_any_wal_options() {
Expand Down Expand Up @@ -667,6 +723,7 @@ mod tests {
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
segment_row_count: 512,
sub_fields: vec![],
},
},
memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
Expand Down Expand Up @@ -702,6 +759,7 @@ mod tests {
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
segment_row_count: 512,
sub_fields: vec![],
},
},
memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
Expand Down Expand Up @@ -767,6 +825,7 @@ mod tests {
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![],
segment_row_count: 512,
sub_fields: vec![],
},
},
memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
Expand Down
Loading
Loading