Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ mod table_cache;
pub use builder::{
CatalogManagerConfigurator, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, new_read_only_meta_kv_backend};
pub use manager::KvBackendCatalogManager;
pub use table_cache::{TableCache, TableCacheRef, new_table_cache};
89 changes: 84 additions & 5 deletions src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::error::{
CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result, UnsupportedSnafu,
};
use common_meta::kv_backend::read_only::ReadOnlyKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::KeyValue;
Expand Down Expand Up @@ -357,19 +360,35 @@ impl CachedKvBackend {
}

#[derive(Debug)]
pub struct MetaKvBackend {
pub client: Arc<MetaClient>,
pub(crate) struct MetaKvBackend {
client: Arc<MetaClient>,
}

impl MetaKvBackend {
/// Constructs a [MetaKvBackend].
pub fn new(client: Arc<MetaClient>) -> MetaKvBackend {
fn new(client: Arc<MetaClient>) -> MetaKvBackend {
MetaKvBackend { client }
}
}

pub fn new_read_only_meta_kv_backend(client: Arc<MetaClient>) -> KvBackendRef {
Arc::new(ReadOnlyKvBackend::new(Arc::new(MetaKvBackend::new(client))))
}

#[async_trait::async_trait]
impl TxnService for MetaKvBackend {
type Error = Error;

async fn txn(&self, _txn: Txn) -> Result<TxnResponse> {
UnsupportedSnafu {
operation: "MetaKvBackend txn",
}
.fail()
}

fn max_txn_ops(&self) -> usize {
usize::MAX
}
}

/// Implement `KvBackend` trait for `MetaKvBackend` instead of opendal's `Accessor` since
Expand Down Expand Up @@ -465,6 +484,9 @@ mod tests {
use std::sync::atomic::{AtomicU32, Ordering};

use async_trait::async_trait;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::read_only::ReadOnlyKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnOp};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::KeyValue;
use common_meta::rpc::store::{
Expand All @@ -473,8 +495,9 @@ mod tests {
PutResponse, RangeRequest, RangeResponse,
};
use dashmap::DashMap;
use meta_client::client::MetaClientBuilder;

use super::CachedKvBackend;
use super::{CachedKvBackend, new_read_only_meta_kv_backend};

#[derive(Default)]
pub struct SimpleKvBackend {
Expand Down Expand Up @@ -579,6 +602,62 @@ mod tests {
}
}

#[tokio::test]
async fn test_cached_kv_backend_rejects_writes_with_read_only_inner() {
let inner = Arc::new(MemoryKvBackend::<common_meta::error::Error>::new());
let cached_kv = CachedKvBackend::wrap(Arc::new(ReadOnlyKvBackend::new(inner)));

let err = cached_kv
.put(PutRequest {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
prev_kv: false,
})
.await
.unwrap_err();

assert!(matches!(
err,
common_meta::error::Error::ReadOnlyKvBackend { .. }
));
}

#[tokio::test]
async fn test_read_only_meta_kv_backend_rejects_writes() {
let meta_client = Arc::new(MetaClientBuilder::frontend_default_options().build());
let backend = new_read_only_meta_kv_backend(meta_client);

let err = backend
.put(PutRequest {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
prev_kv: false,
})
.await
.unwrap_err();

assert!(matches!(
err,
common_meta::error::Error::ReadOnlyKvBackend { .. }
));
}

#[tokio::test]
async fn test_read_only_meta_kv_backend_does_not_emulate_txn() {
let meta_client = Arc::new(MetaClientBuilder::frontend_default_options().build());
let backend = new_read_only_meta_kv_backend(meta_client);

let result = backend
.txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec())]))
.await;
let err = match result {
Ok(_) => panic!("expected unsupported txn error"),
Err(err) => err,
};

assert!(matches!(err, common_meta::error::Error::Unsupported { .. }));
}

async fn add_some_vals(kv_backend: &impl KvBackend) {
kv_backend
.put(PutRequest {
Expand Down
6 changes: 2 additions & 4 deletions src/cmd/src/datanode/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use catalog::kvbackend::new_read_only_meta_kv_backend;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::info;
Expand Down Expand Up @@ -99,9 +99,7 @@ impl InstanceBuilder {
.await
.context(MetaClientInitSnafu)?;

let backend = Arc::new(MetaKvBackend {
client: client.clone(),
});
let backend = new_read_only_meta_kv_backend(client.clone());
let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone());

let registry = Arc::new(
Expand Down
27 changes: 13 additions & 14 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use std::time::Duration;

use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::kvbackend::{
CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, new_read_only_meta_kv_backend,
};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
Expand All @@ -46,8 +48,8 @@ use snafu::{OptionExt, ResultExt, ensure};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
BuildCacheRegistrySnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu,
OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
Expand Down Expand Up @@ -300,13 +302,14 @@ impl StartCommand {
let cache_ttl = meta_config.metadata_cache_ttl;
let cache_tti = meta_config.metadata_cache_tti;

let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

// Builds cache registry
Expand All @@ -316,7 +319,7 @@ impl StartCommand {
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
build_fundamental_cache_registry(readonly_meta_backend.clone());
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
Expand Down Expand Up @@ -346,10 +349,6 @@ impl StartCommand {

let table_metadata_manager =
Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
table_metadata_manager
.init()
.await
.context(InitMetadataSnafu)?;

let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Expand Down
17 changes: 9 additions & 8 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use catalog::information_extension::DistributedInformationExtension;
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::{
CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
MetaKvBackend,
new_read_only_meta_kv_backend,
};
use catalog::process_manager::ProcessManager;
use clap::Parser;
Expand Down Expand Up @@ -393,13 +393,14 @@ impl StartCommand {
.await
.context(error::StartFrontendSnafu)?;

let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

// Builds cache registry
Expand All @@ -409,7 +410,7 @@ impl StartCommand {
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
build_fundamental_cache_registry(readonly_meta_backend.clone());
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
Expand Down
9 changes: 8 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Trying to write to a read-only kv backend: {}", name))]
ReadOnlyKvBackend {
name: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
ProcedureStateReceiver {
procedure_id: ProcedureId,
Expand Down Expand Up @@ -1146,7 +1153,7 @@ impl ErrorExt for Error {
| ColumnIdMismatch { .. }
| TimestampMismatch { .. } => StatusCode::Unexpected,

Unsupported { .. } => StatusCode::Unsupported,
Unsupported { .. } | ReadOnlyKvBackend { .. } => StatusCode::Unsupported,
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,

SerdeJson { .. }
Expand Down
41 changes: 41 additions & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,7 @@ mod tests {
};
use crate::kv_backend::KvBackend;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::read_only::ReadOnlyKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
use crate::rpc::store::{PutRequest, RangeRequest};
Expand Down Expand Up @@ -1948,6 +1949,46 @@ mod tests {
assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3");
}

#[tokio::test]
async fn test_get_full_table_info_with_read_only_kv_backend() {
Comment thread
WenyXu marked this conversation as resolved.
let mem_kv = Arc::new(MemoryKvBackend::default());
let writable_manager = TableMetadataManager::new(mem_kv.clone());

let region_routes = vec![new_test_region_route()];
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;

create_physical_table_metadata(
&writable_manager,
table_info.clone(),
region_routes.clone(),
HashMap::new(),
)
.await
.unwrap();

let read_only_kv = Arc::new(ReadOnlyKvBackend::new(mem_kv));
let read_only_manager = TableMetadataManager::new(read_only_kv);

let (remote_table_info, remote_table_route) = read_only_manager
.get_full_table_info(table_id)
.await
.unwrap();

assert_eq!(
remote_table_info.unwrap().into_inner().table_info,
table_info
);
assert_eq!(
remote_table_route
.unwrap()
.into_inner()
.region_routes()
.unwrap(),
&region_routes
);
}

#[tokio::test]
async fn test_create_logic_tables_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/kv_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod etcd;
pub mod memory;
#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
pub mod rds;
pub mod read_only;
pub mod test;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
Expand Down
Loading
Loading