Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 116 additions & 16 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pub struct Instance {
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
leader_services_controller: Box<dyn StandaloneLeaderServicesController>,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
Expand Down Expand Up @@ -157,15 +158,13 @@ impl App for Instance {
async fn start(&mut self) -> Result<()> {
self.datanode.start_telemetry();

self.procedure_manager
.start()
.await
.context(error::StartProcedureManagerSnafu)?;

self.wal_provider
.start()
.await
.context(error::StartWalProviderSnafu)?;
self.leader_services_controller
.start(
self.procedure_manager.clone(),
self.wal_provider.clone(),
self.datanode.region_server(),
)
.await?;

plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
.await
Expand All @@ -187,10 +186,9 @@ impl App for Instance {
.await
.context(error::ShutdownFrontendSnafu)?;

self.procedure_manager
.stop()
.await
.context(error::StopProcedureManagerSnafu)?;
self.leader_services_controller
.stop(self.procedure_manager.clone())
.await?;

self.datanode
.shutdown()
Expand Down Expand Up @@ -420,6 +418,9 @@ impl StartCommand {

let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
builder.with_cache_registry(layered_cache_registry.clone());
if let Some(writable) = creator.open_regions_writable_override {
builder.with_open_regions_writable_override(writable);
}
let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

let information_extension = Arc::new(StandaloneInformationExtension::new(
Expand Down Expand Up @@ -617,6 +618,7 @@ impl StartCommand {
flownode,
procedure_manager,
wal_provider,
leader_services_controller: creator.leader_services_controller,
_guard: vec![],
};
let result = InstanceCreatorResult {
Expand All @@ -642,7 +644,7 @@ impl StartCommand {
}

#[async_trait]
pub trait NodeManagerCreator {
pub trait NodeManagerCreator: Send + Sync {
async fn create(
&self,
kv_backend: &KvBackendRef,
Expand Down Expand Up @@ -688,7 +690,7 @@ impl MetadataKvBackendCreator for DefaultMetadataKvBackendCreator {
}
}

pub trait TableIdAllocatorCreator {
pub trait TableIdAllocatorCreator: Send + Sync {
fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence>;
}

Expand All @@ -706,7 +708,7 @@ impl TableIdAllocatorCreator for DefaultTableIdAllocatorCreator {
}

#[async_trait]
pub trait ProcedureExecutorCreator {
pub trait ProcedureExecutorCreator: Send + Sync {
async fn create(
&self,
ddl_manager: DdlManagerRef,
Expand All @@ -730,6 +732,51 @@ impl ProcedureExecutorCreator for DefaultProcedureExecutorCreator {
}
}

#[async_trait]
pub trait StandaloneLeaderServicesController: Send + Sync {
/// Starts services that manage standalone metadata or WAL state.
///
/// The default implementation starts the procedure manager and WAL provider
/// during instance startup.
async fn start(
&self,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
region_server: RegionServer,
) -> Result<()>;

/// Stops services started by [`StandaloneLeaderServicesController::start`].
async fn stop(&self, procedure_manager: ProcedureManagerRef) -> Result<()>;
}

pub struct DefaultStandaloneLeaderServicesController;

#[async_trait]
impl StandaloneLeaderServicesController for DefaultStandaloneLeaderServicesController {
async fn start(
&self,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
_region_server: RegionServer,
) -> Result<()> {
procedure_manager
.start()
.await
.context(error::StartProcedureManagerSnafu)?;
wal_provider
.start()
.await
.context(error::StartWalProviderSnafu)
}

async fn stop(&self, procedure_manager: ProcedureManagerRef) -> Result<()> {
procedure_manager
.stop()
.await
.context(error::StopProcedureManagerSnafu)
}
}

/// `InstanceCreator` is used for grouping various component creators for building the
/// Standalone instance, suitable for customizing how the instance can be built.
pub struct InstanceCreator {
Expand All @@ -739,6 +786,8 @@ pub struct InstanceCreator {
node_manager_creator: Box<dyn NodeManagerCreator>,
table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
leader_services_controller: Box<dyn StandaloneLeaderServicesController>,
open_regions_writable_override: Option<bool>,
}

impl InstanceCreator {
Expand All @@ -752,6 +801,8 @@ impl InstanceCreator {
node_manager_creator,
table_id_allocator_creator,
procedure_executor_creator,
leader_services_controller: Box::new(DefaultStandaloneLeaderServicesController),
open_regions_writable_override: None,
}
}

Expand All @@ -762,6 +813,53 @@ impl InstanceCreator {
self.metadata_kv_backend_creator = metadata_kv_backend_creator;
self
}

/// Wraps the metadata backend creator while retaining the default creator.
///
/// This is useful for callers that need to add runtime behavior around
/// metadata access without reimplementing backend selection.
pub fn map_metadata_kv_backend_creator<F>(mut self, f: F) -> Self
where
F: FnOnce(Box<dyn MetadataKvBackendCreator>) -> Box<dyn MetadataKvBackendCreator>,
{
self.metadata_kv_backend_creator = f(self.metadata_kv_backend_creator);
self
}

/// Wraps node-manager creation while preserving the selected standalone node manager.
pub fn map_node_manager_creator<F>(mut self, f: F) -> Self
where
F: FnOnce(Box<dyn NodeManagerCreator>) -> Box<dyn NodeManagerCreator>,
{
self.node_manager_creator = f(self.node_manager_creator);
self
}

/// Wraps procedure-executor creation while preserving the current setup.
pub fn map_procedure_executor_creator<F>(mut self, f: F) -> Self
where
F: FnOnce(Box<dyn ProcedureExecutorCreator>) -> Box<dyn ProcedureExecutorCreator>,
{
self.procedure_executor_creator = f(self.procedure_executor_creator);
self
}

/// Replaces startup/shutdown ownership for procedure manager and WAL provider.
pub fn with_leader_services_controller(
mut self,
leader_services_controller: Box<dyn StandaloneLeaderServicesController>,
) -> Self {
self.leader_services_controller = leader_services_controller;
self
}

/// Overrides whether regions opened during startup should become writable.
///
/// `None` keeps the default startup behavior.
pub fn with_open_regions_writable_override(mut self, writable: bool) -> Self {
self.open_regions_writable_override = Some(writable);
self
}
}

impl Default for InstanceCreator {
Expand All @@ -771,6 +869,8 @@ impl Default for InstanceCreator {
node_manager_creator: Box::new(DefaultNodeManagerCreator),
table_id_allocator_creator: Box::new(DefaultTableIdAllocatorCreator),
procedure_executor_creator: Box::new(DefaultProcedureExecutorCreator),
leader_services_controller: Box::new(DefaultStandaloneLeaderServicesController),
open_regions_writable_override: None,
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ pub struct DatanodeBuilder {
kv_backend: KvBackendRef,
cache_registry: Option<Arc<LayeredCacheRegistry>>,
topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
open_regions_writable_override: Option<bool>,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}
Expand All @@ -176,6 +177,7 @@ impl DatanodeBuilder {
meta_client: None,
kv_backend,
cache_registry: None,
open_regions_writable_override: None,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: None,
topic_stats_reporter: None,
Expand Down Expand Up @@ -205,6 +207,14 @@ impl DatanodeBuilder {
self
}

/// Overrides whether regions opened during datanode startup should become writable.
///
/// When unset, the builder uses its default writable policy for reopened regions.
pub fn with_open_regions_writable_override(&mut self, writable: bool) -> &mut Self {
self.open_regions_writable_override = Some(writable);
self
}

#[cfg(feature = "enterprise")]
pub fn with_extension_range_provider(
&mut self,
Expand Down Expand Up @@ -274,10 +284,13 @@ impl DatanodeBuilder {

let region_open_requests =
build_region_open_requests(node_id, self.kv_backend.clone()).await?;
let open_with_writable = self
.open_regions_writable_override
.unwrap_or(!controlled_by_metasrv);
let open_all_regions = open_all_regions(
region_server.clone(),
region_open_requests,
!controlled_by_metasrv,
open_with_writable,
self.opts.init_regions_parallelism,
// Ignore nonexistent regions in recovery mode.
is_recovery_mode,
Expand Down
Loading