diff --git a/src/catalog/src/kvbackend.rs b/src/catalog/src/kvbackend.rs index 334acc999c62..77eb3f204823 100644 --- a/src/catalog/src/kvbackend.rs +++ b/src/catalog/src/kvbackend.rs @@ -20,6 +20,6 @@ mod table_cache; pub use builder::{ CatalogManagerConfigurator, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder, }; -pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend}; +pub use client::{CachedKvBackend, CachedKvBackendBuilder, new_read_only_meta_kv_backend}; pub use manager::KvBackendCatalogManager; pub use table_cache::{TableCache, TableCacheRef, new_table_cache}; diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index f74509217fb0..69b31d62322e 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -21,7 +21,10 @@ use std::time::Duration; use common_error::ext::BoxedError; use common_meta::cache_invalidator::KvCacheInvalidator; use common_meta::error::Error::CacheNotGet; -use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result}; +use common_meta::error::{ + CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result, UnsupportedSnafu, +}; +use common_meta::kv_backend::read_only::ReadOnlyKvBackend; use common_meta::kv_backend::txn::{Txn, TxnResponse}; use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService}; use common_meta::rpc::KeyValue; @@ -357,19 +360,35 @@ impl CachedKvBackend { } #[derive(Debug)] -pub struct MetaKvBackend { - pub client: Arc, +pub(crate) struct MetaKvBackend { + client: Arc, } impl MetaKvBackend { /// Constructs a [MetaKvBackend]. - pub fn new(client: Arc) -> MetaKvBackend { + fn new(client: Arc) -> MetaKvBackend { MetaKvBackend { client } } } +pub fn new_read_only_meta_kv_backend(client: Arc) -> KvBackendRef { + Arc::new(ReadOnlyKvBackend::new(Arc::new(MetaKvBackend::new(client)))) +} + +#[async_trait::async_trait] impl TxnService for MetaKvBackend { type Error = Error; + + async fn txn(&self, _txn: Txn) -> Result { + UnsupportedSnafu { + operation: "MetaKvBackend txn", + } + .fail() + } + + fn max_txn_ops(&self) -> usize { + usize::MAX + } } /// Implement `KvBackend` trait for `MetaKvBackend` instead of opendal's `Accessor` since @@ -465,6 +484,9 @@ mod tests { use std::sync::atomic::{AtomicU32, Ordering}; use async_trait::async_trait; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::read_only::ReadOnlyKvBackend; + use common_meta::kv_backend::txn::{Txn, TxnOp}; use common_meta::kv_backend::{KvBackend, TxnService}; use common_meta::rpc::KeyValue; use common_meta::rpc::store::{ @@ -473,8 +495,9 @@ mod tests { PutResponse, RangeRequest, RangeResponse, }; use dashmap::DashMap; + use meta_client::client::MetaClientBuilder; - use super::CachedKvBackend; + use super::{CachedKvBackend, new_read_only_meta_kv_backend}; #[derive(Default)] pub struct SimpleKvBackend { @@ -579,6 +602,62 @@ mod tests { } } + #[tokio::test] + async fn test_cached_kv_backend_rejects_writes_with_read_only_inner() { + let inner = Arc::new(MemoryKvBackend::::new()); + let cached_kv = CachedKvBackend::wrap(Arc::new(ReadOnlyKvBackend::new(inner))); + + let err = cached_kv + .put(PutRequest { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + prev_kv: false, + }) + .await + .unwrap_err(); + + assert!(matches!( + err, + common_meta::error::Error::ReadOnlyKvBackend { .. } + )); + } + + #[tokio::test] + async fn test_read_only_meta_kv_backend_rejects_writes() { + let meta_client = Arc::new(MetaClientBuilder::frontend_default_options().build()); + let backend = new_read_only_meta_kv_backend(meta_client); + + let err = backend + .put(PutRequest { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + prev_kv: false, + }) + .await + .unwrap_err(); + + assert!(matches!( + err, + common_meta::error::Error::ReadOnlyKvBackend { .. } + )); + } + + #[tokio::test] + async fn test_read_only_meta_kv_backend_does_not_emulate_txn() { + let meta_client = Arc::new(MetaClientBuilder::frontend_default_options().build()); + let backend = new_read_only_meta_kv_backend(meta_client); + + let result = backend + .txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec())])) + .await; + let err = match result { + Ok(_) => panic!("expected unsupported txn error"), + Err(err) => err, + }; + + assert!(matches!(err, common_meta::error::Error::Unsupported { .. })); + } + async fn add_some_vals(kv_backend: &impl KvBackend) { kv_backend .put(PutRequest { diff --git a/src/cmd/src/datanode/builder.rs b/src/cmd/src/datanode/builder.rs index cfde0c349aed..90f38b22bd68 100644 --- a/src/cmd/src/datanode/builder.rs +++ b/src/cmd/src/datanode/builder.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use cache::build_datanode_cache_registry; -use catalog::kvbackend::MetaKvBackend; +use catalog::kvbackend::new_read_only_meta_kv_backend; use common_base::Plugins; use common_meta::cache::LayeredCacheRegistryBuilder; use common_telemetry::info; @@ -99,9 +99,7 @@ impl InstanceBuilder { .await .context(MetaClientInitSnafu)?; - let backend = Arc::new(MetaKvBackend { - client: client.clone(), - }); + let backend = new_read_only_meta_kv_backend(client.clone()); let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone()); let registry = Arc::new( diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 535c3e1933c8..df7a0725e9a4 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -19,7 +19,9 @@ use std::time::Duration; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::information_extension::DistributedInformationExtension; -use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend}; +use catalog::kvbackend::{ + CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, new_read_only_meta_kv_backend, +}; use clap::Parser; use client::client_manager::NodeClients; use common_base::Plugins; @@ -46,8 +48,8 @@ use snafu::{OptionExt, ResultExt, ensure}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ - BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, - MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, + BuildCacheRegistrySnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu, + OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile}; @@ -300,13 +302,14 @@ impl StartCommand { let cache_ttl = meta_config.metadata_cache_ttl; let cache_tti = meta_config.metadata_cache_tti; + let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone()); + // TODO(discord9): add helper function to ease the creation of cache registry&such - let cached_meta_backend = - CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))) - .cache_max_capacity(cache_max_capacity) - .cache_ttl(cache_ttl) - .cache_tti(cache_tti) - .build(); + let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone()) + .cache_max_capacity(cache_max_capacity) + .cache_ttl(cache_ttl) + .cache_tti(cache_tti) + .build(); let cached_meta_backend = Arc::new(cached_meta_backend); // Builds cache registry @@ -316,7 +319,7 @@ impl StartCommand { .build(), ); let fundamental_cache_registry = - build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone()))); + build_fundamental_cache_registry(readonly_meta_backend.clone()); let layered_cache_registry = Arc::new( with_default_composite_cache_registry( layered_cache_builder.add_cache_registry(fundamental_cache_registry), @@ -346,10 +349,6 @@ impl StartCommand { let table_metadata_manager = Arc::new(TableMetadataManager::new(cached_meta_backend.clone())); - table_metadata_manager - .init() - .await - .context(InitMetadataSnafu)?; let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 24b11b00a7f5..deaf31aa819e 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -23,7 +23,7 @@ use catalog::information_extension::DistributedInformationExtension; use catalog::information_schema::InformationExtensionRef; use catalog::kvbackend::{ CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder, - MetaKvBackend, + new_read_only_meta_kv_backend, }; use catalog::process_manager::ProcessManager; use clap::Parser; @@ -393,13 +393,14 @@ impl StartCommand { .await .context(error::StartFrontendSnafu)?; + let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone()); + // TODO(discord9): add helper function to ease the creation of cache registry&such - let cached_meta_backend = - CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))) - .cache_max_capacity(cache_max_capacity) - .cache_ttl(cache_ttl) - .cache_tti(cache_tti) - .build(); + let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone()) + .cache_max_capacity(cache_max_capacity) + .cache_ttl(cache_ttl) + .cache_tti(cache_tti) + .build(); let cached_meta_backend = Arc::new(cached_meta_backend); // Builds cache registry @@ -409,7 +410,7 @@ impl StartCommand { .build(), ); let fundamental_cache_registry = - build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone()))); + build_fundamental_cache_registry(readonly_meta_backend.clone()); let layered_cache_registry = Arc::new( with_default_composite_cache_registry( layered_cache_builder.add_cache_registry(fundamental_cache_registry), diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 05b5af393bcd..54a5c30fede7 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -155,6 +155,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Trying to write to a read-only kv backend: {}", name))] + ReadOnlyKvBackend { + name: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))] ProcedureStateReceiver { procedure_id: ProcedureId, @@ -1146,7 +1153,7 @@ impl ErrorExt for Error { | ColumnIdMismatch { .. } | TimestampMismatch { .. } => StatusCode::Unexpected, - Unsupported { .. } => StatusCode::Unsupported, + Unsupported { .. } | ReadOnlyKvBackend { .. } => StatusCode::Unsupported, WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable, SerdeJson { .. } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 3fd5648093d4..44ed11148a01 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -1660,6 +1660,7 @@ mod tests { }; use crate::kv_backend::KvBackend; use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::read_only::ReadOnlyKvBackend; use crate::peer::Peer; use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution}; use crate::rpc::store::{PutRequest, RangeRequest}; @@ -1948,6 +1949,46 @@ mod tests { assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3"); } + #[tokio::test] + async fn test_get_full_table_info_with_read_only_kv_backend() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let writable_manager = TableMetadataManager::new(mem_kv.clone()); + + let region_routes = vec![new_test_region_route()]; + let table_info = new_test_table_info(); + let table_id = table_info.ident.table_id; + + create_physical_table_metadata( + &writable_manager, + table_info.clone(), + region_routes.clone(), + HashMap::new(), + ) + .await + .unwrap(); + + let read_only_kv = Arc::new(ReadOnlyKvBackend::new(mem_kv)); + let read_only_manager = TableMetadataManager::new(read_only_kv); + + let (remote_table_info, remote_table_route) = read_only_manager + .get_full_table_info(table_id) + .await + .unwrap(); + + assert_eq!( + remote_table_info.unwrap().into_inner().table_info, + table_info + ); + assert_eq!( + remote_table_route + .unwrap() + .into_inner() + .region_routes() + .unwrap(), + ®ion_routes + ); + } + #[tokio::test] async fn test_create_logic_tables_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 7f747508d44c..10e51578505e 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -33,6 +33,7 @@ pub mod etcd; pub mod memory; #[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))] pub mod rds; +pub mod read_only; pub mod test; #[cfg(any(test, feature = "testing"))] pub mod test_util; diff --git a/src/common/meta/src/kv_backend/read_only.rs b/src/common/meta/src/kv_backend/read_only.rs new file mode 100644 index 000000000000..43c8d2067a7d --- /dev/null +++ b/src/common/meta/src/kv_backend/read_only.rs @@ -0,0 +1,348 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use crate::error::{ReadOnlyKvBackendSnafu, Result}; +use crate::kv_backend::txn::{Txn, TxnOp, TxnResponse}; +use crate::kv_backend::{KvBackend, KvBackendRef, TxnService}; +use crate::rpc::KeyValue; +use crate::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, +}; + +/// A [`KvBackend`] wrapper that forwards reads and rejects writes. +pub struct ReadOnlyKvBackend { + inner: KvBackendRef, + name: String, +} + +impl ReadOnlyKvBackend { + pub fn new(inner: KvBackendRef) -> Self { + let name = format!("ReadOnlyKvBackend({})", inner.name()); + Self { inner, name } + } + + fn read_only(&self) -> Result { + ReadOnlyKvBackendSnafu { + name: self.name.clone(), + } + .fail() + } + + fn validate_read_only_ops(&self, ops: &[TxnOp]) -> Result<()> { + if ops + .iter() + .any(|op| matches!(op, TxnOp::Put(_, _) | TxnOp::Delete(_))) + { + self.read_only() + } else { + Ok(()) + } + } + + fn validate_read_only_txn(&self, txn: &Txn) -> Result<()> { + self.validate_read_only_ops(&txn.req.success)?; + self.validate_read_only_ops(&txn.req.failure) + } +} + +#[async_trait::async_trait] +impl TxnService for ReadOnlyKvBackend { + type Error = crate::error::Error; + + async fn txn(&self, txn: Txn) -> Result { + self.validate_read_only_txn(&txn)?; + self.inner.txn(txn).await + } + + fn max_txn_ops(&self) -> usize { + self.inner.max_txn_ops() + } +} + +#[async_trait::async_trait] +impl KvBackend for ReadOnlyKvBackend { + fn name(&self) -> &str { + &self.name + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, req: RangeRequest) -> Result { + self.inner.range(req).await + } + + async fn put(&self, _req: PutRequest) -> Result { + self.read_only() + } + + async fn batch_put(&self, _req: BatchPutRequest) -> Result { + self.read_only() + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + self.inner.batch_get(req).await + } + + async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result { + self.read_only() + } + + async fn delete_range(&self, _req: DeleteRangeRequest) -> Result { + self.read_only() + } + + async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result { + self.read_only() + } + + async fn get(&self, key: &[u8]) -> Result> { + self.inner.get(key).await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; + + use super::*; + use crate::error::Error; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::txn::{Compare, CompareOp, TxnOpResponse}; + use crate::rpc::store::{ + BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, + DeleteRangeRequest, PutRequest, + }; + + async fn read_only_backend() -> ReadOnlyKvBackend { + let inner = Arc::new(MemoryKvBackend::::new()); + inner + .put(PutRequest::new().with_key(b"k1").with_value(b"v1")) + .await + .unwrap(); + inner + .put(PutRequest::new().with_key(b"k2").with_value(b"v2")) + .await + .unwrap(); + + ReadOnlyKvBackend::new(inner) + } + + fn assert_read_only(result: Result) { + let err = match result { + Ok(_) => panic!("expected read-only error"), + Err(err) => err, + }; + assert!(matches!(err, Error::ReadOnlyKvBackend { .. })); + assert_eq!(err.status_code(), StatusCode::Unsupported); + } + + struct TxnOnlyBackend; + + #[async_trait::async_trait] + impl TxnService for TxnOnlyBackend { + type Error = Error; + + async fn txn(&self, _txn: Txn) -> Result { + Ok(TxnResponse { + succeeded: true, + responses: vec![TxnOpResponse::ResponseGet(RangeResponse { + kvs: vec![KeyValue { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + }], + more: false, + })], + }) + } + + fn max_txn_ops(&self) -> usize { + 7 + } + } + + #[async_trait::async_trait] + impl KvBackend for TxnOnlyBackend { + fn name(&self) -> &str { + "TxnOnlyBackend" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, _req: RangeRequest) -> Result { + unimplemented!("read-only txn should delegate to inner txn") + } + + async fn put(&self, _req: PutRequest) -> Result { + unimplemented!("read-only txn should delegate to inner txn") + } + + async fn batch_put(&self, _req: BatchPutRequest) -> Result { + unimplemented!("read-only txn should delegate to inner txn") + } + + async fn batch_get(&self, _req: BatchGetRequest) -> Result { + unimplemented!("read-only txn should delegate to inner txn") + } + + async fn delete_range(&self, _req: DeleteRangeRequest) -> Result { + unimplemented!("read-only txn should delegate to inner txn") + } + + async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result { + unimplemented!("read-only txn should delegate to inner txn") + } + } + + #[tokio::test] + async fn test_read_only_backend_forwards_reads() { + let backend = read_only_backend().await; + + let range = backend + .range(RangeRequest::new().with_key(b"k1")) + .await + .unwrap(); + assert_eq!(range.kvs.len(), 1); + assert_eq!(range.kvs[0].value, b"v1"); + + let kv = backend.get(b"k2").await.unwrap().unwrap(); + assert_eq!(kv.value, b"v2"); + + let batch = backend + .batch_get(BatchGetRequest::new().add_key(b"k1").add_key(b"k2")) + .await + .unwrap(); + assert_eq!(batch.kvs.len(), 2); + } + + #[tokio::test] + async fn test_read_only_backend_rejects_writes() { + let backend = read_only_backend().await; + + assert_read_only( + backend + .put(PutRequest::new().with_key(b"k3").with_value(b"v3")) + .await, + ); + assert_read_only( + backend + .batch_put(BatchPutRequest::new().add_kv(b"k3", b"v3")) + .await, + ); + assert_read_only( + backend + .compare_and_put( + CompareAndPutRequest::new() + .with_key(b"k1") + .with_expect(b"v1") + .with_value(b"v3"), + ) + .await, + ); + assert_read_only( + backend + .delete_range(DeleteRangeRequest::new().with_key(b"k1")) + .await, + ); + assert_read_only( + backend + .batch_delete(BatchDeleteRequest::new().add_key(b"k1")) + .await, + ); + } + + #[tokio::test] + async fn test_read_only_backend_rejects_write_txn() { + let backend = read_only_backend().await; + + assert_eq!(backend.max_txn_ops(), usize::MAX); + assert_read_only( + backend + .txn(Txn::put_if_not_exists(b"k3".to_vec(), b"v3".to_vec())) + .await, + ); + } + + #[tokio::test] + async fn test_read_only_backend_delegates_read_txn() { + let backend = ReadOnlyKvBackend::new(Arc::new(TxnOnlyBackend)); + + assert_eq!(backend.max_txn_ops(), 7); + let resp = backend + .txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec())])) + .await + .unwrap(); + + assert!(resp.succeeded); + let TxnOpResponse::ResponseGet(range) = &resp.responses[0] else { + panic!("expected get response"); + }; + assert_eq!(range.kvs[0].value, b"v1"); + } + + #[tokio::test] + async fn test_read_only_backend_allows_get_only_txn() { + let backend = read_only_backend().await; + + let resp = backend + .txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec()), TxnOp::Get(b"k2".to_vec())])) + .await + .unwrap(); + + assert!(resp.succeeded); + assert_eq!(resp.responses.len(), 2); + let TxnOpResponse::ResponseGet(range) = &resp.responses[0] else { + panic!("expected get response"); + }; + assert_eq!(range.kvs.len(), 1); + assert_eq!(range.kvs[0].value, b"v1"); + let TxnOpResponse::ResponseGet(range) = &resp.responses[1] else { + panic!("expected get response"); + }; + assert_eq!(range.kvs.len(), 1); + assert_eq!(range.kvs[0].value, b"v2"); + } + + #[tokio::test] + async fn test_read_only_backend_allows_compare_and_get_txn() { + let backend = read_only_backend().await; + + let txn = Txn::new() + .when(vec![Compare::with_value( + b"k1".to_vec(), + CompareOp::Equal, + b"v1".to_vec(), + )]) + .and_then(vec![TxnOp::Get(b"k2".to_vec())]) + .or_else(vec![TxnOp::Get(b"k1".to_vec())]); + let resp = backend.txn(txn).await.unwrap(); + + assert!(resp.succeeded); + let TxnOpResponse::ResponseGet(range) = &resp.responses[0] else { + panic!("expected get response"); + }; + assert_eq!(range.kvs.len(), 1); + assert_eq!(range.kvs[0].value, b"v2"); + } +} diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index 5db13c965875..e1cea4be4dac 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -16,10 +16,7 @@ use std::time::Duration; use api::v1::meta::{HeartbeatRequest, Peer}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::rpc::store::{ - BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest, - PutRequest, RangeRequest, -}; +use common_meta::rpc::store::{BatchGetRequest, RangeRequest}; use meta_client::client::MetaClientBuilder; use tracing::{Level, event, subscriber}; use tracing_subscriber::FmtSubscriber; @@ -67,14 +64,6 @@ async fn run() { } }); - // put - let put = PutRequest::new() - .with_key(b"key1".to_vec()) - .with_value(b"value1".to_vec()) - .with_prev_kv(); - let res = meta_client.put(put).await.unwrap(); - event!(Level::INFO, "put result: {:#?}", res); - // get let range = RangeRequest::new().with_key(b"key1".to_vec()); let res = meta_client.range(range.clone()).await.unwrap(); @@ -85,60 +74,10 @@ async fn run() { let res = meta_client.range(range2.clone()).await.unwrap(); event!(Level::INFO, "get prefix result: {:#?}", res); - // batch put - let batch_put = BatchPutRequest::new() - .add_kv(b"batch_put1".to_vec(), b"batch_put_v1".to_vec()) - .add_kv(b"batch_put2".to_vec(), b"batch_put_v2".to_vec()) - .with_prev_kv(); - let res = meta_client.batch_put(batch_put).await.unwrap(); - event!(Level::INFO, "batch put result: {:#?}", res); - - // cas - let cas = CompareAndPutRequest::new() - .with_key(b"batch_put1".to_vec()) - .with_expect(b"batch_put_v_fail".to_vec()) - .with_value(b"batch_put_v111".to_vec()); - - let res = meta_client.compare_and_put(cas).await.unwrap(); - event!(Level::INFO, "cas 0 result: {:#?}", res); - - let cas = CompareAndPutRequest::new() - .with_key(b"batch_put1".to_vec()) - .with_expect(b"batch_put_v1".to_vec()) - .with_value(b"batch_put_v111".to_vec()); - - let res = meta_client.compare_and_put(cas).await.unwrap(); - event!(Level::INFO, "cas 1 result: {:#?}", res); - - // delete - let delete_range = DeleteRangeRequest::new().with_key(b"key1".to_vec()); - let res = meta_client.delete_range(delete_range).await.unwrap(); - event!(Level::INFO, "delete range result: {:#?}", res); - - // get none - let res = meta_client.range(range).await.unwrap(); - event!(Level::INFO, "get range result: {:#?}", res); - - // batch delete - // put two - let batch_put = BatchPutRequest::new() - .add_kv(b"batch_put1".to_vec(), b"batch_put_v1".to_vec()) - .add_kv(b"batch_put2".to_vec(), b"batch_put_v2".to_vec()) - .with_prev_kv(); - let res = meta_client.batch_put(batch_put).await.unwrap(); - event!(Level::INFO, "batch put result: {:#?}", res); - - // delete one - let batch_delete = BatchDeleteRequest::new() - .add_key(b"batch_put1".to_vec()) - .with_prev_kv(); - let res = meta_client.batch_delete(batch_delete).await.unwrap(); - event!(Level::INFO, "batch delete result: {:#?}", res); - - // get other one + // batch get let batch_get = BatchGetRequest::new() - .add_key(b"batch_put1".to_vec()) - .add_key(b"batch_put2".to_vec()); + .add_key(b"key1".to_vec()) + .add_key(b"key2".to_vec()); let res = meta_client.batch_get(batch_get).await.unwrap(); event!(Level::INFO, "batch get result: {:#?}", res); diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 205226c84583..5f361d8b829a 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -91,6 +91,8 @@ pub struct MetaClientBuilder { role: Role, enable_heartbeat: bool, enable_store: bool, + #[cfg(test)] + enable_direct_store_writes: bool, enable_procedure: bool, enable_access_cluster_info: bool, region_follower: Option, @@ -143,6 +145,10 @@ impl MetaClientBuilder { } } + /// Enables the Store client in read-only mode. + /// + /// Store write methods fail fast by default. Metadata writes from production + /// frontend/datanode/flownode clients should go through metasrv procedures. pub fn enable_store(self) -> Self { Self { enable_store: true, @@ -150,6 +156,18 @@ impl MetaClientBuilder { } } + /// Enables direct Store write RPCs for tests. + /// + /// Production metadata writes should use metasrv-owned write paths instead. + #[cfg(test)] + pub(super) fn enable_direct_store_writes_for_test(self) -> Self { + Self { + enable_store: true, + enable_direct_store_writes: true, + ..self + } + } + pub fn enable_procedure(self) -> Self { Self { enable_procedure: true, @@ -216,9 +234,16 @@ impl MetaClientBuilder { let config = self .enable_heartbeat .then(|| ConfigClient::new(self.id, self.role, mgr.clone())); - let store = self - .enable_store - .then(|| StoreClient::new(self.id, self.role, mgr.clone())); + let store = self.enable_store.then(|| { + #[cfg(test)] + { + if self.enable_direct_store_writes { + return StoreClient::new_writable(self.id, self.role, mgr.clone()); + } + } + + StoreClient::new(self.id, self.role, mgr.clone()) + }); let procedure = self.enable_procedure.then(|| { let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone()); ProcedureClient::new( @@ -985,6 +1010,19 @@ mod tests { assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } + #[tokio::test] + async fn test_store_writes_are_read_only_by_default() { + let meta_client = MetaClientBuilder::new(0, Role::Datanode) + .enable_store() + .build(); + + let res = meta_client.put(PutRequest::default()).await; + assert!(matches!( + res.err(), + Some(error::Error::ReadOnlyKvBackend { .. }) + )); + } + #[tokio::test] async fn test_ask_leader() { let tc = new_client("test_ask_leader").await; diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index a7cf449629e9..2aee8d83231b 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -39,11 +39,27 @@ pub struct Client { impl Client { pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + Self::new_with_read_only(id, role, channel_manager, true) + } + + /// Builds a writable direct Store RPC client for tests. + #[cfg(test)] + pub(super) fn new_writable(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + Self::new_with_read_only(id, role, channel_manager, false) + } + + fn new_with_read_only( + id: Id, + role: Role, + channel_manager: ChannelManager, + read_only: bool, + ) -> Self { let inner = Arc::new(RwLock::new(Inner { id, role, channel_manager, peers: vec![], + read_only, })); Self { inner } @@ -103,6 +119,7 @@ struct Inner { role: Role, channel_manager: ChannelManager, peers: Vec, + read_only: bool, } impl Inner { @@ -142,6 +159,8 @@ impl Inner { } async fn put(&self, mut req: PutRequest) -> Result { + self.ensure_writable()?; + let mut client = self.random_client()?; req.set_header( self.id, @@ -167,6 +186,8 @@ impl Inner { } async fn batch_put(&self, mut req: BatchPutRequest) -> Result { + self.ensure_writable()?; + let mut client = self.random_client()?; req.set_header( self.id, @@ -179,6 +200,8 @@ impl Inner { } async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result { + self.ensure_writable()?; + let mut client = self.random_client()?; req.set_header( self.id, @@ -194,6 +217,8 @@ impl Inner { &self, mut req: CompareAndPutRequest, ) -> Result { + self.ensure_writable()?; + let mut client = self.random_client()?; req.set_header( self.id, @@ -209,6 +234,8 @@ impl Inner { } async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result { + self.ensure_writable()?; + let mut client = self.random_client()?; req.set_header( self.id, @@ -231,6 +258,17 @@ impl Inner { self.make_client(peer) } + fn ensure_writable(&self) -> Result<()> { + if self.read_only { + return error::ReadOnlyKvBackendSnafu { + name: "MetaClient Store".to_string(), + } + .fail(); + } + + Ok(()) + } + fn make_client(&self, addr: impl AsRef) -> Result> { let channel = self .channel_manager @@ -284,4 +322,26 @@ mod test { .unwrap(); assert_eq!(1, client.inner.write().await.peers.len()); } + + #[tokio::test] + async fn test_read_only_store_rejects_writes_before_rpc() { + let client = Client::new(0, Role::Frontend, ChannelManager::default()); + + fn assert_read_only(result: Result) { + assert!(matches!( + result, + Err(error::Error::ReadOnlyKvBackend { .. }) + )); + } + + assert_read_only(client.put(PutRequest::default()).await); + assert_read_only(client.batch_put(BatchPutRequest::default()).await); + assert_read_only(client.batch_delete(BatchDeleteRequest::default()).await); + assert_read_only( + client + .compare_and_put(CompareAndPutRequest::default()) + .await, + ); + assert_read_only(client.delete_range(DeleteRangeRequest::default()).await); + } } diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index 95c820c0390f..19577f672cc0 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -144,9 +144,10 @@ impl ErrorExt for Error { | Error::CreateHeartbeatStream { .. } | Error::CreateChannel { .. } | Error::RetryTimesExceeded { .. } - | Error::ReadOnlyKvBackend { .. } | Error::ConvertMetaConfig { .. } => StatusCode::Internal, + Error::ReadOnlyKvBackend { .. } => StatusCode::Unsupported, + Error::MetaServer { code, .. } => *code, Error::InvalidResponseHeader { source, .. } diff --git a/src/meta-client/src/mocks.rs b/src/meta-client/src/mocks.rs index 294ac16ef94a..2c80091c5872 100644 --- a/src/meta-client/src/mocks.rs +++ b/src/meta-client/src/mocks.rs @@ -63,6 +63,7 @@ pub async fn mock_client_by(server_addr: String, channel_manager: ChannelManager let id = 2000u64; let mut meta_client = MetaClientBuilder::datanode_default_options(id) .enable_access_cluster_info() + .enable_direct_store_writes_for_test() .channel_manager(channel_manager) .build(); meta_client.start(&[&server_addr]).await.unwrap(); diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 4a73014dbf63..0719fcd9f1d5 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -26,7 +26,9 @@ use cache::{ with_default_composite_cache_registry, }; use catalog::information_extension::DistributedInformationExtension; -use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend}; +use catalog::kvbackend::{ + CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, new_read_only_meta_kv_backend, +}; use catalog::process_manager::ProcessManager; use client::Client; use client::client_manager::NodeClients; @@ -285,6 +287,9 @@ impl GreptimeDbClusterBuilder { ..Default::default() }; + test_util::prepare_another_catalog_and_schema_with_kv_backend(self.kv_backend.clone()) + .await; + let metasrv = meta_srv::mocks::mock( opt, self.kv_backend.clone(), @@ -307,8 +312,6 @@ impl GreptimeDbClusterBuilder { .build_frontend(metasrv.clone(), datanode_clients, start_frontend_servers) .await; - test_util::prepare_another_catalog_and_schema(&frontend.instance).await; - frontend.start().await.unwrap(); GreptimeDbCluster { @@ -422,9 +425,7 @@ impl GreptimeDbClusterBuilder { meta_client.start(&[&metasrv.server_addr]).await.unwrap(); let meta_client = Arc::new(meta_client); - let meta_backend = Arc::new(MetaKvBackend { - client: meta_client.clone(), - }); + let meta_backend = new_read_only_meta_kv_backend(meta_client.clone()); let layered_cache_registry = Arc::new( LayeredCacheRegistryBuilder::default() @@ -456,9 +457,9 @@ impl GreptimeDbClusterBuilder { meta_client.start(&[&metasrv.server_addr]).await.unwrap(); let meta_client = Arc::new(meta_client); - let cached_meta_backend = Arc::new( - CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(), - ); + let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone()); + let cached_meta_backend = + Arc::new(CachedKvBackendBuilder::new(readonly_meta_backend.clone()).build()); let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry( CacheRegistryBuilder::default() @@ -466,7 +467,7 @@ impl GreptimeDbClusterBuilder { .build(), ); let fundamental_cache_registry = - build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone()))); + build_fundamental_cache_registry(readonly_meta_backend.clone()); let cache_registry = Arc::new( with_default_composite_cache_registry( layered_cache_builder.add_cache_registry(fundamental_cache_registry), diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 8e7c3ce8a6b7..15d65c34eacf 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -22,8 +22,10 @@ use axum::Router; use catalog::kvbackend::KvBackendCatalogManager; use common_base::Plugins; use common_config::Configurable; +use common_meta::key::TableMetadataManager; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; +use common_meta::kv_backend::KvBackendRef; use common_query::Output; use common_runtime::runtime::BuilderBuild; use common_runtime::{Builder as RuntimeBuilder, Runtime}; @@ -907,6 +909,17 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) { .unwrap(); let table_metadata_manager = catalog_manager.table_metadata_manager_ref(); + prepare_another_catalog_and_schema_with_manager(table_metadata_manager).await; +} + +pub(crate) async fn prepare_another_catalog_and_schema_with_kv_backend(kv_backend: KvBackendRef) { + let table_metadata_manager = TableMetadataManager::new(kv_backend); + prepare_another_catalog_and_schema_with_manager(&table_metadata_manager).await; +} + +async fn prepare_another_catalog_and_schema_with_manager( + table_metadata_manager: &TableMetadataManager, +) { table_metadata_manager .catalog_manager() .create(CatalogNameKey::new("another_catalog"), true)