10 changes: 0 additions & 10 deletions config/config.md
@@ -192,11 +192,6 @@
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
-| `region_engine.mito.memtable` | -- | -- | -- |
-| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
-| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
-| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.<br/>Only available for `partition_tree` memtable. |
-| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
| `region_engine.file` | -- | -- | Enable the file engine. |
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.sparse_primary_key_encoding` | Bool | `true` | Whether to use sparse primary key encoding. |
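For reference, a minimal sketch of how the bloom filter index options in the table above would look in a TOML config file. The section name follows the dotted keys in the table, and all values shown are the documented defaults:

```toml
## Hypothetical excerpt mirroring the example files changed below;
## all values are the documented defaults.
[region_engine.mito.bloom_filter_index]
## Create the bloom filter on compaction (`auto`) or never (`disable`).
create_on_compaction = "auto"
## Apply the bloom filter on query (`auto`) or never (`disable`).
apply_on_query = "auto"
## Memory threshold for creation: `auto`, `unlimited`, or a size such as "64MB".
mem_threshold_on_create = "auto"
```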
@@ -589,11 +584,6 @@
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
-| `region_engine.mito.memtable` | -- | -- | -- |
-| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
-| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
-| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.<br/>Only available for `partition_tree` memtable. |
-| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.gc` | -- | -- | -- |
| `region_engine.mito.gc.enable` | Bool | `false` | Whether GC is enabled. Must match metasrv's `gc.enable`, or unexpected behavior will occur. |
| `region_engine.mito.gc.lingering_time` | String | `1m` | Lingering time before deleting files.<br/>Should be long enough to allow long-running queries to finish.<br/>If unset, unused files are deleted immediately. |
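Likewise, a short sketch of the GC options described above. Note the coupling called out in the table: `enable` must match metasrv's `gc.enable`.

```toml
## Hypothetical excerpt; values are the documented defaults.
[region_engine.mito.gc]
## Whether GC is enabled; must match metasrv's `gc.enable`.
enable = false
## Lingering time before unused files are deleted; keep it long enough
## for long-running queries to finish. If unset, files are deleted immediately.
lingering_time = "1m"
```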
18 changes: 0 additions & 18 deletions config/datanode.example.toml
@@ -654,24 +654,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

-[region_engine.mito.memtable]
-## Memtable type.
-## - `time_series`: time-series memtable
-## - `partition_tree`: partition tree memtable (experimental)
-type = "time_series"
-
-## The max number of keys in one shard.
-## Only available for `partition_tree` memtable.
-index_max_keys_per_shard = 8192
-
-## The max rows of data inside the actively writing buffer in one shard.
-## Only available for `partition_tree` memtable.
-data_freeze_threshold = 32768
-
-## Max dictionary bytes.
-## Only available for `partition_tree` memtable.
-fork_dictionary_bytes = "1GiB"
-
[region_engine.mito.gc]
## Whether GC is enabled. Must match metasrv's `gc.enable`, or unexpected behavior will occur.
enable = false
18 changes: 0 additions & 18 deletions config/standalone.example.toml
@@ -757,24 +757,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

-[region_engine.mito.memtable]
-## Memtable type.
-## - `time_series`: time-series memtable
-## - `partition_tree`: partition tree memtable (experimental)
-type = "time_series"
-
-## The max number of keys in one shard.
-## Only available for `partition_tree` memtable.
-index_max_keys_per_shard = 8192
-
-## The max rows of data inside the actively writing buffer in one shard.
-## Only available for `partition_tree` memtable.
-data_freeze_threshold = 32768
-
-## Max dictionary bytes.
-## Only available for `partition_tree` memtable.
-fork_dictionary_bytes = "1GiB"
-
[[region_engine]]
## Enable the file engine.
[region_engine.file]
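As the surrounding context shows, `region_engine` is a TOML array of tables: each `[[region_engine]]` entry activates one engine. A sketch of the overall layout (the mito entry and its options are elided; this is an assumption based on the example files in this PR, not a verbatim excerpt):

```toml
## Each [[region_engine]] entry enables one engine.
[[region_engine]]
[region_engine.mito]
## ... mito options such as bloom_filter_index and gc go here ...

[[region_engine]]
## Enable the file engine.
[region_engine.file]
```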
262 changes: 4 additions & 258 deletions src/metric-engine/src/engine/bulk_insert.rs
@@ -14,20 +14,14 @@

use std::collections::HashSet;

-use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
+use api::v1::{ArrowIpc, SemanticType};
use bytes::Bytes;
-use common_error::ext::ErrorExt;
-use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightEncoder, FlightMessage};
-use common_query::prelude::{greptime_timestamp, greptime_value};
-use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::record_batch::RecordBatch;
use snafu::{OptionExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
-use store_api::region_request::{
-    AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
-};
+use store_api::region_request::{AffectedRows, RegionBulkInsertsRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
@@ -42,8 +36,7 @@ impl MetricEngineInner {
/// **Logical region path:** The request payload is a logical `RecordBatch`
/// (timestamp, value and tag columns). It is transformed to physical format
/// via `modify_batch_sparse`, encoded to Arrow IPC, and forwarded as a
-    /// `BulkInserts` request to the data region. If mito reports
-    /// `StatusCode::Unsupported`, the request is transparently retried as a `Put`.
+    /// `BulkInserts` request to the data region.
///
/// **Physical region path:** The request payload is already in physical format
/// (produced by the batcher's `flush_batch_physical`). It is forwarded directly
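The "encoded to Arrow IPC" step above is what lets a logical batch travel to the data region as opaque bytes. The engine itself goes through `FlightEncoder` (see `record_batch_to_ipc` further down); the sketch below is only an illustrative stand-in showing the same idea with the plain `arrow` crate's stream writer, using the conventional `greptime_timestamp`/`greptime_value` column layout described above:

```rust
use std::sync::Arc;

use arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

/// Encodes a batch in Arrow IPC stream format: a schema message
/// followed by one message per record batch.
fn encode_to_ipc(batch: &RecordBatch) -> arrow::error::Result<Vec<u8>> {
    let mut writer = StreamWriter::try_new(Vec::new(), batch.schema().as_ref())?;
    writer.write(batch)?;
    writer.finish()?; // writes the end-of-stream marker
    writer.into_inner()
}

fn main() -> arrow::error::Result<()> {
    // A logical batch: timestamp, value, and one tag column.
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "greptime_timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new("greptime_value", DataType::Float64, true),
        Field::new("host", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(TimestampMillisecondArray::from(vec![Some(1000), Some(2000)])),
            Arc::new(Float64Array::from(vec![Some(1.0), None])),
            Arc::new(StringArray::from(vec![Some("a"), Some("b")])),
        ],
    )?;
    println!("encoded {} IPC bytes", encode_to_ipc(&batch)?.len());
    Ok(())
}
```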
Expand Down Expand Up @@ -134,27 +127,9 @@ impl MetricEngineInner {
},
partition_expr_version,
};
-        match self
-            .data_region
+        self.data_region
.write_data(data_region_id, RegionRequest::BulkInserts(request))
.await
-        {
-            Ok(affected_rows) => Ok(affected_rows),
-            Err(err) if err.status_code() == StatusCode::Unsupported => {
-                // todo(hl): fallback path for PartitionTreeMemtable, remove this once we remove it
-                let rows = record_batch_to_rows(&batch, region_id)?;
-                self.put_region(
-                    region_id,
-                    RegionPutRequest {
-                        rows,
-                        hint: None,
-                        partition_expr_version,
-                    },
-                )
-                .await
-            }
-            Err(err) => Err(err),
-        }
}

fn resolve_tag_columns_from_metadata(
@@ -214,174 +189,6 @@
}
}

-fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result<api::v1::Rows> {
-    let schema_ref = batch.schema();
-    let fields = schema_ref.fields();
-
-    let mut ts_idx = None;
-    let mut val_idx = None;
-    let mut tag_indices = Vec::new();
-
-    for (idx, field) in fields.iter().enumerate() {
-        if field.name() == greptime_timestamp() {
-            ts_idx = Some(idx);
-            if !matches!(
-                field.data_type(),
-                datatypes::arrow::datatypes::DataType::Timestamp(
-                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
-                    _
-                )
-            ) {
-                return error::UnexpectedRequestSnafu {
-                    reason: format!(
-                        "Timestamp column '{}' in region {:?} has incompatible type: {:?}",
-                        field.name(),
-                        logical_region_id,
-                        field.data_type()
-                    ),
-                }
-                .fail();
-            }
-        } else if field.name() == greptime_value() {
-            val_idx = Some(idx);
-            if !matches!(
-                field.data_type(),
-                datatypes::arrow::datatypes::DataType::Float64
-            ) {
-                return error::UnexpectedRequestSnafu {
-                    reason: format!(
-                        "Value column '{}' in region {:?} has incompatible type: {:?}",
-                        field.name(),
-                        logical_region_id,
-                        field.data_type()
-                    ),
-                }
-                .fail();
-            }
-        } else {
-            if !matches!(
-                field.data_type(),
-                datatypes::arrow::datatypes::DataType::Utf8
-            ) {
-                return error::UnexpectedRequestSnafu {
-                    reason: format!(
-                        "Tag column '{}' in region {:?} must be Utf8, found: {:?}",
-                        field.name(),
-                        logical_region_id,
-                        field.data_type()
-                    ),
-                }
-                .fail();
-            }
-            tag_indices.push(idx);
-        }
-    }
-
-    let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
-        reason: format!(
-            "Timestamp column '{}' not found in RecordBatch for region {:?}",
-            greptime_timestamp(),
-            logical_region_id
-        ),
-    })?;
-    let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
-        reason: format!(
-            "Value column '{}' not found in RecordBatch for region {:?}",
-            greptime_value(),
-            logical_region_id
-        ),
-    })?;
-
-    let mut schema = Vec::with_capacity(2 + tag_indices.len());
-    schema.push(api::v1::ColumnSchema {
-        column_name: greptime_timestamp().to_string(),
-        datatype: ColumnDataType::TimestampMillisecond as i32,
-        semantic_type: SemanticType::Timestamp as i32,
-        datatype_extension: None,
-        options: None,
-    });
-    schema.push(api::v1::ColumnSchema {
-        column_name: greptime_value().to_string(),
-        datatype: ColumnDataType::Float64 as i32,
-        semantic_type: SemanticType::Field as i32,
-        datatype_extension: None,
-        options: None,
-    });
-    for &idx in &tag_indices {
-        let field = &fields[idx];
-        schema.push(api::v1::ColumnSchema {
-            column_name: field.name().clone(),
-            datatype: ColumnDataType::String as i32,
-            semantic_type: SemanticType::Tag as i32,
-            datatype_extension: None,
-            options: None,
-        });
-    }
-
-    let ts_array = batch
-        .column(ts_idx)
-        .as_any()
-        .downcast_ref::<TimestampMillisecondArray>()
-        .expect("validated as TimestampMillisecond");
-    let val_array = batch
-        .column(val_idx)
-        .as_any()
-        .downcast_ref::<Float64Array>()
-        .expect("validated as Float64");
-    let tag_arrays: Vec<&StringArray> = tag_indices
-        .iter()
-        .map(|&idx| {
-            batch
-                .column(idx)
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .expect("validated as Utf8")
-        })
-        .collect();
-
-    let num_rows = batch.num_rows();
-    let mut rows = Vec::with_capacity(num_rows);
-    for row_idx in 0..num_rows {
-        let mut values = Vec::with_capacity(2 + tag_arrays.len());
-
-        if ts_array.is_null(row_idx) {
-            values.push(api::v1::Value { value_data: None });
-        } else {
-            values.push(api::v1::Value {
-                value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
-                    ts_array.value(row_idx),
-                )),
-            });
-        }
-
-        if val_array.is_null(row_idx) {
-            values.push(api::v1::Value { value_data: None });
-        } else {
-            values.push(api::v1::Value {
-                value_data: Some(api::v1::value::ValueData::F64Value(
-                    val_array.value(row_idx),
-                )),
-            });
-        }
-
-        for arr in &tag_arrays {
-            if arr.is_null(row_idx) {
-                values.push(api::v1::Value { value_data: None });
-            } else {
-                values.push(api::v1::Value {
-                    value_data: Some(api::v1::value::ValueData::StringValue(
-                        arr.value(row_idx).to_string(),
-                    )),
-                });
-            }
-        }
-
-        rows.push(api::v1::Row { values });
-    }
-
-    Ok(api::v1::Rows { schema, rows })
-}
-
fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
let mut encoder = FlightEncoder::default();
let schema = encoder.encode_schema(record_batch.schema().as_ref());
@@ -810,65 +617,4 @@

assert_eq!(put_output, bulk_output);
}

-    #[test]
-    fn test_record_batch_to_rows_with_null_values() {
-        use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
-        use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
-        use datatypes::arrow::record_batch::RecordBatch;
-        use store_api::storage::RegionId;
-
-        use crate::engine::bulk_insert::record_batch_to_rows;
-
-        let schema = Arc::new(ArrowSchema::new(vec![
-            Field::new(
-                greptime_timestamp(),
-                DataType::Timestamp(TimeUnit::Millisecond, None),
-                true,
-            ),
-            Field::new(greptime_value(), DataType::Float64, true),
-            Field::new("job", DataType::Utf8, true),
-            Field::new("host", DataType::Utf8, true),
-        ]));
-
-        let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]);
-        let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]);
-        let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]);
-        let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]);
-
-        let batch = RecordBatch::try_new(
-            schema,
-            vec![
-                Arc::new(ts_array),
-                Arc::new(val_array),
-                Arc::new(job_array),
-                Arc::new(host_array),
-            ],
-        )
-        .unwrap();
-
-        let region_id = RegionId::new(1, 1);
-        let rows = record_batch_to_rows(&batch, region_id).unwrap();
-
-        assert_eq!(rows.rows.len(), 3);
-        assert_eq!(rows.schema.len(), 4);
-
-        // Row 0: all non-null except host
-        assert!(rows.rows[0].values[0].value_data.is_some());
-        assert!(rows.rows[0].values[1].value_data.is_some());
-        assert!(rows.rows[0].values[2].value_data.is_some());
-        assert!(rows.rows[0].values[3].value_data.is_none());
-
-        // Row 1: null timestamp, null job
-        assert!(rows.rows[1].values[0].value_data.is_none());
-        assert!(rows.rows[1].values[1].value_data.is_some());
-        assert!(rows.rows[1].values[2].value_data.is_none());
-        assert!(rows.rows[1].values[3].value_data.is_some());
-
-        // Row 2: null value
-        assert!(rows.rows[2].values[0].value_data.is_some());
-        assert!(rows.rows[2].values[1].value_data.is_none());
-        assert!(rows.rows[2].values[2].value_data.is_some());
-        assert!(rows.rows[2].values[3].value_data.is_some());
-    }
}