Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ fs2 = "0.4"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0de5437582920c8b30d6c34212f161db71d95c50" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0bf04b8124fe9d750a4303114989d4563818c666" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl StartCommand {
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
soft_drop_enabled: false,
};

let ddl_manager = DdlManager::try_new(
Expand Down
9 changes: 8 additions & 1 deletion src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ pub mod drop_flow;
pub mod drop_table;
pub mod drop_view;
pub mod flow_meta;
pub mod purge_dropped_table;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]
pub(crate) mod tests;
pub mod truncate_table;
pub mod undrop_table;
pub mod utils;

/// Metadata allocated to a table.
Expand Down Expand Up @@ -112,6 +114,8 @@ pub struct DdlContext {
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
/// controller of region failure detector.
pub region_failure_detector_controller: RegionFailureDetectorControllerRef,
/// Whether table drops should stop after tombstoning metadata.
pub soft_drop_enabled: bool,
}

impl DdlContext {
Expand All @@ -129,7 +133,10 @@ impl DdlContext {
///
/// Once the regions were dropped, subsequent heartbeats no longer include these regions.
/// Therefore, we should remove the failure detectors for these dropped regions.
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
pub(crate) async fn deregister_failure_detectors(
&self,
detecting_regions: Vec<DetectingRegion>,
) {
self.region_failure_detector_controller
.deregister_failure_detectors(detecting_regions)
.await;
Expand Down
21 changes: 20 additions & 1 deletion src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use table::table_reference::TableReference;

use self::executor::DropTableExecutor;
use crate::ddl::DdlContext;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::utils::{convert_region_routes_to_detecting_regions, map_to_procedure_error};
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
Expand Down Expand Up @@ -149,6 +149,7 @@ impl DropTableProcedure {
/// Broadcasts invalidate table cache instruction.
async fn on_broadcast(&mut self) -> Result<Status> {
self.executor.invalidate_table_cache(&self.context).await?;

self.data.state = DropTableState::DatanodeDropRegions;

Ok(Status::executing(true))
Expand All @@ -172,6 +173,23 @@ impl DropTableProcedure {
.await?;
}

if self.context.soft_drop_enabled {
self.executor
.on_close_regions(
&self.context.node_manager,
&self.context.leader_region_registry,
&self.data.physical_region_routes,
)
.await?;
self.context
.deregister_failure_detectors(convert_region_routes_to_detecting_regions(
&self.data.physical_region_routes,
))
.await;
self.dropping_regions.clear();
return Ok(Status::done());
}

self.executor
.on_drop_regions(
&self.context.node_manager,
Expand All @@ -182,6 +200,7 @@ impl DropTableProcedure {
false,
)
.await?;

self.data.state = DropTableState::DeleteTombstone;
Ok(Status::executing(true))
}
Expand Down
90 changes: 88 additions & 2 deletions src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use api::v1::region::{
CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest, RegionRequest,
Expand Down Expand Up @@ -82,6 +82,8 @@ impl DropTableExecutor {
/// Checks whether table exists.
/// - Early returns if table not exists and `drop_if_exists` is `true`.
/// - Throws an error if table not exists and `drop_if_exists` is `false`.
/// - Rejects dropping a recreated live table while an older tombstone still owns the same
/// fully qualified name.
pub async fn on_prepare(&self, ctx: &DdlContext) -> Result<Control<()>> {
let table_ref = self.table.table_ref();

Expand All @@ -106,6 +108,21 @@ impl DropTableExecutor {
}
);

if ctx.soft_drop_enabled
&& let Some(dropped_table) = ctx
.table_metadata_manager
.get_dropped_table(&self.table)
.await?
&& dropped_table.table_id != self.table_id
{
return error::TableNameTombstoneConflictSnafu {
table_name: table_ref.to_string(),
existing_table_id: dropped_table.table_id,
dropping_table_id: self.table_id,
}
.fail();
}

Ok(Control::Continue(()))
}

Expand Down Expand Up @@ -211,7 +228,17 @@ impl DropTableExecutor {
Ok(())
}

/// Drops region on datanode.
/// Drops regions on datanodes.
///
/// Arguments:
/// - `node_manager`: resolves datanode clients from peers in `region_routes`.
/// - `leader_region_registry`: tracks in-flight leader region operations.
/// - `region_routes`: table region placement; leaders receive drop requests and followers
/// receive close requests.
/// - `fast_path`: forwards to datanode drop requests to skip extra cleanup when safe.
/// - `force`: forwards to datanode drop requests to allow forced region removal.
/// - `partial_drop`: forwards to datanode drop requests for partial table/region drops.
#[allow(clippy::too_many_arguments)]
pub async fn on_drop_regions(
&self,
node_manager: &NodeManagerRef,
Expand Down Expand Up @@ -245,6 +272,7 @@ impl DropTableExecutor {
fast_path,
force,
partial_drop,
soft_drop: false,
})),
};
let datanode = datanode.clone();
Expand Down Expand Up @@ -318,6 +346,64 @@ impl DropTableExecutor {

Ok(())
}

/// Closes all table regions on datanodes without deleting region files or metadata tombstones.
///
/// Visits every peer hosting a region of this table exactly once (leaders first,
/// then followers not already seen as leaders), issues a close request per hosted
/// region, and finally evicts the table's leader regions from the registry.
/// A `RegionNotFound` reply is treated as success: the region is already gone.
pub async fn on_close_regions(
    &self,
    node_manager: &NodeManagerRef,
    leader_region_registry: &LeaderRegionRegistryRef,
    region_routes: &[RegionRoute],
) -> Result<()> {
    let table_id = self.table_id;
    let mut seen = HashSet::new();
    let mut tasks = Vec::new();

    let all_peers = find_leaders(region_routes)
        .into_iter()
        .chain(find_followers(region_routes));

    for datanode in all_peers {
        // A peer may appear both as a leader and as a follower; contact it once.
        if !seen.insert(datanode.id) {
            continue;
        }
        let requester = node_manager.datanode(&datanode).await;

        // All regions this peer hosts for the table: leader regions, then follower regions.
        let hosted_regions = find_leader_regions(region_routes, &datanode)
            .into_iter()
            .chain(find_follower_regions(region_routes, &datanode));

        for region_number in hosted_regions {
            let region_id = RegionId::new(table_id, region_number);
            debug!("Closing region {region_id} on Datanode {datanode:?}");
            let request = RegionRequest {
                header: Some(RegionRequestHeader {
                    tracing_context: TracingContext::from_current_span().to_w3c(),
                    ..Default::default()
                }),
                body: Some(region_request::Body::Close(PbCloseRegionRequest {
                    region_id: region_id.as_u64(),
                })),
            };

            let datanode = datanode.clone();
            let requester = requester.clone();
            tasks.push(async move {
                match requester.handle(request).await {
                    Ok(_) => Ok(()),
                    // The region is already absent on this datanode; nothing to close.
                    Err(err) if err.status_code() == StatusCode::RegionNotFound => Ok(()),
                    Err(err) => Err(add_peer_context_if_needed(datanode)(err)),
                }
            });
        }
    }

    // Wait for every close request; surface the first failure.
    for result in join_all(tasks).await {
        result?;
    }

    // Drop the table's leader regions from the registry so they are no longer tracked.
    let leader_regions = operating_leader_regions(region_routes);
    leader_region_registry.batch_delete(leader_regions.into_iter().map(|(region_id, _)| region_id));

    Ok(())
}
}

#[cfg(test)]
Expand Down
43 changes: 30 additions & 13 deletions src/common/meta/src/ddl/drop_table/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
// limitations under the License.

use common_catalog::format_full_table_name;
use snafu::OptionExt;
use snafu::{OptionExt, ensure};
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;

use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::utils::extract_region_wal_options;
use crate::ddl::utils::{extract_region_wal_options, is_metric_engine_logical_table};
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;

impl DropTableProcedure {
/// Fetches the table info and physical table route.
Expand All @@ -31,18 +32,34 @@ impl DropTableProcedure {
.get_physical_table_route(task.table_id)
.await?;

if physical_table_id == self.data.table_id() {
let table_info_value = self
.context
.table_metadata_manager
.table_info_manager()
.get(task.table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: format_full_table_name(&task.catalog, &task.schema, &task.table),
})?
.into_inner();
let table_info_value = self
.context
.table_metadata_manager
.table_info_manager()
.get(task.table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: format_full_table_name(&task.catalog, &task.schema, &task.table),
})?
.into_inner();
let table_route_value = TableRouteValue::new(
self.data.table_id(),
physical_table_id,
physical_table_route_value.region_routes.clone(),
);
// TODO(hl): support soft-dropping logical tables.
ensure!(
!(self.context.soft_drop_enabled
&& is_metric_engine_logical_table(
&table_info_value.table_info,
&table_route_value
)),
error::UnsupportedSnafu {
operation: "soft-dropping metric logical tables".to_string()
}
);

if physical_table_id == self.data.table_id() {
let engine = table_info_value.table_info.meta.engine;
// rollback only if dropping the metric physical table fails
self.data.allow_rollback = engine.as_str() == METRIC_ENGINE_NAME;
Expand Down
Loading
Loading