diff --git a/Cargo.lock b/Cargo.lock index 29dc40ff85f0..5afdf07a2ad5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5764,7 +5764,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0de5437582920c8b30d6c34212f161db71d95c50#0de5437582920c8b30d6c34212f161db71d95c50" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0bf04b8124fe9d750a4303114989d4563818c666#0bf04b8124fe9d750a4303114989d4563818c666" dependencies = [ "prost 0.14.1", "prost-types 0.14.1", @@ -10728,7 +10728,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.12.1", "log", "multimap", @@ -10748,8 +10748,8 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ - "heck 0.4.1", - "itertools 0.14.0", + "heck 0.5.0", + "itertools 0.11.0", "log", "multimap", "once_cell", @@ -10797,7 +10797,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.117", @@ -10810,7 +10810,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.117", @@ -12964,7 +12964,7 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1961e2ef424c1424204d3a5d6975f934f56b6d50ff5732382d84ebf460e147f7" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.117", diff --git a/Cargo.toml b/Cargo.toml index 2972f1bf96c2..a58e8c7ed19c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,7 +157,7 @@ fs2 = "0.4" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0de5437582920c8b30d6c34212f161db71d95c50" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0bf04b8124fe9d750a4303114989d4563818c666" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 0578e145cd12..2ede91923f79 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -531,6 +531,7 @@ impl StartCommand { flow_metadata_manager: flow_metadata_manager.clone(), flow_metadata_allocator: flow_metadata_allocator.clone(), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), + soft_drop_enabled: false, }; let ddl_manager = DdlManager::try_new( diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 8fc647433a3c..35a95d54843c 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -44,12 +44,14 @@ pub mod drop_flow; pub mod drop_table; pub mod drop_view; pub mod flow_meta; +pub mod purge_dropped_table; pub mod table_meta; #[cfg(any(test, feature = "testing"))] pub mod test_util; #[cfg(test)] pub(crate) mod tests; pub mod truncate_table; +pub mod undrop_table; pub mod utils; /// Metadata allocated to a table. @@ -112,6 +114,8 @@ pub struct DdlContext { pub flow_metadata_allocator: FlowMetadataAllocatorRef, /// controller of region failure detector. pub region_failure_detector_controller: RegionFailureDetectorControllerRef, + /// Whether table drops should stop after tombstoning metadata. + pub soft_drop_enabled: bool, } impl DdlContext { @@ -129,7 +133,10 @@ impl DdlContext { /// /// Once the regions were dropped, subsequent heartbeats no longer include these regions. /// Therefore, we should remove the failure detectors for these dropped regions. - async fn deregister_failure_detectors(&self, detecting_regions: Vec) { + pub(crate) async fn deregister_failure_detectors( + &self, + detecting_regions: Vec, + ) { self.region_failure_detector_controller .deregister_failure_detectors(detecting_regions) .await; diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 4f4e2d605351..ed3d551f548a 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -36,7 +36,7 @@ use table::table_reference::TableReference; use self::executor::DropTableExecutor; use crate::ddl::DdlContext; -use crate::ddl::utils::map_to_procedure_error; +use crate::ddl::utils::{convert_region_routes_to_detecting_regions, map_to_procedure_error}; use crate::error::{self, Result}; use crate::key::table_route::TableRouteValue; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; @@ -149,6 +149,7 @@ impl DropTableProcedure { /// Broadcasts invalidate table cache instruction. async fn on_broadcast(&mut self) -> Result { self.executor.invalidate_table_cache(&self.context).await?; + self.data.state = DropTableState::DatanodeDropRegions; Ok(Status::executing(true)) @@ -172,6 +173,23 @@ impl DropTableProcedure { .await?; } + if self.context.soft_drop_enabled { + self.executor + .on_close_regions( + &self.context.node_manager, + &self.context.leader_region_registry, + &self.data.physical_region_routes, + ) + .await?; + self.context + .deregister_failure_detectors(convert_region_routes_to_detecting_regions( + &self.data.physical_region_routes, + )) + .await; + self.dropping_regions.clear(); + return Ok(Status::done()); + } + self.executor .on_drop_regions( &self.context.node_manager, @@ -182,6 +200,7 @@ impl DropTableProcedure { false, ) .await?; + self.data.state = DropTableState::DeleteTombstone; Ok(Status::executing(true)) } diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 271bdbfedee8..1e85a64ef60c 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use api::v1::region::{ CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest, RegionRequest, @@ -82,6 +82,8 @@ impl DropTableExecutor { /// Checks whether table exists. /// - Early returns if table not exists and `drop_if_exists` is `true`. /// - Throws an error if table not exists and `drop_if_exists` is `false`. + /// - Rejects dropping a recreated live table while an older tombstone still owns the same + /// fully qualified name. pub async fn on_prepare(&self, ctx: &DdlContext) -> Result> { let table_ref = self.table.table_ref(); @@ -106,6 +108,21 @@ impl DropTableExecutor { } ); + if ctx.soft_drop_enabled + && let Some(dropped_table) = ctx + .table_metadata_manager + .get_dropped_table(&self.table) + .await? + && dropped_table.table_id != self.table_id + { + return error::TableNameTombstoneConflictSnafu { + table_name: table_ref.to_string(), + existing_table_id: dropped_table.table_id, + dropping_table_id: self.table_id, + } + .fail(); + } + Ok(Control::Continue(())) } @@ -211,7 +228,17 @@ impl DropTableExecutor { Ok(()) } - /// Drops region on datanode. + /// Drops regions on datanodes. + /// + /// Arguments: + /// - `node_manager`: resolves datanode clients from peers in `region_routes`. + /// - `leader_region_registry`: tracks in-flight leader region operations. + /// - `region_routes`: table region placement; leaders receive drop requests and followers + /// receive close requests. + /// - `fast_path`: forwards to datanode drop requests to skip extra cleanup when safe. + /// - `force`: forwards to datanode drop requests to allow forced region removal. + /// - `partial_drop`: forwards to datanode drop requests for partial table/region drops. + #[allow(clippy::too_many_arguments)] pub async fn on_drop_regions( &self, node_manager: &NodeManagerRef, @@ -245,6 +272,7 @@ impl DropTableExecutor { fast_path, force, partial_drop, + soft_drop: false, })), }; let datanode = datanode.clone(); @@ -318,6 +346,64 @@ impl DropTableExecutor { Ok(()) } + + /// Closes all table regions on datanodes without deleting region files or metadata tombstones. + pub async fn on_close_regions( + &self, + node_manager: &NodeManagerRef, + leader_region_registry: &LeaderRegionRegistryRef, + region_routes: &[RegionRoute], + ) -> Result<()> { + let table_id = self.table_id; + let mut seen_peer_ids = HashSet::new(); + let peers = find_leaders(region_routes) + .into_iter() + .chain(find_followers(region_routes)) + .filter(|peer| seen_peer_ids.insert(peer.id)); + let mut close_region_tasks = Vec::new(); + + for datanode in peers { + let requester = node_manager.datanode(&datanode).await; + let region_ids = find_leader_regions(region_routes, &datanode) + .into_iter() + .chain(find_follower_regions(region_routes, &datanode)) + .map(|region_number| RegionId::new(table_id, region_number)); + + for region_id in region_ids { + debug!("Closing region {region_id} on Datanode {datanode:?}"); + let request = RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::Close(PbCloseRegionRequest { + region_id: region_id.as_u64(), + })), + }; + + let datanode = datanode.clone(); + let requester = requester.clone(); + close_region_tasks.push(async move { + if let Err(err) = requester.handle(request).await + && err.status_code() != StatusCode::RegionNotFound + { + return Err(add_peer_context_if_needed(datanode)(err)); + } + Ok(()) + }); + } + } + + join_all(close_region_tasks) + .await + .into_iter() + .collect::>>()?; + + let region_ids = operating_leader_regions(region_routes); + leader_region_registry.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id)); + + Ok(()) + } } #[cfg(test)] diff --git a/src/common/meta/src/ddl/drop_table/metadata.rs b/src/common/meta/src/ddl/drop_table/metadata.rs index c1a8a90d4e1d..6289b6c61abb 100644 --- a/src/common/meta/src/ddl/drop_table/metadata.rs +++ b/src/common/meta/src/ddl/drop_table/metadata.rs @@ -13,12 +13,13 @@ // limitations under the License. use common_catalog::format_full_table_name; -use snafu::OptionExt; +use snafu::{OptionExt, ensure}; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use crate::ddl::drop_table::DropTableProcedure; -use crate::ddl::utils::extract_region_wal_options; +use crate::ddl::utils::{extract_region_wal_options, is_metric_engine_logical_table}; use crate::error::{self, Result}; +use crate::key::table_route::TableRouteValue; impl DropTableProcedure { /// Fetches the table info and physical table route. @@ -31,18 +32,34 @@ impl DropTableProcedure { .get_physical_table_route(task.table_id) .await?; - if physical_table_id == self.data.table_id() { - let table_info_value = self - .context - .table_metadata_manager - .table_info_manager() - .get(task.table_id) - .await? - .with_context(|| error::TableInfoNotFoundSnafu { - table: format_full_table_name(&task.catalog, &task.schema, &task.table), - })? - .into_inner(); + let table_info_value = self + .context + .table_metadata_manager + .table_info_manager() + .get(task.table_id) + .await? + .with_context(|| error::TableInfoNotFoundSnafu { + table: format_full_table_name(&task.catalog, &task.schema, &task.table), + })? + .into_inner(); + let table_route_value = TableRouteValue::new( + self.data.table_id(), + physical_table_id, + physical_table_route_value.region_routes.clone(), + ); + // TODO(hl): support soft-dropping logical tables. + ensure!( + !(self.context.soft_drop_enabled + && is_metric_engine_logical_table( + &table_info_value.table_info, + &table_route_value + )), + error::UnsupportedSnafu { + operation: "soft-dropping metric logical tables".to_string() + } + ); + if physical_table_id == self.data.table_id() { let engine = table_info_value.table_info.meta.engine; // rollback only if dropping the metric physical table fails self.data.allow_rollback = engine.as_str() == METRIC_ENGINE_NAME; diff --git a/src/common/meta/src/ddl/purge_dropped_table.rs b/src/common/meta/src/ddl/purge_dropped_table.rs new file mode 100644 index 000000000000..c9835b46a490 --- /dev/null +++ b/src/common/meta/src/ddl/purge_dropped_table.rs @@ -0,0 +1,246 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, +}; +use common_wal::options::WalOptions; +use serde::{Deserialize, Serialize}; +use snafu::{ResultExt, ensure}; +use store_api::storage::{RegionNumber, TableId}; +use strum::AsRefStr; +use table::metadata::TableInfo; +use table::table_name::TableName; + +use crate::ddl::DdlContext; +use crate::ddl::drop_table::executor::DropTableExecutor; +use crate::ddl::undrop_table::open_regions; +use crate::ddl::utils::{ + convert_region_routes_to_detecting_regions, is_metric_engine_logical_table, + map_to_procedure_error, +}; +use crate::error::{self, Result}; +use crate::key::table_route::TableRouteValue; +use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; +use crate::rpc::ddl::PurgeDroppedTableTask; +use crate::rpc::router::RegionRoute; + +pub struct PurgeDroppedTableProcedure { + context: DdlContext, + data: PurgeDroppedTableData, +} + +impl PurgeDroppedTableProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::PurgeDroppedTable"; + + pub fn new(task: PurgeDroppedTableTask, context: DdlContext) -> Self { + Self { + context, + data: PurgeDroppedTableData::new(task), + } + } + + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data: PurgeDroppedTableData = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(Self { context, data }) + } + + async fn on_prepare(&mut self) -> Result { + let dropped_table = if let Some(table_id) = self.data.task.table_id { + self.context + .table_metadata_manager + .get_dropped_table_by_id(table_id) + .await? + } else { + self.context + .table_metadata_manager + .get_dropped_table(&self.data.task.table_name()) + .await? + }; + + let Some(dropped_table) = dropped_table else { + return Ok(Status::done()); + }; + ensure!( + !is_metric_engine_logical_table( + &dropped_table.table_info_value.table_info, + &dropped_table.table_route_value + ), + error::UnsupportedSnafu { + operation: "purging metric logical tables".to_string() + } + ); + self.data.table_id = Some(dropped_table.table_id); + self.data.table_name = Some(dropped_table.table_name); + self.data.table_info = Some(dropped_table.table_info_value.table_info); + self.data.table_route_value = Some(dropped_table.table_route_value); + self.data.region_wal_options = dropped_table.region_wal_options; + self.data.state = PurgeDroppedTableState::OpenRegions; + Ok(Status::executing(true)) + } + + async fn on_open_regions(&mut self) -> Result { + if let Some(region_routes) = self.data.physical_region_routes() { + open_regions( + &self.context, + self.data.table_id(), + self.data.table_name(), + self.data.table_info(), + region_routes, + &self.data.region_wal_options, + ) + .await?; + } + self.data.state = PurgeDroppedTableState::DropRegions; + Ok(Status::executing(true)) + } + + async fn on_drop_regions(&mut self) -> Result { + if let Some(region_routes) = self.data.physical_region_routes() { + self.executor() + .on_drop_regions( + &self.context.node_manager, + &self.context.leader_region_registry, + region_routes, + false, + false, + false, + ) + .await?; + self.context + .deregister_failure_detectors(convert_region_routes_to_detecting_regions( + region_routes, + )) + .await; + } + self.data.state = PurgeDroppedTableState::DeleteTombstone; + Ok(Status::executing(true)) + } + + async fn on_delete_tombstone(&mut self) -> Result { + self.context + .table_metadata_manager + .delete_table_metadata_tombstone( + self.data.table_id(), + self.data.table_name(), + self.data.table_route_value(), + &self.data.region_wal_options, + ) + .await?; + Ok(Status::done()) + } + + fn executor(&self) -> DropTableExecutor { + DropTableExecutor::new(self.data.table_name().clone(), self.data.table_id(), false) + } +} + +#[async_trait] +impl Procedure for PurgeDroppedTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + fn recover(&mut self) -> ProcedureResult<()> { + Ok(()) + } + + async fn execute(&mut self, _: &ProcedureContext) -> ProcedureResult { + match self.data.state { + PurgeDroppedTableState::Prepare => self.on_prepare().await, + PurgeDroppedTableState::OpenRegions => self.on_open_regions().await, + PurgeDroppedTableState::DropRegions => self.on_drop_regions().await, + PurgeDroppedTableState::DeleteTombstone => self.on_delete_tombstone().await, + } + .map_err(map_to_procedure_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let table_ref = self.data.task.table_ref(); + let mut keys = vec![ + CatalogLock::Read(table_ref.catalog).into(), + SchemaLock::read(table_ref.catalog, table_ref.schema).into(), + TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(), + ]; + if let Some(table_id) = self.data.task.table_id { + keys.push(TableLock::Write(table_id).into()); + } + LockKey::new(keys) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PurgeDroppedTableData { + state: PurgeDroppedTableState, + task: PurgeDroppedTableTask, + table_id: Option, + table_name: Option, + table_info: Option, + table_route_value: Option, + #[serde(default)] + region_wal_options: HashMap, +} + +impl PurgeDroppedTableData { + fn new(task: PurgeDroppedTableTask) -> Self { + Self { + state: PurgeDroppedTableState::Prepare, + task, + table_id: None, + table_name: None, + table_info: None, + table_route_value: None, + region_wal_options: HashMap::new(), + } + } + + fn table_id(&self) -> TableId { + self.table_id.unwrap() + } + + fn table_name(&self) -> &TableName { + self.table_name.as_ref().unwrap() + } + + fn table_info(&self) -> &TableInfo { + self.table_info.as_ref().unwrap() + } + + fn table_route_value(&self) -> &TableRouteValue { + self.table_route_value.as_ref().unwrap() + } + + fn physical_region_routes(&self) -> Option<&[RegionRoute]> { + match self.table_route_value() { + TableRouteValue::Physical(route) => Some(&route.region_routes), + TableRouteValue::Logical(_) => None, + } + } +} + +#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)] +enum PurgeDroppedTableState { + Prepare, + OpenRegions, + DropRegions, + DeleteTombstone, +} diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index c518bb36a2f9..0c1a401ecbb5 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::v1::region::{RegionRequest, region_request}; +use async_trait::async_trait; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -26,21 +28,26 @@ use common_procedure_test::{ use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use table::metadata::TableId; -use tokio::sync::mpsc; +use tokio::sync::{Mutex, mpsc}; -use crate::ddl::TableMetadata; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::{DropTableProcedure, DropTableState}; +use crate::ddl::purge_dropped_table::PurgeDroppedTableProcedure; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler}; use crate::ddl::test_util::{ create_logical_table, create_physical_table, create_physical_table_metadata, put_datanode_address, test_create_logical_table_task, test_create_physical_table_task, }; +use crate::ddl::undrop_table::UndropTableProcedure; +use crate::ddl::{DetectingRegion, RegionFailureDetectorController, TableMetadata}; +use crate::error::Error; +use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; -use crate::rpc::ddl::DropTableTask; +use crate::rpc::ddl::{DropTableTask, PurgeDroppedTableTask, UndropTableTask}; use crate::rpc::router::{Region, RegionRoute}; use crate::test_util::{MockDatanodeManager, new_ddl_context, new_ddl_context_with_kv_backend}; @@ -231,6 +238,668 @@ async fn test_on_datanode_drop_regions_remaps_addresses_when_retrying() { assert_eq!(peers[1].addr, "new-follower"); } +#[tokio::test] +async fn test_soft_drop_closes_regions_and_keeps_tombstone() { + let (tx, mut rx) = mpsc::channel(8); + let datanode_handler = DatanodeWatcher::new(tx); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let detector_controller = Arc::new(RecordingRegionFailureDetectorController::default()); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + ddl_context.region_failure_detector_controller = detector_controller.clone(); + let table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![ + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(2)], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 2)), + leader_peer: Some(Peer::empty(2)), + follower_peers: vec![Peer::empty(1)], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }, + ]), + HashMap::new(), + ) + .await + .unwrap(); + + let task = new_drop_table_task(table_name, table_id, false); + let mut procedure = DropTableProcedure::new(task, ddl_context.clone()); + + execute_procedure_until_done(&mut procedure).await; + + assert!(procedure.dropping_regions.is_empty()); + assert_eq!(ddl_context.memory_region_keeper.len(), 0); + + let mut requests = Vec::new(); + for _ in 0..4 { + let (peer, request) = rx.try_recv().unwrap(); + let Some(region_request::Body::Close(req)) = request.body else { + unreachable!(); + }; + requests.push((peer.id, req.region_id)); + } + requests.sort_unstable(); + assert_eq!( + requests, + vec![ + (1, RegionId::new(table_id, 1).as_u64()), + (1, RegionId::new(table_id, 2).as_u64()), + (2, RegionId::new(table_id, 1).as_u64()), + (2, RegionId::new(table_id, 2).as_u64()), + ] + ); + assert!(rx.try_recv().is_err()); + + let table_name = procedure.data.task.table_name(); + let live_table = ddl_context + .table_metadata_manager + .table_name_manager() + .get(TableNameKey::from(&table_name)) + .await + .unwrap(); + assert!(live_table.is_none()); + + let dropped_table = ddl_context + .table_metadata_manager + .get_dropped_table(&table_name) + .await + .unwrap(); + assert!(dropped_table.is_some()); + + assert_eq!( + detector_controller.deregistered().await, + vec![ + (1, RegionId::new(table_id, 1)), + (2, RegionId::new(table_id, 2)) + ] + ); +} + +#[tokio::test] +async fn test_hard_drop_keeps_delete_tombstone_flow() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }]), + HashMap::new(), + ) + .await + .unwrap(); + + let task = new_drop_table_task(table_name, table_id, false); + let mut procedure = DropTableProcedure::new(task, ddl_context.clone()); + + execute_procedure_until(&mut procedure, |p| { + p.data.state == DropTableState::DeleteTombstone + }) + .await; + + assert_eq!(procedure.data.state, DropTableState::DeleteTombstone); + + execute_procedure_until_done(&mut procedure).await; + + let dropped_table = ddl_context + .table_metadata_manager + .get_dropped_table(&procedure.data.task.table_name()) + .await + .unwrap(); + assert!(dropped_table.is_none()); +} + +#[tokio::test] +async fn test_create_table_succeeds_while_tombstone_exists() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + let dropped_table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, dropped_table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let drop_task = new_drop_table_task(table_name, dropped_table_id, false); + let mut drop_procedure = DropTableProcedure::new(drop_task, ddl_context.clone()); + execute_procedure_until_done(&mut drop_procedure).await; + + let mut create_task = test_create_table_task(table_name, 1025); + create_task.create_table.table_id = None; + create_task.table_info.ident.table_id = 0; + let mut create_procedure = CreateTableProcedure::new(create_task, ddl_context.clone()).unwrap(); + execute_procedure_until_done(&mut create_procedure).await; + + let live_table = ddl_context + .table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + table_name, + )) + .await + .unwrap() + .unwrap(); + assert_eq!(live_table.table_id(), create_procedure.table_id()); + + let dropped_table = ddl_context + .table_metadata_manager + .get_dropped_table(&create_procedure.data.task.table_name()) + .await + .unwrap(); + assert_eq!(dropped_table.unwrap().table_id, dropped_table_id); +} + +#[tokio::test] +async fn test_drop_recreated_table_fails_when_previous_tombstone_exists() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + let original_table_id = 1024; + let recreated_table_id = 1025; + let table_name = "foo"; + let task = test_create_table_task(table_name, original_table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let drop_task = new_drop_table_task(table_name, original_table_id, false); + let mut drop_procedure = DropTableProcedure::new(drop_task, ddl_context.clone()); + execute_procedure_until_done(&mut drop_procedure).await; + + ddl_context + .table_metadata_manager + .create_table_metadata( + test_create_table_task(table_name, recreated_table_id).table_info, + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let mut procedure = DropTableProcedure::new( + new_drop_table_task(table_name, recreated_table_id, false), + ddl_context, + ); + let err = procedure.on_prepare().await.unwrap_err(); + + assert_matches!(err, Error::TableNameTombstoneConflict { .. }); +} + +#[tokio::test] +async fn test_hard_drop_recreated_table_ignores_previous_orphan_tombstone() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let original_table_id = 1024; + let recreated_table_id = 1025; + let table_name = "foo"; + let task = test_create_table_task(table_name, original_table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let drop_task = new_drop_table_task(table_name, original_table_id, false); + let mut drop_procedure = DropTableProcedure::new(drop_task, ddl_context.clone()); + execute_procedure_until(&mut drop_procedure, |p| { + p.data.state == DropTableState::DeleteTombstone + }) + .await; + + ddl_context + .table_metadata_manager + .create_table_metadata( + test_create_table_task(table_name, recreated_table_id).table_info, + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let mut procedure = DropTableProcedure::new( + new_drop_table_task(table_name, recreated_table_id, false), + ddl_context, + ); + + procedure.on_prepare().await.unwrap(); +} + +#[tokio::test] +async fn test_undrop_table_restores_metadata_and_reopens_regions() { + let (tx, mut rx) = mpsc::channel(8); + let datanode_handler = DatanodeWatcher::new(tx); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let detector_controller = Arc::new(RecordingRegionFailureDetectorController::default()); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + ddl_context.region_failure_detector_controller = detector_controller.clone(); + let table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(2)], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }]), + HashMap::new(), + ) + .await + .unwrap(); + + let mut drop_procedure = DropTableProcedure::new( + new_drop_table_task(table_name, table_id, false), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut drop_procedure).await; + detector_controller.clear().await; + + while rx.try_recv().is_ok() {} + + let mut procedure = UndropTableProcedure::new( + new_undrop_table_task(table_name, table_id), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut procedure).await; + + let live_table = ddl_context + .table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + table_name, + )) + .await + .unwrap() + .unwrap(); + assert_eq!(live_table.table_id(), table_id); + assert!( + ddl_context + .table_metadata_manager + .get_dropped_table(&drop_procedure.data.task.table_name()) + .await + .unwrap() + .is_none() + ); + + let mut opened_regions = HashSet::new(); + for _ in 0..2 { + let (peer, request) = rx.try_recv().unwrap(); + let Some(region_request::Body::Open(req)) = request.body else { + unreachable!(); + }; + opened_regions.insert((peer.id, req.region_id)); + } + assert_eq!( + opened_regions, + HashSet::from([ + (1, RegionId::new(table_id, 1).as_u64()), + (2, RegionId::new(table_id, 1).as_u64()), + ]) + ); + assert!(rx.try_recv().is_err()); + + assert_eq!( + detector_controller.registered().await, + vec![(1, RegionId::new(table_id, 1))] + ); +} + +#[tokio::test] +async fn test_undrop_logical_table_skips_datanode_open() { + let (tx, mut rx) = mpsc::channel(8); + let datanode_handler = DatanodeWatcher::new(tx); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + let physical_table_id = 1024; + let logical_table_id = 1025; + let table_name = "foo"; + ddl_context + .table_metadata_manager + .create_table_metadata( + test_create_table_task("phy", physical_table_id).table_info, + TableRouteValue::physical(vec![RegionRoute { + region: Region::new_test(RegionId::new(physical_table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }]), + HashMap::new(), + ) + .await + .unwrap(); + let task = test_create_table_task(table_name, logical_table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::logical(physical_table_id), + HashMap::new(), + ) + .await + .unwrap(); + + let mut drop_procedure = DropTableProcedure::new( + new_drop_table_task(table_name, logical_table_id, false), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut drop_procedure).await; + + while rx.try_recv().is_ok() {} + + let mut procedure = UndropTableProcedure::new( + new_undrop_table_task(table_name, logical_table_id), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut procedure).await; + + let live_table = ddl_context + .table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + table_name, + )) + .await + .unwrap() + .unwrap(); + assert_eq!(live_table.table_id(), logical_table_id); + assert!(rx.try_recv().is_err()); +} + +#[tokio::test] +async fn test_soft_drop_metric_logical_table_fails() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + let physical_table_id = create_physical_table(&ddl_context, "phy").await; + let logical_table_id = + create_logical_table(ddl_context.clone(), physical_table_id, "foo").await; + + let mut procedure = DropTableProcedure::new( + new_drop_table_task("foo", logical_table_id, false), + ddl_context, + ); + let err = procedure.on_prepare().await.unwrap_err(); + + assert_eq!(err.status_code(), StatusCode::Unsupported); +} + +#[tokio::test] +async fn test_undrop_metric_logical_table_fails() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let physical_table_id = create_physical_table(&ddl_context, "phy").await; + let logical_table_id = + create_metric_logical_table_tombstone(&ddl_context, physical_table_id, "foo").await; + + let mut procedure = + UndropTableProcedure::new(new_undrop_table_task("foo", logical_table_id), ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + + assert_eq!(err.status_code(), StatusCode::Unsupported); +} + +#[tokio::test] +async fn test_purge_metric_logical_table_fails() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let physical_table_id = create_physical_table(&ddl_context, "phy").await; + let logical_table_id = + create_metric_logical_table_tombstone(&ddl_context, physical_table_id, "foo").await; + + let mut procedure = PurgeDroppedTableProcedure::new( + new_purge_dropped_table_task("foo", Some(logical_table_id)), + ddl_context, + ); + let err = procedure + .execute(&new_test_procedure_context()) + .await + .unwrap_err(); + + assert_eq!(err.status_code(), StatusCode::Unsupported); +} + +#[tokio::test] +async fn test_undrop_table_fails_when_live_name_exists() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + let dropped_table_id = 1024; + let live_table_id = 1025; + let table_name = "foo"; + let task = test_create_table_task(table_name, dropped_table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + let mut drop_procedure = DropTableProcedure::new( + new_drop_table_task(table_name, dropped_table_id, false), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut drop_procedure).await; + ddl_context + .table_metadata_manager + .create_table_metadata( + test_create_table_task(table_name, live_table_id).table_info, + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let mut procedure = UndropTableProcedure::new( + new_undrop_table_task(table_name, dropped_table_id), + ddl_context, + ); + let err = procedure.on_prepare().await.unwrap_err(); + + assert_matches!(err, Error::TableAlreadyExists { .. }); +} + +#[tokio::test] +async fn test_purge_dropped_table_drops_regions_and_deletes_tombstone() { + let (tx, mut rx) = mpsc::channel(8); + let datanode_handler = DatanodeWatcher::new(tx); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let detector_controller = Arc::new(RecordingRegionFailureDetectorController::default()); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + ddl_context.region_failure_detector_controller = detector_controller.clone(); + let table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(2)], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }]), + HashMap::new(), + ) + .await + .unwrap(); + let mut drop_procedure = DropTableProcedure::new( + new_drop_table_task(table_name, table_id, false), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut drop_procedure).await; + while rx.try_recv().is_ok() {} + detector_controller.clear().await; + + let mut procedure = PurgeDroppedTableProcedure::new( + new_purge_dropped_table_task(table_name, Some(table_id)), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut procedure).await; + + let mut requests = Vec::new(); + for _ in 0..4 { + let (peer, request) = rx.try_recv().unwrap(); + requests.push((peer.id, request.body.unwrap())); + } + requests.sort_unstable_by_key(|(peer_id, _)| *peer_id); + assert_matches!(requests[0].1, region_request::Body::Open(_)); + assert_matches!(requests[1].1, region_request::Body::Drop(_)); + assert_matches!(requests[2].1, region_request::Body::Open(_)); + assert_matches!(requests[3].1, region_request::Body::Close(_)); + assert!(rx.try_recv().is_err()); + assert!( + ddl_context + .table_metadata_manager + .get_dropped_table(&drop_procedure.data.task.table_name()) + .await + .unwrap() + .is_none() + ); + + assert_eq!( + detector_controller.deregistered().await, + vec![(1, RegionId::new(table_id, 1))] + ); +} + +#[tokio::test] +async fn test_purge_dropped_table_by_name_selects_tombstone_when_live_table_exists() { + let (tx, mut rx) = mpsc::channel(8); + let datanode_handler = DatanodeWatcher::new(tx); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + let dropped_table_id = 1024; + let live_table_id = 1025; + let table_name = "foo"; + ddl_context + .table_metadata_manager + .create_table_metadata( + test_create_table_task(table_name, dropped_table_id).table_info, + TableRouteValue::physical(vec![RegionRoute { + region: Region::new_test(RegionId::new(dropped_table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }]), + HashMap::new(), + ) + .await + .unwrap(); + let mut drop_procedure = DropTableProcedure::new( + new_drop_table_task(table_name, dropped_table_id, false), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut drop_procedure).await; + ddl_context + .table_metadata_manager + .create_table_metadata( + test_create_table_task(table_name, live_table_id).table_info, + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + while rx.try_recv().is_ok() {} + + let mut procedure = PurgeDroppedTableProcedure::new( + new_purge_dropped_table_task(table_name, None), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut procedure).await; + + let live_table = ddl_context + .table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + table_name, + )) + .await + .unwrap() + .unwrap(); + assert_eq!(live_table.table_id(), live_table_id); + + let (_, request) = rx.try_recv().unwrap(); + let Some(region_request::Body::Open(req)) = request.body else { + unreachable!(); + }; + assert_eq!(req.region_id, RegionId::new(dropped_table_id, 1).as_u64()); + + let (_, request) = rx.try_recv().unwrap(); + let Some(region_request::Body::Drop(req)) = request.body else { + unreachable!(); + }; + assert_eq!(req.region_id, RegionId::new(dropped_table_id, 1).as_u64()); + assert!(rx.try_recv().is_err()); +} + #[tokio::test] async fn test_on_rollback() { let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); @@ -302,6 +971,81 @@ fn new_drop_table_task(table_name: &str, table_id: TableId, drop_if_exists: bool } } +fn new_undrop_table_task(table_name: &str, table_id: TableId) -> UndropTableTask { + UndropTableTask { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: table_name.to_string(), + table_id, + } +} + +fn new_purge_dropped_table_task( + table_name: &str, + table_id: Option, +) -> PurgeDroppedTableTask { + PurgeDroppedTableTask { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: table_name.to_string(), + table_id, + } +} + +async fn create_metric_logical_table_tombstone( + ddl_context: &crate::ddl::DdlContext, + physical_table_id: TableId, + table_name: &str, +) -> TableId { + let logical_table_id = + create_logical_table(ddl_context.clone(), physical_table_id, table_name).await; + let mut task = test_create_logical_table_task(table_name); + task.set_table_id(logical_table_id); + ddl_context + .table_metadata_manager + .delete_table_metadata( + logical_table_id, + &task.table_name(), + &TableRouteValue::logical(physical_table_id), + &HashMap::new(), + ) + .await + .unwrap(); + logical_table_id +} + +#[derive(Default)] +struct RecordingRegionFailureDetectorController { + registered: Mutex>, + deregistered: Mutex>, +} + +impl RecordingRegionFailureDetectorController { + async fn registered(&self) -> Vec { + self.registered.lock().await.clone() + } + + async fn deregistered(&self) -> Vec { + self.deregistered.lock().await.clone() + } + + async fn clear(&self) { + self.registered.lock().await.clear(); + self.deregistered.lock().await.clear(); + } +} + +#[async_trait] +impl RegionFailureDetectorController for RecordingRegionFailureDetectorController { + async fn register_failure_detectors(&self, detecting_regions: Vec) { + self.registered.lock().await.extend(detecting_regions); + } + + async fn deregister_failure_detectors(&self, detecting_regions: Vec) { + self.deregistered.lock().await.extend(detecting_regions); + } +} + #[tokio::test] async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); diff --git a/src/common/meta/src/ddl/undrop_table.rs b/src/common/meta/src/ddl/undrop_table.rs new file mode 100644 index 000000000000..dcdb0b8ef2d6 --- /dev/null +++ b/src/common/meta/src/ddl/undrop_table.rs @@ -0,0 +1,322 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use api::v1::region::{ + OpenRequest as PbOpenRegionRequest, RegionRequest, RegionRequestHeader, region_request, +}; +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, +}; +use common_telemetry::tracing_context::TracingContext; +use common_wal::options::WalOptions; +use futures::future::join_all; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::storage::{RegionId, RegionNumber}; +use strum::AsRefStr; +use table::metadata::TableId; +use table::table_name::TableName; +use table::table_reference::TableReference; + +use crate::ddl::utils::{ + add_peer_context_if_needed, convert_region_routes_to_detecting_regions, + is_metric_engine_logical_table, map_to_procedure_error, region_storage_path, +}; +use crate::ddl::{CreateRequestBuilder, DdlContext, build_template_from_raw_table_info}; +use crate::error::{self, Result}; +use crate::instruction::CacheIdent; +use crate::key::table_name::TableNameKey; +use crate::key::table_route::TableRouteValue; +use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; +use crate::rpc::ddl::UndropTableTask; +use crate::rpc::router::{ + RegionRoute, find_follower_regions, find_followers, find_leader_regions, find_leaders, +}; + +pub struct UndropTableProcedure { + context: DdlContext, + data: UndropTableData, +} + +impl UndropTableProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::UndropTable"; + + pub fn new(task: UndropTableTask, context: DdlContext) -> Self { + Self { + context, + data: UndropTableData::new(task), + } + } + + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data: UndropTableData = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(Self { context, data }) + } + + pub(crate) async fn on_prepare(&mut self) -> Result { + let table_ref = self.data.table_ref(); + ensure!( + !self + .context + .table_metadata_manager + .table_name_manager() + .exists(TableNameKey::new( + table_ref.catalog, + table_ref.schema, + table_ref.table + )) + .await?, + error::TableAlreadyExistsSnafu { + table_name: table_ref.to_string() + } + ); + + let dropped_table = self + .context + .table_metadata_manager + .get_dropped_table_by_id(self.data.task.table_id) + .await? + .with_context(|| error::TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + self.data.table_name = Some(dropped_table.table_name.clone()); + self.data.table_route_value = Some(dropped_table.table_route_value.clone()); + self.data.region_wal_options = dropped_table.region_wal_options; + ensure!( + !is_metric_engine_logical_table( + &dropped_table.table_info_value.table_info, + self.data.table_route_value() + ), + error::UnsupportedSnafu { + operation: "undropping metric logical tables".to_string() + } + ); + self.data.table_info = Some(dropped_table.table_info_value.table_info); + self.data.state = UndropTableState::RestoreMetadata; + Ok(Status::executing(true)) + } + + async fn on_restore_metadata(&mut self) -> Result { + let table_route_value = self.data.table_route_value(); + self.context + .table_metadata_manager + .restore_table_metadata( + self.data.task.table_id, + self.data.table_name(), + table_route_value, + &self.data.region_wal_options, + ) + .await?; + self.data.state = UndropTableState::OpenRegions; + Ok(Status::executing(true)) + } + + async fn on_open_regions(&mut self) -> Result { + let TableRouteValue::Physical(route) = self.data.table_route_value() else { + self.data.state = UndropTableState::InvalidateTableCache; + return Ok(Status::executing(true)); + }; + + open_regions( + &self.context, + self.data.task.table_id, + self.data.table_name(), + self.data.table_info(), + &route.region_routes, + &self.data.region_wal_options, + ) + .await?; + self.data.state = UndropTableState::InvalidateTableCache; + Ok(Status::executing(true)) + } + + async fn on_broadcast(&mut self) -> Result { + let ctx = crate::cache_invalidator::Context { + subject: Some(format!( + "Invalidate table cache by undropping table {}, table_id: {}", + self.data.table_name().table_ref(), + self.data.task.table_id, + )), + }; + self.context + .cache_invalidator + .invalidate( + &ctx, + &[ + CacheIdent::TableName(self.data.table_name().table_ref().into()), + CacheIdent::TableId(self.data.task.table_id), + ], + ) + .await?; + Ok(Status::done()) + } +} + +#[async_trait] +impl Procedure for UndropTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + fn recover(&mut self) -> ProcedureResult<()> { + Ok(()) + } + + async fn execute(&mut self, _: &ProcedureContext) -> ProcedureResult { + match self.data.state { + UndropTableState::Prepare => self.on_prepare().await, + UndropTableState::RestoreMetadata => self.on_restore_metadata().await, + UndropTableState::OpenRegions => self.on_open_regions().await, + UndropTableState::InvalidateTableCache => self.on_broadcast().await, + } + .map_err(map_to_procedure_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let table_ref = self.data.table_ref(); + LockKey::new(vec![ + CatalogLock::Read(table_ref.catalog).into(), + SchemaLock::read(table_ref.catalog, table_ref.schema).into(), + TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(), + TableLock::Write(self.data.task.table_id).into(), + ]) + } +} + +pub(crate) async fn open_regions( + context: &DdlContext, + table_id: TableId, + table_name: &TableName, + table_info: &table::metadata::TableInfo, + region_routes: &[RegionRoute], + region_wal_options: &HashMap, +) -> Result<()> { + let template = build_template_from_raw_table_info(table_info)?; + let builder = CreateRequestBuilder::new(template, None); + let storage_path = region_storage_path(&table_name.catalog_name, &table_name.schema_name); + let wal_options = region_wal_options + .iter() + .map(|(region_number, wal_options)| { + serde_json::to_string(wal_options) + .map(|wal_options| (*region_number, wal_options)) + .context(error::SerdeJsonSnafu) + }) + .collect::>>()?; + let mut seen_peer_ids = HashSet::new(); + let peers = find_leaders(region_routes) + .into_iter() + .chain(find_followers(region_routes)) + .filter(|peer| seen_peer_ids.insert(peer.id)); + let mut tasks = Vec::new(); + for datanode in peers { + let requester = context.node_manager.datanode(&datanode).await; + let region_numbers = find_leader_regions(region_routes, &datanode) + .into_iter() + .chain(find_follower_regions(region_routes, &datanode)); + for region_number in region_numbers { + let region_id = RegionId::new(table_id, region_number); + let create_request = builder.build_one( + region_id, + storage_path.clone(), + &wal_options, + &HashMap::new(), + ); + let request = RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::Open(PbOpenRegionRequest { + region_id: create_request.region_id, + engine: create_request.engine, + path: create_request.path, + options: create_request.options, + })), + }; + let datanode = datanode.clone(); + let requester = requester.clone(); + tasks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(datanode)) + }); + } + } + + join_all(tasks) + .await + .into_iter() + .collect::>>()?; + context + .register_failure_detectors(convert_region_routes_to_detecting_regions(region_routes)) + .await; + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct UndropTableData { + state: UndropTableState, + task: UndropTableTask, + table_name: Option, + table_info: Option, + table_route_value: Option, + #[serde(default)] + region_wal_options: HashMap, +} + +impl UndropTableData { + fn new(task: UndropTableTask) -> Self { + Self { + state: UndropTableState::Prepare, + task, + table_name: None, + table_info: None, + table_route_value: None, + region_wal_options: HashMap::new(), + } + } + + fn table_ref(&self) -> TableReference<'_> { + self.task.table_ref() + } + + fn table_name(&self) -> &TableName { + self.table_name.as_ref().unwrap() + } + + fn table_info(&self) -> &table::metadata::TableInfo { + self.table_info.as_ref().unwrap() + } + + fn table_route_value(&self) -> &TableRouteValue { + self.table_route_value.as_ref().unwrap() + } +} + +#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)] +enum UndropTableState { + Prepare, + RestoreMetadata, + OpenRegions, + InvalidateTableCache, +} diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index d88e402adfbc..1dc146634cf7 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -39,7 +39,7 @@ use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY}; use store_api::region_engine::RegionManifestInfo; use store_api::storage::{RegionId, RegionNumber}; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfo}; use table::table_reference::TableReference; use crate::ddl::{DdlContext, DetectingRegion}; @@ -68,6 +68,14 @@ pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error } } +pub(crate) fn is_metric_engine_logical_table( + table_info: &TableInfo, + table_route_value: &TableRouteValue, +) -> bool { + table_info.meta.engine == METRIC_ENGINE + && matches!(table_route_value, TableRouteValue::Logical(_)) +} + /// Maps the error to the corresponding procedure error. /// /// This function determines whether the error should be retried and if poison cleanup is needed, diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index d0619ca74fbd..82f4435412f6 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -42,7 +42,9 @@ use crate::ddl::drop_database::DropDatabaseProcedure; use crate::ddl::drop_flow::DropFlowProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::drop_view::DropViewProcedure; +use crate::ddl::purge_dropped_table::PurgeDroppedTableProcedure; use crate::ddl::truncate_table::TruncateTableProcedure; +use crate::ddl::undrop_table::UndropTableProcedure; use crate::ddl::{DdlContext, utils}; use crate::error::{ self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu, @@ -61,7 +63,7 @@ use crate::rpc::ddl::DdlTask::DropTrigger; use crate::rpc::ddl::DdlTask::{ AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, - DropTable, DropView, TruncateTable, + DropTable, DropView, PurgeDroppedTable, TruncateTable, UndropTable, }; #[cfg(feature = "enterprise")] use crate::rpc::ddl::trigger::CreateTriggerTask; @@ -70,7 +72,8 @@ use crate::rpc::ddl::trigger::DropTriggerTask; use crate::rpc::ddl::{ AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, - QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, + PurgeDroppedTableTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, + TruncateTableTask, UndropTableTask, }; /// A configurator that customizes or enhances a [`DdlManager`]. @@ -236,6 +239,8 @@ impl DdlManager { AlterLogicalTablesProcedure, AlterDatabaseProcedure, DropTableProcedure, + UndropTableProcedure, + PurgeDroppedTableProcedure, DropFlowProcedure, TruncateTableProcedure, CreateDatabaseProcedure, @@ -427,6 +432,32 @@ impl DdlManager { self.execute_procedure_and_wait(procedure_with_id).await } + /// Submits and executes an undrop table task. + #[tracing::instrument(skip_all)] + pub async fn submit_undrop_table_task( + &self, + undrop_table_task: UndropTableTask, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + let procedure = UndropTableProcedure::new(undrop_table_task, context); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.execute_procedure_and_wait(procedure_with_id).await + } + + /// Submits and executes a purge dropped table task. + #[tracing::instrument(skip_all)] + pub async fn submit_purge_dropped_table_task( + &self, + purge_dropped_table_task: PurgeDroppedTableTask, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + let procedure = PurgeDroppedTableProcedure::new(purge_dropped_table_task, context); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.execute_procedure_and_wait(procedure_with_id).await + } + /// Submits and executes a create database task. #[tracing::instrument(skip_all)] pub async fn submit_create_database( @@ -596,6 +627,12 @@ impl DdlManager { handle_create_table_task(self, create_table_task).await } DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await, + UndropTable(undrop_table_task) => { + handle_undrop_table_task(self, undrop_table_task).await + } + PurgeDroppedTable(purge_dropped_table_task) => { + handle_purge_dropped_table_task(self, purge_dropped_table_task).await + } AlterTable(alter_table_task) => { handle_alter_table_task(self, alter_table_task, ddl_options).await } @@ -743,6 +780,39 @@ async fn handle_drop_table_task( }) } +async fn handle_undrop_table_task( + ddl_manager: &DdlManager, + undrop_table_task: UndropTableTask, +) -> Result { + let table_id = undrop_table_task.table_id; + let (id, _) = ddl_manager + .submit_undrop_table_task(undrop_table_task) + .await?; + + info!("Table: {table_id} is undropped via procedure_id {id:?}"); + + Ok(SubmitDdlTaskResponse { + key: id.to_string().into(), + ..Default::default() + }) +} + +async fn handle_purge_dropped_table_task( + ddl_manager: &DdlManager, + purge_dropped_table_task: PurgeDroppedTableTask, +) -> Result { + let (id, _) = ddl_manager + .submit_purge_dropped_table_task(purge_dropped_table_task) + .await?; + + info!("Dropped table is purged via procedure_id {id:?}"); + + Ok(SubmitDdlTaskResponse { + key: id.to_string().into(), + ..Default::default() + }) +} + async fn handle_create_table_task( ddl_manager: &DdlManager, create_table_task: CreateTableTask, @@ -1190,6 +1260,7 @@ mod tests { memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), leader_region_registry: Arc::new(LeaderRegionRegistry::default()), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), + soft_drop_enabled: false, }, procedure_manager.clone(), Arc::new(DummyRepartitionProcedureFactory), diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 05b5af393bcd..123fef0353ae 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -363,6 +363,20 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Cannot drop table '{}': tombstoned table id {} already uses the same full name", + table_name, + existing_table_id + ))] + /// Raised when a live table is recreated with a name still reserved by an older tombstone. + TableNameTombstoneConflict { + table_name: String, + existing_table_id: TableId, + dropping_table_id: TableId, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("View already exists, view: {}", view_name))] ViewAlreadyExists { view_name: String, @@ -1218,7 +1232,9 @@ impl ErrorExt for Error { ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => { StatusCode::TableNotFound } - ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, + ViewAlreadyExists { .. } + | TableAlreadyExists { .. } + | TableNameTombstoneConflict { .. } => StatusCode::TableAlreadyExists, SubmitProcedure { source, .. } | QueryProcedure { source, .. } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 3fd5648093d4..01c249ffb031 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -1154,7 +1154,10 @@ impl TableMetadataManager { table_route_value: &TableRouteValue, ) -> Result> { let mut region_wal_options = HashMap::new(); - let datanode_table_keys = region_distribution(table_route_value.region_routes()?) + let Some(region_routes) = table_route_value.region_routes().ok() else { + return Ok(region_wal_options); + }; + let datanode_table_keys = region_distribution(region_routes) .into_keys() .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id)) .collect::>(); @@ -1639,7 +1642,7 @@ mod tests { use common_wal::options::{KafkaWalOptions, WalOptions}; use futures::TryStreamExt; use store_api::storage::{RegionId, RegionNumber}; - use table::metadata::TableInfo; + use table::metadata::{TableId, TableInfo}; use table::table_name::TableName; use super::datanode_table::DatanodeTableKey; @@ -1647,7 +1650,7 @@ mod tests { use crate::ddl::allocator::wal_options::WalOptionsAllocator; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::utils::region_storage_path; - use crate::error::Result; + use crate::error::{Error, Result}; use crate::key::datanode_table::RegionInfo; use crate::key::node_address::{NodeAddressKey, NodeAddressValue}; use crate::key::table_info::TableInfoValue; @@ -1655,8 +1658,8 @@ mod tests { use crate::key::table_route::TableRouteValue; use crate::key::topic_region::TopicRegionKey; use crate::key::{ - DeserializedValueWithBytes, MetadataValue, RegionDistribution, RegionRoleSet, - TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue, + DeserializedValueWithBytes, DroppedTableMetadata, MetadataValue, RegionDistribution, + RegionRoleSet, TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue, }; use crate::kv_backend::KvBackend; use crate::kv_backend::memory::MemoryKvBackend; @@ -1785,6 +1788,100 @@ mod tests { ]) } + fn test_physical_region_route( + table_id: TableId, + region_number: RegionNumber, + leader_peer: u64, + follower_peers: Vec, + ) -> RegionRoute { + RegionRoute { + region: Region::new_test(RegionId::new(table_id, region_number)), + leader_peer: Some(Peer::empty(leader_peer)), + follower_peers: follower_peers.into_iter().map(Peer::empty).collect(), + leader_state: None, + leader_down_since: None, + write_route_policy: None, + } + } + + async fn create_dropped_physical_table_metadata( + table_id: TableId, + table_name: &str, + region_routes: Vec, + region_wal_options: HashMap, + ) -> ( + Arc>, + TableMetadataManager, + TableName, + TableInfo, + Vec, + HashMap, + ) { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let task = test_create_table_task(table_name, table_id); + let table_info = task.table_info.clone(); + let serialized_options = region_wal_options + .iter() + .map(|(k, v)| (*k, serde_json::to_string(v).unwrap())) + .collect::>(); + table_metadata_manager + .create_table_metadata( + table_info.clone(), + TableRouteValue::physical(region_routes), + serialized_options, + ) + .await + .unwrap(); + + let table_route_value = table_metadata_manager + .table_route_manager + .table_route_storage() + .get_with_raw_bytes(table_id) + .await + .unwrap() + .unwrap(); + let region_routes = table_route_value.region_routes().unwrap().clone(); + let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); + let table_route_value = TableRouteValue::physical(region_routes.clone()); + table_metadata_manager + .delete_table_metadata( + table_id, + &table_name, + &table_route_value, + ®ion_wal_options, + ) + .await + .unwrap(); + + ( + mem_kv, + table_metadata_manager, + table_name, + table_info, + region_routes, + region_wal_options, + ) + } + + fn assert_dropped_table_metadata( + dropped_table: &DroppedTableMetadata, + table_id: TableId, + table_name: &TableName, + table_info: &TableInfo, + region_routes: &[RegionRoute], + region_wal_options: &HashMap, + ) { + assert_eq!(dropped_table.table_id, table_id); + assert_eq!(&dropped_table.table_name, table_name); + assert_eq!(&dropped_table.table_info_value.table_info, table_info); + assert_eq!( + dropped_table.table_route_value.region_routes().unwrap(), + region_routes + ); + assert_eq!(&dropped_table.region_wal_options, region_wal_options); + } + #[tokio::test] async fn test_raft_engine_topic_region_map() { let mem_kv = Arc::new(MemoryKvBackend::default()); @@ -2856,65 +2953,20 @@ mod tests { #[tokio::test] async fn test_dropped_table_metadata_enumeration_and_lookup() { - let mem_kv = Arc::new(MemoryKvBackend::default()); - let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); let table_id = 1025; let table_name = "foo"; - let task = test_create_table_task(table_name, table_id); - let table_info = task.table_info.clone(); - let options = create_mixed_region_wal_options(); - let serialized_options = options - .iter() - .map(|(k, v)| (*k, serde_json::to_string(v).unwrap())) - .collect::>(); - table_metadata_manager - .create_table_metadata( - table_info.clone(), - TableRouteValue::physical(vec![ - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 1)), - leader_peer: Some(Peer::empty(1)), - follower_peers: vec![Peer::empty(5)], - leader_state: None, - leader_down_since: None, - write_route_policy: None, - }, - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 2)), - leader_peer: Some(Peer::empty(2)), - follower_peers: vec![Peer::empty(4)], - leader_state: None, - leader_down_since: None, - write_route_policy: None, - }, - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 3)), - leader_peer: Some(Peer::empty(3)), - follower_peers: vec![], - leader_state: None, - leader_down_since: None, - write_route_policy: None, - }, - ]), - serialized_options, + let (_, table_metadata_manager, table_name, table_info, region_routes, options) = + create_dropped_physical_table_metadata( + table_id, + table_name, + vec![ + test_physical_region_route(table_id, 1, 1, vec![5]), + test_physical_region_route(table_id, 2, 2, vec![4]), + test_physical_region_route(table_id, 3, 3, vec![]), + ], + create_mixed_region_wal_options(), ) - .await - .unwrap(); - let table_route_value = table_metadata_manager - .table_route_manager - .table_route_storage() - .get_with_raw_bytes(table_id) - .await - .unwrap() - .unwrap(); - let region_routes = table_route_value.region_routes().unwrap(); - let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); - let table_route_value = TableRouteValue::physical(region_routes.clone()); - - table_metadata_manager - .delete_table_metadata(table_id, &table_name, &table_route_value, &options) - .await - .unwrap(); + .await; let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap(); assert_eq!(dropped_tables.len(), 1); @@ -2926,107 +2978,64 @@ mod tests { .await .unwrap() .unwrap(); - assert_eq!(dropped_table.table_id, table_id); - assert_eq!(dropped_table.table_name, table_name); - assert_eq!(dropped_table.table_info_value.table_info, table_info); - assert_eq!( - dropped_table.table_route_value.region_routes().unwrap(), - region_routes + assert_dropped_table_metadata( + &dropped_table, + table_id, + &table_name, + &table_info, + ®ion_routes, + &options, ); - assert_eq!(dropped_table.region_wal_options, options); let dropped_table_by_id = table_metadata_manager .get_dropped_table_by_id(table_id) .await .unwrap() .unwrap(); - assert_eq!(dropped_table_by_id.table_id, table_id); - assert_eq!(dropped_table_by_id.table_name, table_name); - assert_eq!(dropped_table_by_id.table_info_value.table_info, table_info); - assert_eq!( - dropped_table_by_id - .table_route_value - .region_routes() - .unwrap(), - region_routes + assert_dropped_table_metadata( + &dropped_table_by_id, + table_id, + &table_name, + &table_info, + ®ion_routes, + &options, ); - assert_eq!(dropped_table_by_id.region_wal_options, options); } #[tokio::test] async fn test_dropped_table_lookup_survives_live_name_recreation() { - let mem_kv = Arc::new(MemoryKvBackend::default()); - let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); let dropped_table_id = 1025; let recreated_table_id = 1026; let table_name = "foo"; - let dropped_task = test_create_table_task(table_name, dropped_table_id); - let dropped_table_info = dropped_task.table_info.clone(); - let options = create_mock_region_wal_options(); - let serialized_options = options - .iter() - .map(|(k, v)| (*k, serde_json::to_string(v).unwrap())) - .collect::>(); - table_metadata_manager - .create_table_metadata( - dropped_table_info.clone(), - TableRouteValue::physical(vec![ - RegionRoute { - region: Region::new_test(RegionId::new(dropped_table_id, 1)), - leader_peer: Some(Peer::empty(1)), - follower_peers: vec![Peer::empty(5)], - leader_state: None, - leader_down_since: None, - write_route_policy: None, - }, - RegionRoute { - region: Region::new_test(RegionId::new(dropped_table_id, 2)), - leader_peer: Some(Peer::empty(2)), - follower_peers: vec![Peer::empty(4)], - leader_state: None, - leader_down_since: None, - write_route_policy: None, - }, - ]), - serialized_options.clone(), - ) - .await - .unwrap(); - - let dropped_table_name = - TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); - let dropped_table_route = table_metadata_manager - .table_route_manager - .table_route_storage() - .get_with_raw_bytes(dropped_table_id) - .await - .unwrap() - .unwrap(); - let dropped_table_route = - TableRouteValue::physical(dropped_table_route.region_routes().unwrap().clone()); - table_metadata_manager - .delete_table_metadata( - dropped_table_id, - &dropped_table_name, - &dropped_table_route, - &options, - ) - .await - .unwrap(); + let ( + _, + table_metadata_manager, + dropped_table_name, + dropped_table_info, + region_routes, + options, + ) = create_dropped_physical_table_metadata( + dropped_table_id, + table_name, + vec![ + test_physical_region_route(dropped_table_id, 1, 1, vec![5]), + test_physical_region_route(dropped_table_id, 2, 2, vec![4]), + ], + create_mock_region_wal_options(), + ) + .await; let recreated_task = test_create_table_task(table_name, recreated_table_id); table_metadata_manager .create_table_metadata( recreated_task.table_info, - TableRouteValue::physical(vec![RegionRoute { - region: Region::new_test(RegionId::new(recreated_table_id, 1)), - leader_peer: Some(Peer::empty(4)), - follower_peers: vec![], - leader_state: None, - leader_down_since: None, - write_route_policy: None, - }]), - serialized_options, + TableRouteValue::physical(vec![test_physical_region_route( + recreated_table_id, + 1, + 4, + vec![], + )]), + HashMap::new(), ) .await .unwrap(); @@ -3047,11 +3056,13 @@ mod tests { .await .unwrap() .unwrap(); - assert_eq!(dropped_table.table_id, dropped_table_id); - assert_eq!(dropped_table.table_name, dropped_table_name); - assert_eq!( - dropped_table.table_info_value.table_info, - dropped_table_info + assert_dropped_table_metadata( + &dropped_table, + dropped_table_id, + &dropped_table_name, + &dropped_table_info, + ®ion_routes, + &options, ); let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap(); @@ -3062,58 +3073,19 @@ mod tests { #[tokio::test] async fn test_dropped_table_lookup_ignores_unrelated_malformed_datanode_tombstones() { - let mem_kv = Arc::new(MemoryKvBackend::default()); - let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); let table_id = 1025; let table_name = "foo"; - let task = test_create_table_task(table_name, table_id); - let table_info = task.table_info.clone(); - let options = create_mixed_region_wal_options(); - let serialized_options = options - .iter() - .map(|(k, v)| (*k, serde_json::to_string(v).unwrap())) - .collect::>(); - table_metadata_manager - .create_table_metadata( - table_info.clone(), - TableRouteValue::physical(vec![ - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 1)), - leader_peer: Some(Peer::empty(1)), - follower_peers: vec![Peer::empty(5)], - leader_state: None, - leader_down_since: None, - write_route_policy: None, - }, - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 2)), - leader_peer: Some(Peer::empty(2)), - follower_peers: vec![Peer::empty(4)], - leader_state: None, - leader_down_since: None, - write_route_policy: None, - }, - ]), - serialized_options, + let (mem_kv, table_metadata_manager, table_name, table_info, region_routes, options) = + create_dropped_physical_table_metadata( + table_id, + table_name, + vec![ + test_physical_region_route(table_id, 1, 1, vec![5]), + test_physical_region_route(table_id, 2, 2, vec![4]), + ], + create_mixed_region_wal_options(), ) - .await - .unwrap(); - - let table_route_value = table_metadata_manager - .table_route_manager - .table_route_storage() - .get_with_raw_bytes(table_id) - .await - .unwrap() - .unwrap(); - let region_routes = table_route_value.region_routes().unwrap(); - let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); - let table_route_value = TableRouteValue::physical(region_routes.clone()); - - table_metadata_manager - .delete_table_metadata(table_id, &table_name, &table_route_value, &options) - .await - .unwrap(); + .await; mem_kv .put( @@ -3129,10 +3101,14 @@ mod tests { .await .unwrap() .unwrap(); - assert_eq!(dropped_table.table_id, table_id); - assert_eq!(dropped_table.table_name, table_name); - assert_eq!(dropped_table.table_info_value.table_info, table_info); - assert_eq!(dropped_table.region_wal_options, options); + assert_dropped_table_metadata( + &dropped_table, + table_id, + &table_name, + &table_info, + ®ion_routes, + &options, + ); } #[tokio::test] diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index ed6f78154a00..de3772db2cae 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -31,7 +31,8 @@ use api::v1::meta::{ DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId, - TruncateTableTask as PbTruncateTableTask, + PurgeDroppedTableTask as PbPurgeDroppedTableTask, TruncateTableTask as PbTruncateTableTask, + UndropTableTask as PbUndropTableTask, }; use api::v1::{ AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr, @@ -63,6 +64,8 @@ use crate::key::FlowId; pub enum DdlTask { CreateTable(CreateTableTask), DropTable(DropTableTask), + UndropTable(UndropTableTask), + PurgeDroppedTable(PurgeDroppedTableTask), AlterTable(AlterTableTask), TruncateTable(TruncateTableTask), CreateLogicalTables(Vec), @@ -144,6 +147,36 @@ impl DdlTask { }) } + /// Creates a [`DdlTask`] to undrop a table. + pub fn new_undrop_table( + catalog: String, + schema: String, + table: String, + table_id: TableId, + ) -> Self { + DdlTask::UndropTable(UndropTableTask { + catalog, + schema, + table, + table_id, + }) + } + + /// Creates a [`DdlTask`] to purge a dropped table. + pub fn new_purge_dropped_table( + catalog: String, + schema: String, + table: String, + table_id: Option, + ) -> Self { + DdlTask::PurgeDroppedTable(PurgeDroppedTableTask { + catalog, + schema, + table, + table_id, + }) + } + /// Creates a [`DdlTask`] to create a database. pub fn new_create_database( catalog: String, @@ -217,6 +250,12 @@ impl TryFrom for DdlTask { Ok(DdlTask::CreateTable(create_table.try_into()?)) } Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)), + Task::UndropTableTask(undrop_table) => { + Ok(DdlTask::UndropTable(undrop_table.try_into()?)) + } + Task::PurgeDroppedTableTask(purge_dropped_table) => { + Ok(DdlTask::PurgeDroppedTable(purge_dropped_table.into())) + } Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)), Task::TruncateTableTask(truncate_table) => { Ok(DdlTask::TruncateTable(truncate_table.try_into()?)) @@ -327,6 +366,8 @@ impl TryFrom for PbDdlTaskRequest { let task = match request.task { DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?), DdlTask::DropTable(task) => Task::DropTableTask(task.into()), + DdlTask::UndropTable(task) => Task::UndropTableTask(task.into()), + DdlTask::PurgeDroppedTable(task) => Task::PurgeDroppedTableTask(task.into()), DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?), DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?), DdlTask::CreateLogicalTables(tasks) => { @@ -588,6 +629,109 @@ pub struct DropTableTask { pub drop_if_exists: bool, } +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct UndropTableTask { + pub catalog: String, + pub schema: String, + pub table: String, + pub table_id: TableId, +} + +impl UndropTableTask { + pub fn table_ref(&self) -> TableReference<'_> { + TableReference { + catalog: &self.catalog, + schema: &self.schema, + table: &self.table, + } + } + + pub fn table_name(&self) -> TableName { + TableName { + catalog_name: self.catalog.clone(), + schema_name: self.schema.clone(), + table_name: self.table.clone(), + } + } +} + +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct PurgeDroppedTableTask { + pub catalog: String, + pub schema: String, + pub table: String, + pub table_id: Option, +} + +impl PurgeDroppedTableTask { + pub fn table_ref(&self) -> TableReference<'_> { + TableReference { + catalog: &self.catalog, + schema: &self.schema, + table: &self.table, + } + } + + pub fn table_name(&self) -> TableName { + TableName { + catalog_name: self.catalog.clone(), + schema_name: self.schema.clone(), + table_name: self.table.clone(), + } + } +} + +impl TryFrom for UndropTableTask { + type Error = error::Error; + + fn try_from(pb: PbUndropTableTask) -> Result { + Ok(Self { + catalog: pb.catalog_name, + schema: pb.schema_name, + table: pb.table_name, + table_id: pb + .table_id + .context(error::InvalidProtoMsgSnafu { + err_msg: "expected table_id", + })? + .id, + }) + } +} + +impl From for PbUndropTableTask { + fn from(task: UndropTableTask) -> Self { + Self { + catalog_name: task.catalog, + schema_name: task.schema, + table_name: task.table, + table_id: Some(api::v1::TableId { id: task.table_id }), + } + } +} + +impl From for PurgeDroppedTableTask { + fn from(pb: PbPurgeDroppedTableTask) -> Self { + Self { + catalog: pb.catalog_name, + schema: pb.schema_name, + table: pb.table_name, + table_id: pb.table_id.map(|table_id| table_id.id), + } + } +} + +impl From for PbPurgeDroppedTableTask { + fn from(task: PurgeDroppedTableTask) -> Self { + Self { + catalog_name: task.catalog, + schema_name: task.schema, + table_name: task.table, + table_id: task.table_id.map(|id| api::v1::TableId { id }), + } + } +} + impl DropTableTask { pub fn table_ref(&self) -> TableReference<'_> { TableReference { @@ -1663,6 +1807,46 @@ mod tests { assert_eq!(task, de); } + #[test] + fn test_undrop_table_task_pb_roundtrip() { + let expected = UndropTableTask { + catalog: "greptime".to_string(), + schema: "public".to_string(), + table: "foo".to_string(), + table_id: 1024, + }; + let request = SubmitDdlTaskRequest::new( + QueryContext::default(), + DdlTask::UndropTable(expected.clone()), + ); + + let pb = PbDdlTaskRequest::try_from(request).unwrap(); + let pb_task = pb.task.unwrap(); + let de = DdlTask::try_from(pb_task).unwrap(); + + assert!(matches!(de, DdlTask::UndropTable(task) if task == expected)); + } + + #[test] + fn test_purge_dropped_table_task_pb_roundtrip() { + let expected = PurgeDroppedTableTask { + catalog: "greptime".to_string(), + schema: "public".to_string(), + table: "foo".to_string(), + table_id: Some(1024), + }; + let request = SubmitDdlTaskRequest::new( + QueryContext::default(), + DdlTask::PurgeDroppedTable(expected.clone()), + ); + + let pb = PbDdlTaskRequest::try_from(request).unwrap(); + let pb_task = pb.task.unwrap(); + let de = DdlTask::try_from(pb_task).unwrap(); + + assert!(matches!(de, DdlTask::PurgeDroppedTable(task) if task == expected)); + } + #[test] fn test_sort_columns() { // construct RawSchema diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 3396870ab66a..2300679b3377 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -206,6 +206,7 @@ pub fn new_ddl_context_with_kv_backend( flow_metadata_allocator, flow_metadata_manager, region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), + soft_drop_enabled: false, } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 54a94fffbf52..71cc076f7436 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -401,6 +401,7 @@ impl MetasrvBuilder { flow_metadata_manager: flow_metadata_manager.clone(), flow_metadata_allocator: flow_metadata_allocator.clone(), region_failure_detector_controller, + soft_drop_enabled: ddl_soft_drop_enabled(&options), }; let procedure_manager_c = procedure_manager.clone(); let repartition_procedure_factory = Arc::new(DefaultRepartitionProcedureFactory::new( @@ -646,6 +647,13 @@ fn build_procedure_manager( )) } +/// Resolves if soft-drop is enabled from metasrv options. +fn ddl_soft_drop_enabled(_options: &MetasrvOptions) -> bool { + // TODO(hl): add a dedicated soft-drop cluster config + // when wiring the user-facing option. + false +} + impl Default for MetasrvBuilder { fn default() -> Self { Self::new() diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 0ac48cfc4f74..a9e06bc9c30d 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -493,6 +493,7 @@ pub mod test_data { memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), leader_region_registry: Arc::new(LeaderRegionRegistry::default()), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), + soft_drop_enabled: false, } } } diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 28ad1de71e17..db891f8cd369 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -134,13 +134,31 @@ async fn test_engine_reopen_region_with_format(flat_format: bool) { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); let table_dir = request.table_dir.clone(); + let column_schemas = rows_schema(&request); engine .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); + put_rows( + &engine, + region_id, + Rows { + schema: column_schemas, + rows: build_rows(0, 2), + }, + ) + .await; reopen_region(&engine, region_id, table_dir, false, Default::default()).await; assert!(engine.is_region_exists(region_id)); + + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(2, batches.iter().map(|b| b.num_rows()).sum::()); } #[tokio::test] diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index acdd0e60b576..03ad8b108105 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -242,6 +242,7 @@ impl GreptimeDbStandaloneBuilder { flow_metadata_manager, flow_metadata_allocator, region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), + soft_drop_enabled: false, }, procedure_manager.clone(), repartition_procedure_factory,