Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ mod table_cache;
pub use builder::{
CatalogManagerConfigurator, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, new_read_only_meta_kv_backend};
pub use manager::KvBackendCatalogManager;
pub use table_cache::{TableCache, TableCacheRef, new_table_cache};
89 changes: 84 additions & 5 deletions src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::error::{
CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result, UnsupportedSnafu,
};
use common_meta::kv_backend::read_only::ReadOnlyKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::KeyValue;
Expand Down Expand Up @@ -357,19 +360,35 @@ impl CachedKvBackend {
}

#[derive(Debug)]
pub struct MetaKvBackend {
pub client: Arc<MetaClient>,
pub(crate) struct MetaKvBackend {
client: Arc<MetaClient>,
}

impl MetaKvBackend {
/// Constructs a [MetaKvBackend].
pub fn new(client: Arc<MetaClient>) -> MetaKvBackend {
fn new(client: Arc<MetaClient>) -> MetaKvBackend {
MetaKvBackend { client }
}
}

pub fn new_read_only_meta_kv_backend(client: Arc<MetaClient>) -> KvBackendRef {
Arc::new(ReadOnlyKvBackend::new(Arc::new(MetaKvBackend::new(client))))
}

#[async_trait::async_trait]
impl TxnService for MetaKvBackend {
type Error = Error;

async fn txn(&self, _txn: Txn) -> Result<TxnResponse> {
UnsupportedSnafu {
operation: "MetaKvBackend txn",
}
.fail()
}

fn max_txn_ops(&self) -> usize {
usize::MAX
}
}

/// Implement `KvBackend` trait for `MetaKvBackend` instead of opendal's `Accessor` since
Expand Down Expand Up @@ -465,6 +484,9 @@ mod tests {
use std::sync::atomic::{AtomicU32, Ordering};

use async_trait::async_trait;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::read_only::ReadOnlyKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnOp};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::KeyValue;
use common_meta::rpc::store::{
Expand All @@ -473,8 +495,9 @@ mod tests {
PutResponse, RangeRequest, RangeResponse,
};
use dashmap::DashMap;
use meta_client::client::MetaClientBuilder;

use super::CachedKvBackend;
use super::{CachedKvBackend, new_read_only_meta_kv_backend};

#[derive(Default)]
pub struct SimpleKvBackend {
Expand Down Expand Up @@ -579,6 +602,62 @@ mod tests {
}
}

#[tokio::test]
async fn test_cached_kv_backend_rejects_writes_with_read_only_inner() {
let inner = Arc::new(MemoryKvBackend::<common_meta::error::Error>::new());
let cached_kv = CachedKvBackend::wrap(Arc::new(ReadOnlyKvBackend::new(inner)));

let err = cached_kv
.put(PutRequest {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
prev_kv: false,
})
.await
.unwrap_err();

assert!(matches!(
err,
common_meta::error::Error::ReadOnlyKvBackend { .. }
));
}

#[tokio::test]
async fn test_read_only_meta_kv_backend_rejects_writes() {
let meta_client = Arc::new(MetaClientBuilder::frontend_default_options().build());
let backend = new_read_only_meta_kv_backend(meta_client);

let err = backend
.put(PutRequest {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
prev_kv: false,
})
.await
.unwrap_err();

assert!(matches!(
err,
common_meta::error::Error::ReadOnlyKvBackend { .. }
));
}

#[tokio::test]
async fn test_read_only_meta_kv_backend_does_not_emulate_txn() {
let meta_client = Arc::new(MetaClientBuilder::frontend_default_options().build());
let backend = new_read_only_meta_kv_backend(meta_client);

let result = backend
.txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec())]))
.await;
let err = match result {
Ok(_) => panic!("expected unsupported txn error"),
Err(err) => err,
};

assert!(matches!(err, common_meta::error::Error::Unsupported { .. }));
}

async fn add_some_vals(kv_backend: &impl KvBackend) {
kv_backend
.put(PutRequest {
Expand Down
6 changes: 2 additions & 4 deletions src/cmd/src/datanode/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use catalog::kvbackend::new_read_only_meta_kv_backend;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::info;
Expand Down Expand Up @@ -99,9 +99,7 @@ impl InstanceBuilder {
.await
.context(MetaClientInitSnafu)?;

let backend = Arc::new(MetaKvBackend {
client: client.clone(),
});
let backend = new_read_only_meta_kv_backend(client.clone());
let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone());

let registry = Arc::new(
Expand Down
27 changes: 13 additions & 14 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use std::time::Duration;

use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::kvbackend::{
CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, new_read_only_meta_kv_backend,
};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
Expand All @@ -46,8 +48,8 @@ use snafu::{OptionExt, ResultExt, ensure};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
BuildCacheRegistrySnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu,
OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
Expand Down Expand Up @@ -300,13 +302,14 @@ impl StartCommand {
let cache_ttl = meta_config.metadata_cache_ttl;
let cache_tti = meta_config.metadata_cache_tti;

let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

// Builds cache registry
Expand All @@ -316,7 +319,7 @@ impl StartCommand {
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
build_fundamental_cache_registry(readonly_meta_backend.clone());
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
Expand Down Expand Up @@ -346,10 +349,6 @@ impl StartCommand {

let table_metadata_manager =
Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
table_metadata_manager
.init()
.await
.context(InitMetadataSnafu)?;

let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Expand Down
17 changes: 9 additions & 8 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use catalog::information_extension::DistributedInformationExtension;
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::{
CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
MetaKvBackend,
new_read_only_meta_kv_backend,
};
use catalog::process_manager::ProcessManager;
use clap::Parser;
Expand Down Expand Up @@ -393,13 +393,14 @@ impl StartCommand {
.await
.context(error::StartFrontendSnafu)?;

let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

// Builds cache registry
Expand All @@ -409,7 +410,7 @@ impl StartCommand {
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
build_fundamental_cache_registry(readonly_meta_backend.clone());
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
Expand Down
9 changes: 8 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Trying to write to a read-only kv backend: {}", name))]
ReadOnlyKvBackend {
name: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
ProcedureStateReceiver {
procedure_id: ProcedureId,
Expand Down Expand Up @@ -1146,7 +1153,7 @@ impl ErrorExt for Error {
| ColumnIdMismatch { .. }
| TimestampMismatch { .. } => StatusCode::Unexpected,

Unsupported { .. } => StatusCode::Unsupported,
Unsupported { .. } | ReadOnlyKvBackend { .. } => StatusCode::Unsupported,
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,

SerdeJson { .. }
Expand Down
41 changes: 41 additions & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,7 @@ mod tests {
};
use crate::kv_backend::KvBackend;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::read_only::ReadOnlyKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
use crate::rpc::store::{PutRequest, RangeRequest};
Expand Down Expand Up @@ -1948,6 +1949,46 @@ mod tests {
assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3");
}

#[tokio::test]
async fn test_get_full_table_info_with_read_only_kv_backend() {
Comment thread
WenyXu marked this conversation as resolved.
let mem_kv = Arc::new(MemoryKvBackend::default());
let writable_manager = TableMetadataManager::new(mem_kv.clone());

let region_routes = vec![new_test_region_route()];
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;

create_physical_table_metadata(
&writable_manager,
table_info.clone(),
region_routes.clone(),
HashMap::new(),
)
.await
.unwrap();

let read_only_kv = Arc::new(ReadOnlyKvBackend::new(mem_kv));
let read_only_manager = TableMetadataManager::new(read_only_kv);

let (remote_table_info, remote_table_route) = read_only_manager
.get_full_table_info(table_id)
.await
.unwrap();

assert_eq!(
remote_table_info.unwrap().into_inner().table_info,
table_info
);
assert_eq!(
remote_table_route
.unwrap()
.into_inner()
.region_routes()
.unwrap(),
&region_routes
);
}

#[tokio::test]
async fn test_create_logic_tables_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/kv_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod etcd;
pub mod memory;
#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
pub mod rds;
pub mod read_only;
pub mod test;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
Expand Down
Loading
Loading