Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/bin/ampd/src/server_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,6 @@ pub fn config_from_common(config: &CommonConfig) -> ServerConfig {
max_mem_mb: config.max_mem_mb,
query_max_mem_mb: config.query_max_mem_mb,
spill_location: config.spill_location.clone(),
metadata_fetch_concurrency: config.metadata_fetch_concurrency,
}
}
1 change: 1 addition & 0 deletions crates/bin/ampd/src/worker_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub(crate) fn config_from_common(config: &Config) -> worker::config::Config {
max_mem_mb: config.max_mem_mb,
query_max_mem_mb: config.query_max_mem_mb,
spill_location: config.spill_location.clone(),
metadata_fetch_concurrency: config.metadata_fetch_concurrency,
parquet: config.parquet.clone(),
events_config: config.worker_events.clone(),
}
Expand Down
8 changes: 8 additions & 0 deletions crates/config/src/config_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ pub struct ConfigFile {
/// Paths for DataFusion temporary files for spill-to-disk (default: [])
#[serde(default)]
pub spill_location: Vec<PathBuf>,
/// Maximum concurrent parquet metadata fetches during query planning (default: 32)
#[serde(default = "default_metadata_fetch_concurrency")]
pub metadata_fetch_concurrency: usize,

// Operational timing
/// Polling interval for new blocks during dump in seconds (default: 1.0)
Expand Down Expand Up @@ -254,6 +257,11 @@ fn default_keep_alive_interval() -> u64 {
DEFAULT_KEEP_ALIVE_INTERVAL
}

/// Default concurrent metadata fetches during query planning.
fn default_metadata_fetch_concurrency() -> usize {
32
}

/// Error when loading configuration from a TOML file.
#[derive(Debug, thiserror::Error)]
#[error("Failed to load configuration file")]
Expand Down
3 changes: 3 additions & 0 deletions crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ fn resolve_config(
poll_interval: config_file.poll_interval_secs.into(),
keep_alive_interval: config_file.keep_alive_interval,
worker_events,
metadata_fetch_concurrency: config_file.metadata_fetch_concurrency,
})
}

Expand Down Expand Up @@ -230,6 +231,8 @@ pub struct Config {
pub keep_alive_interval: u64,
/// Worker event streaming configuration.
pub worker_events: WorkerEventsConfig,
/// Maximum concurrent parquet metadata fetches during query planning.
pub metadata_fetch_concurrency: usize,
}

/// Configuration for worker event streaming.
Expand Down
15 changes: 13 additions & 2 deletions crates/core/common/src/catalog/physical/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub struct QueryableSnapshot {
reader_factory: Arc<reader::AmpReaderFactory>,
/// The dataset reference portion of SQL table references (e.g. `anvil_rpc`).
sql_schema_name: String,
/// Maximum concurrent parquet metadata fetches during query planning.
metadata_fetch_concurrency: usize,
}

impl QueryableSnapshot {
Expand All @@ -143,6 +145,7 @@ impl QueryableSnapshot {
snapshot: &PhyTableSnapshot,
store: DataStore,
sql_schema_name: String,
metadata_fetch_concurrency: usize,
) -> Result<Self, MultiNetworkSegmentsError> {
let reader_factory = Arc::new(reader::AmpReaderFactory {
location_id: snapshot.physical_table().location_id(),
Expand All @@ -155,6 +158,7 @@ impl QueryableSnapshot {
synced_range: snapshot.synced_range()?,
reader_factory,
sql_schema_name,
metadata_fetch_concurrency,
})
}

Expand Down Expand Up @@ -243,15 +247,22 @@ impl QueryableSnapshot {
}

/// Resolves file metadata and computes statistics for the scan plan.
///
/// Fetches parquet metadata concurrently for up to `metadata_fetch_concurrency()`
/// files at a time, using `.buffered()` to preserve ordering for deterministic
/// round-robin partition assignment.
async fn resolve_file_groups(
&self,
segments: &[&Segment],
target_partitions: usize,
table_schema: SchemaRef,
) -> DataFusionResult<(Vec<FileGroup>, datafusion::common::Statistics)> {
let file_count = segments.len();
let file_stream =
futures::stream::iter(segments.iter()).then(|s| self.to_partitioned_file(s));
let futs: Vec<_> = segments
.iter()
.map(|s| self.to_partitioned_file(s))
.collect();
let file_stream = futures::stream::iter(futs).buffered(self.metadata_fetch_concurrency);
let partitioned = round_robin(file_stream, file_count, target_partitions)
.await
.map_err(|e| DataFusionError::External(e.into()))?;
Expand Down
14 changes: 11 additions & 3 deletions crates/core/common/src/context/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ pub struct ExecContextBuilder {
disk_manager: Arc<DiskManager>,
cache_manager: Arc<CacheManager>,
object_store_registry: Arc<dyn ObjectStoreRegistry>,
metadata_fetch_concurrency: usize,
table_catalogs: BTreeMap<String, Arc<dyn TableAsyncCatalogProvider>>,
func_catalogs: BTreeMap<String, Arc<dyn FuncAsyncCatalogProvider>>,
}
Expand All @@ -353,6 +354,7 @@ impl ExecContextBuilder {
isolate_pool: None,
global_memory_pool: env.global_memory_pool,
query_max_mem_mb: env.query_max_mem_mb,
metadata_fetch_concurrency: env.metadata_fetch_concurrency,
disk_manager: env.disk_manager,
cache_manager: env.cache_manager,
object_store_registry: env.object_store_registry,
Expand Down Expand Up @@ -438,9 +440,14 @@ impl ExecContextBuilder {
let query_snapshots = physical_table
.table_snapshots()
.map(|(s, sql_schema_name)| {
QueryableSnapshot::from_snapshot(s, self.store.clone(), sql_schema_name.to_string())
.map(Arc::new)
.map_err(CreateContextError::MultiNetworkSegments)
QueryableSnapshot::from_snapshot(
s,
self.store.clone(),
sql_schema_name.to_string(),
self.metadata_fetch_concurrency,
)
.map(Arc::new)
.map_err(CreateContextError::MultiNetworkSegments)
})
.collect::<Result<Vec<_>, _>>()?;

Expand All @@ -458,6 +465,7 @@ impl ExecContextBuilder {
store: self.store,
datasets_cache: self.datasets_cache,
ethcall_udfs_cache: self.ethcall_udfs_cache,
metadata_fetch_concurrency: self.metadata_fetch_concurrency,
};

// Compose a SessionStateBuilder from the stored components (including
Expand Down
5 changes: 5 additions & 0 deletions crates/core/common/src/exec_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub struct ExecEnv {

/// The EthCall UDFs cache used for eth_call UDF creation.
pub ethcall_udfs_cache: EthCallUdfsCache,

/// Maximum concurrent parquet metadata fetches during query planning.
pub metadata_fetch_concurrency: usize,
}

/// Creates an ExecEnv with the specified memory and cache configuration
Expand All @@ -91,6 +94,7 @@ pub fn create(
max_mem_mb: usize,
query_max_mem_mb: usize,
spill_location: &[PathBuf],
metadata_fetch_concurrency: usize,
store: DataStore,
datasets_cache: DatasetsCache,
ethcall_udfs_cache: EthCallUdfsCache,
Expand Down Expand Up @@ -129,5 +133,6 @@ pub fn create(
store,
datasets_cache,
ethcall_udfs_cache,
metadata_fetch_concurrency,
})
}
2 changes: 2 additions & 0 deletions crates/core/common/tests/it_session_async_resolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ async fn exec_statement_to_plan_with_qualified_function_uses_async_pre_resolutio
store: data_store,
datasets_cache,
ethcall_udfs_cache,
metadata_fetch_concurrency: 32,
};

let (amp_table_catalog, amp_table_schema_requests, _amp_table_requests) =
Expand Down Expand Up @@ -508,6 +509,7 @@ async fn exec_statement_to_plan_with_overlapping_async_and_physical_tables_succe
store: data_store.clone(),
datasets_cache,
ethcall_udfs_cache,
metadata_fetch_concurrency: 32,
};

// Create a physical table under "test_schema.blocks" — the same name
Expand Down
2 changes: 2 additions & 0 deletions crates/core/worker-datasets-derived/src/job_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub struct Config {
pub query_max_mem_mb: usize,
/// Directory paths for DataFusion query spilling.
pub spill_location: Vec<PathBuf>,
/// Maximum concurrent parquet metadata fetches during query planning.
pub metadata_fetch_concurrency: usize,
/// Progress event emission interval.
pub progress_interval: Duration,
/// Parquet file configuration.
Expand Down
1 change: 1 addition & 0 deletions crates/core/worker-datasets-derived/src/job_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ pub async fn execute(
ctx.config.max_mem_mb,
ctx.config.query_max_mem_mb,
&ctx.config.spill_location,
ctx.config.metadata_fetch_concurrency,
ctx.data_store.clone(),
ctx.datasets_cache.clone(),
ctx.ethcall_udfs_cache.clone(),
Expand Down
2 changes: 2 additions & 0 deletions crates/services/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ pub struct Config {
pub query_max_mem_mb: usize,
/// Paths for DataFusion temporary files for spill-to-disk
pub spill_location: Vec<PathBuf>,
/// Maximum concurrent parquet metadata fetches during query planning
pub metadata_fetch_concurrency: usize,
}
1 change: 1 addition & 0 deletions crates/services/server/src/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl Service {
config.max_mem_mb,
config.query_max_mem_mb,
&config.spill_location,
config.metadata_fetch_concurrency,
data_store,
datasets_cache,
ethcall_udfs_cache,
Expand Down
3 changes: 3 additions & 0 deletions crates/services/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub struct Config {
/// Directory paths for DataFusion query spilling
pub spill_location: Vec<PathBuf>,

/// Maximum concurrent parquet metadata fetches during query planning
pub metadata_fetch_concurrency: usize,

/// Parquet file configuration
pub parquet: ParquetConfig,

Expand Down
1 change: 1 addition & 0 deletions crates/services/worker/src/service/job_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub(super) async fn new(
max_mem_mb: job_ctx.config.max_mem_mb,
query_max_mem_mb: job_ctx.config.query_max_mem_mb,
spill_location: job_ctx.config.spill_location.clone(),
metadata_fetch_concurrency: job_ctx.config.metadata_fetch_concurrency,
progress_interval: job_ctx
.config
.events_config
Expand Down
7 changes: 7 additions & 0 deletions docs/schemas/config/ampd.spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
"default": 0,
"minimum": 0
},
"metadata_fetch_concurrency": {
"description": "Maximum concurrent parquet metadata fetches during query planning (default: 32)",
"type": "integer",
"format": "uint",
"default": 32,
"minimum": 0
},
"microbatch_max_interval": {
"description": "Max interval for derived dataset dump microbatches in blocks (default: 100000)",
"type": "integer",
Expand Down
1 change: 1 addition & 0 deletions tests/src/testlib/fixtures/daemon_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,6 @@ fn server_config_from_common(config: &amp_config::Config) -> Config {
max_mem_mb: config.max_mem_mb,
query_max_mem_mb: config.query_max_mem_mb,
spill_location: config.spill_location.clone(),
metadata_fetch_concurrency: config.metadata_fetch_concurrency,
}
}
1 change: 1 addition & 0 deletions tests/src/testlib/fixtures/daemon_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ fn worker_config_from_common(config: &amp_config::Config) -> Config {
max_mem_mb: config.max_mem_mb,
query_max_mem_mb: config.query_max_mem_mb,
spill_location: config.spill_location.clone(),
metadata_fetch_concurrency: config.metadata_fetch_concurrency,
parquet: config.parquet.clone(),
events_config: config.worker_events.clone(),
}
Expand Down
Loading