Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ fs2 = "0.4"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0de5437582920c8b30d6c34212f161db71d95c50" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "305abc0fd1b73c0f20310ebc700c6a9654dfe7d3" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl StartCommand {
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
soft_drop_enabled: false,
};

let ddl_manager = DdlManager::try_new(
Expand Down
9 changes: 8 additions & 1 deletion src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ pub mod drop_flow;
pub mod drop_table;
pub mod drop_view;
pub mod flow_meta;
pub mod purge_dropped_table;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]
pub(crate) mod tests;
pub mod truncate_table;
pub mod undrop_table;
pub mod utils;

/// Metadata allocated to a table.
Expand Down Expand Up @@ -112,6 +114,8 @@ pub struct DdlContext {
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
/// controller of region failure detector.
pub region_failure_detector_controller: RegionFailureDetectorControllerRef,
/// Whether table drops should stop after tombstoning metadata.
pub soft_drop_enabled: bool,
}

impl DdlContext {
Expand All @@ -129,7 +133,10 @@ impl DdlContext {
///
/// Once the regions were dropped, subsequent heartbeats no longer include these regions.
/// Therefore, we should remove the failure detectors for these dropped regions.
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
pub(crate) async fn deregister_failure_detectors(
&self,
detecting_regions: Vec<DetectingRegion>,
) {
self.region_failure_detector_controller
.deregister_failure_detectors(detecting_regions)
.await;
Expand Down
21 changes: 20 additions & 1 deletion src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use table::table_reference::TableReference;

use self::executor::DropTableExecutor;
use crate::ddl::DdlContext;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::utils::{convert_region_routes_to_detecting_regions, map_to_procedure_error};
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
Expand Down Expand Up @@ -149,6 +149,7 @@ impl DropTableProcedure {
/// Broadcasts invalidate table cache instruction.
async fn on_broadcast(&mut self) -> Result<Status> {
self.executor.invalidate_table_cache(&self.context).await?;

self.data.state = DropTableState::DatanodeDropRegions;

Ok(Status::executing(true))
Expand All @@ -172,6 +173,23 @@ impl DropTableProcedure {
.await?;
}

if self.context.soft_drop_enabled {
self.executor
.on_close_regions(
&self.context.node_manager,
&self.context.leader_region_registry,
&self.data.physical_region_routes,
)
.await?;
self.context
.deregister_failure_detectors(convert_region_routes_to_detecting_regions(
&self.data.physical_region_routes,
))
.await;
self.dropping_regions.clear();
return Ok(Status::done());
}

self.executor
.on_drop_regions(
&self.context.node_manager,
Expand All @@ -182,6 +200,7 @@ impl DropTableProcedure {
false,
)
.await?;

self.data.state = DropTableState::DeleteTombstone;
Ok(Status::executing(true))
}
Expand Down
89 changes: 87 additions & 2 deletions src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use api::v1::region::{
CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest, RegionRequest,
Expand Down Expand Up @@ -82,6 +82,8 @@ impl DropTableExecutor {
/// Checks whether table exists.
/// - Early returns if table not exists and `drop_if_exists` is `true`.
/// - Throws an error if table not exists and `drop_if_exists` is `false`.
/// - Rejects dropping a recreated live table while an older tombstone still owns the same
/// fully qualified name.
pub async fn on_prepare(&self, ctx: &DdlContext) -> Result<Control<()>> {
let table_ref = self.table.table_ref();

Expand All @@ -106,6 +108,21 @@ impl DropTableExecutor {
}
);

if ctx.soft_drop_enabled
&& let Some(dropped_table) = ctx
.table_metadata_manager
.get_dropped_table(&self.table)
.await?
&& dropped_table.table_id != self.table_id
{
return error::TableNameTombstoneConflictSnafu {
table_name: table_ref.to_string(),
existing_table_id: dropped_table.table_id,
dropping_table_id: self.table_id,
}
.fail();
}

Ok(Control::Continue(()))
}

Expand Down Expand Up @@ -211,7 +228,17 @@ impl DropTableExecutor {
Ok(())
}

/// Drops region on datanode.
/// Drops regions on datanodes.
///
/// Arguments:
/// - `node_manager`: resolves datanode clients from peers in `region_routes`.
/// - `leader_region_registry`: tracks in-flight leader region operations.
/// - `region_routes`: table region placement; leaders receive drop requests and followers
/// receive close requests.
/// - `fast_path`: forwards to datanode drop requests to skip extra cleanup when safe.
/// - `force`: forwards to datanode drop requests to allow forced region removal.
/// - `partial_drop`: forwards to datanode drop requests for partial table/region drops.
#[allow(clippy::too_many_arguments)]
pub async fn on_drop_regions(
&self,
node_manager: &NodeManagerRef,
Expand Down Expand Up @@ -318,6 +345,64 @@ impl DropTableExecutor {

Ok(())
}

/// Closes all table regions on datanodes without deleting region files or metadata tombstones.
pub async fn on_close_regions(
&self,
node_manager: &NodeManagerRef,
leader_region_registry: &LeaderRegionRegistryRef,
region_routes: &[RegionRoute],
) -> Result<()> {
let table_id = self.table_id;
let mut seen_peer_ids = HashSet::new();
let peers = find_leaders(region_routes)
.into_iter()
.chain(find_followers(region_routes))
.filter(|peer| seen_peer_ids.insert(peer.id));
let mut close_region_tasks = Vec::new();

for datanode in peers {
let requester = node_manager.datanode(&datanode).await;
let region_ids = find_leader_regions(region_routes, &datanode)
.into_iter()
.chain(find_follower_regions(region_routes, &datanode))
.map(|region_number| RegionId::new(table_id, region_number));

for region_id in region_ids {
debug!("Closing region {region_id} on Datanode {datanode:?}");
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Close(PbCloseRegionRequest {
region_id: region_id.as_u64(),
})),
};

let datanode = datanode.clone();
let requester = requester.clone();
close_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await
&& err.status_code() != StatusCode::RegionNotFound
{
return Err(add_peer_context_if_needed(datanode)(err));
}
Ok(())
});
}
}

join_all(close_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

let region_ids = operating_leader_regions(region_routes);
leader_region_registry.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));

Ok(())
}
}

#[cfg(test)]
Expand Down
43 changes: 30 additions & 13 deletions src/common/meta/src/ddl/drop_table/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
// limitations under the License.

use common_catalog::format_full_table_name;
use snafu::OptionExt;
use snafu::{OptionExt, ensure};
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;

use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::utils::extract_region_wal_options;
use crate::ddl::utils::{extract_region_wal_options, is_metric_engine_logical_table};
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;

impl DropTableProcedure {
/// Fetches the table info and physical table route.
Expand All @@ -31,18 +32,34 @@ impl DropTableProcedure {
.get_physical_table_route(task.table_id)
.await?;

if physical_table_id == self.data.table_id() {
let table_info_value = self
.context
.table_metadata_manager
.table_info_manager()
.get(task.table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: format_full_table_name(&task.catalog, &task.schema, &task.table),
})?
.into_inner();
let table_info_value = self
.context
.table_metadata_manager
.table_info_manager()
.get(task.table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: format_full_table_name(&task.catalog, &task.schema, &task.table),
})?
.into_inner();
let table_route_value = TableRouteValue::new(
self.data.table_id(),
physical_table_id,
physical_table_route_value.region_routes.clone(),
);
// TODO(hl): support soft-dropping logical tables.
ensure!(
!(self.context.soft_drop_enabled
&& is_metric_engine_logical_table(
&table_info_value.table_info,
&table_route_value
)),
error::UnsupportedSnafu {
operation: "soft-dropping metric logical tables".to_string()
}
);

if physical_table_id == self.data.table_id() {
let engine = table_info_value.table_info.meta.engine;
// rollback only if dropping the metric physical table fails
self.data.allow_rollback = engine.as_str() == METRIC_ENGINE_NAME;
Expand Down
Loading
Loading