Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 182 additions & 1 deletion src/index/src/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,111 @@

use std::any::Any;
use std::fmt::{self, Display};
use std::str::FromStr;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use serde::{Deserialize, Serialize};
use snafu::{Snafu, ensure};
use snafu::{OptionExt, Snafu, ensure};
use store_api::storage::ColumnId;

/// Describes an index target. Column ids are the only supported variant for now.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexTarget {
ColumnId(ColumnId),
SubField {
column_id: ColumnId,
path: Vec<String>,
value_type: IndexValueType,
},
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexValueType {
#[serde(rename = "string")]
String,
#[serde(rename = "i64")]
I64,
#[serde(rename = "u64")]
U64,
#[serde(rename = "f64")]
F64,
#[serde(rename = "bool")]
Bool,
#[serde(rename = "binary")]
Binary,
#[serde(rename = "ts_ms")]
TimestampMs,
#[serde(rename = "date32")]
Date32,
}

impl Display for IndexValueType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
IndexValueType::String => "string",
IndexValueType::I64 => "i64",
IndexValueType::U64 => "u64",
IndexValueType::F64 => "f64",
IndexValueType::Bool => "bool",
IndexValueType::Binary => "binary",
IndexValueType::TimestampMs => "ts_ms",
IndexValueType::Date32 => "date32",
};
write!(f, "{s}")
}
}

impl FromStr for IndexValueType {
type Err = TargetKeyError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"string" => Ok(IndexValueType::String),
"i64" => Ok(IndexValueType::I64),
"u64" => Ok(IndexValueType::U64),
"f64" => Ok(IndexValueType::F64),
"bool" => Ok(IndexValueType::Bool),
"binary" => Ok(IndexValueType::Binary),
"ts_ms" => Ok(IndexValueType::TimestampMs),
"date32" => Ok(IndexValueType::Date32),
_ => InvalidIndexValueTypeSnafu {
value_type: s.to_string(),
}
.fail(),
}
}
}

impl Display for IndexTarget {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
IndexTarget::ColumnId(id) => write!(f, "{}", id),
IndexTarget::SubField {
column_id,
path,

Check warning on line 100 in src/index/src/target.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/greptimedb/greptimedb/src/index/src/target.rs
value_type,
} => {
write!(
f,
"sub:{}:{}:{}",
column_id,
value_type,
encode_path(path)
)
}
}
}
}

impl IndexTarget {
/// Parse a target key string back into an index target description.
pub fn decode(key: &str) -> Result<Self, TargetKeyError> {
if let Some(rest) = key.strip_prefix("sub:") {
return decode_subfield_target(rest);
}

validate_column_key(key)?;
let id = key
.parse::<ColumnId>()
Expand All @@ -59,6 +139,15 @@

#[snafu(display("failed to parse column id from '{value}'"))]
InvalidColumnId { value: String },

#[snafu(display("invalid subfield target key: {key}"))]
InvalidSubfieldTargetKey { key: String },

#[snafu(display("invalid index value type: {value_type}"))]
InvalidIndexValueType { value_type: String },

#[snafu(display("invalid subfield path segment: {segment}"))]
InvalidSubfieldPathSegment { segment: String },
}

impl ErrorExt for TargetKeyError {
Expand All @@ -80,6 +169,85 @@
Ok(())
}

fn decode_subfield_target(rest: &str) -> Result<IndexTarget, TargetKeyError> {
let mut parts = rest.splitn(3, ':');
let col = parts
.next()
.context(InvalidSubfieldTargetKeySnafu { key: rest })?;
let value_type = parts
.next()
.context(InvalidSubfieldTargetKeySnafu { key: rest })?;
let encoded_path = parts
.next()
.context(InvalidSubfieldTargetKeySnafu { key: rest })?;

validate_column_key(col)?;
let column_id = col
.parse::<ColumnId>()
.map_err(|_| InvalidColumnIdSnafu { value: col }.build())?;
let value_type = IndexValueType::from_str(value_type)?;
let path = decode_path(encoded_path)?;

Ok(IndexTarget::SubField {
column_id,
path,
value_type,
})
}

fn encode_path(path: &[String]) -> String {
path.iter()
.map(|seg| seg.replace('\\', "\\\\").replace('.', "\\."))
.collect::<Vec<_>>()
.join(".")
}

fn decode_path(encoded: &str) -> Result<Vec<String>, TargetKeyError> {
if encoded.is_empty() {
return Ok(Vec::new());
}

let mut parts = Vec::new();
let mut current = String::new();
let mut escaped = false;
for ch in encoded.chars() {
if escaped {
current.push(ch);
escaped = false;
continue;
}
if ch == '\\' {
escaped = true;
continue;
}
if ch == '.' {
if current.is_empty() {
return InvalidSubfieldPathSegmentSnafu {
segment: encoded.to_string(),
}
.fail();
}
parts.push(std::mem::take(&mut current));
continue;
}
current.push(ch);
}
if escaped {
return InvalidSubfieldPathSegmentSnafu {
segment: encoded.to_string(),
}
.fail();
}
if current.is_empty() {
return InvalidSubfieldPathSegmentSnafu {
segment: encoded.to_string(),
}
.fail();
}
parts.push(current);
Ok(parts)
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -104,4 +272,17 @@
let err = IndexTarget::decode("1a2").unwrap_err();
assert!(matches!(err, TargetKeyError::InvalidCharacters { .. }));
}

#[test]
fn encode_decode_subfield() {
let target = IndexTarget::SubField {
column_id: 42,
path: vec!["a".to_string(), "b.c".to_string()],
value_type: IndexValueType::String,
};
let key = format!("{}", target);
assert_eq!(key, "sub:42:string:a.b\\.c");
let decoded = IndexTarget::decode(&key).unwrap();
assert_eq!(decoded, target);
}
}
9 changes: 9 additions & 0 deletions src/mito2/src/engine/puffin_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ fn decode_target_info(target_key: &str) -> (String, String) {
TARGET_TYPE_COLUMN.to_string(),
json!({ "column": id }).to_string(),
),
Ok(IndexTarget::SubField {
column_id,
path,
value_type,
}) => (
"subfield".to_string(),
json!({ "column": column_id, "path": path, "type": value_type }).to_string(),
),
_ => (
TARGET_TYPE_UNKNOWN.to_string(),
json!({ "error": "failed_to_decode" }).to_string(),
Expand All @@ -380,6 +388,7 @@ fn decode_target_info(target_key: &str) -> (String, String) {
fn decode_column_id(target_key: &str) -> Option<ColumnId> {
match IndexTarget::decode(target_key) {
Ok(IndexTarget::ColumnId(id)) => Some(id),
Ok(IndexTarget::SubField { column_id, .. }) => Some(column_id),
_ => None,
}
}
Expand Down
59 changes: 59 additions & 0 deletions src/mito2/src/region/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_stat::get_total_memory_readable;

Check warning on line 23 in src/mito2/src/region/options.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/greptimedb/greptimedb/src/mito2/src/region/options.rs
use common_time::TimeToLive;
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use serde::de::Error as _;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};

Check warning on line 29 in src/mito2/src/region/options.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/greptimedb/greptimedb/src/mito2/src/region/options.rs
use snafu::{ResultExt, ensure};
use index::target::IndexValueType;
use store_api::codec::PrimaryKeyEncoding;
use store_api::mito_engine_options::COMPACTION_OVERRIDE;
use store_api::storage::ColumnId;
Expand Down Expand Up @@ -317,17 +318,32 @@
/// The number of rows in a segment.
#[serde_as(as = "DisplayFromStr")]
pub segment_row_count: usize,

/// Nested sub-field index targets encoded as JSON string.
/// Example:
/// `[{"column_id":1,"path":["a","b"],"value_type":"string"}]`
#[serde(deserialize_with = "deserialize_sub_fields")]
#[serde(serialize_with = "serialize_sub_fields")]
pub sub_fields: Vec<InvertedSubfieldIndexOption>,
}

impl Default for InvertedIndexOptions {
fn default() -> Self {
Self {
ignore_column_ids: Vec::new(),
segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
sub_fields: Vec::new(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InvertedSubfieldIndexOption {
pub column_id: ColumnId,
pub path: Vec<String>,
pub value_type: IndexValueType,
}

/// Options for region level memtable.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "memtable.type", rename_all = "snake_case")]
Expand Down Expand Up @@ -415,6 +431,30 @@
serializer.serialize_str(&s)
}

fn deserialize_sub_fields<'de, D>(
deserializer: D,
) -> Result<Vec<InvertedSubfieldIndexOption>, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
if s.is_empty() {
return Ok(Vec::new());
}
serde_json::from_str(&s).map_err(D::Error::custom)
}

fn serialize_sub_fields<S>(
sub_fields: &[InvertedSubfieldIndexOption],
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = serde_json::to_string(sub_fields).map_err(serde::ser::Error::custom)?;
serializer.serialize_str(&s)
}

/// Converts the `options` map to a json object.
///
/// Replaces "null" strings by `null` json values.
Expand Down Expand Up @@ -560,13 +600,29 @@
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
segment_row_count: 512,
sub_fields: vec![],
},
},
..Default::default()
};
assert_eq!(expect, options);
}

#[test]
fn test_with_inverted_index_sub_fields() {
let sub_fields = r#"[{"column_id":1,"path":["a","b"],"value_type":"string"}]"#;
let map = make_map(&[("index.inverted_index.sub_fields", sub_fields)]);
let options = RegionOptions::try_from(&map).unwrap();
assert_eq!(
vec![InvertedSubfieldIndexOption {
column_id: 1,
path: vec!["a".to_string(), "b".to_string()],
value_type: IndexValueType::String,
}],
options.index_options.inverted_index.sub_fields
);
}

// No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
#[test]
fn test_with_any_wal_options() {
Expand Down Expand Up @@ -667,6 +723,7 @@
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
segment_row_count: 512,
sub_fields: vec![],
},
},
memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
Expand Down Expand Up @@ -702,6 +759,7 @@
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
segment_row_count: 512,
sub_fields: vec![],
},
},
memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
Expand Down Expand Up @@ -767,6 +825,7 @@
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![],
segment_row_count: 512,
sub_fields: vec![],
},
},
memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
Expand Down
Loading
Loading