Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl FlussConnection {
pub async fn new(arg: Config) -> Result<Self> {
arg.validate_security()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_scanner_fetch()
arg.validate_scanner()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_writer()
.map_err(|msg| Error::IllegalArgument { message: msg })?;

let timeout = Duration::from_millis(arg.connect_timeout_ms);
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/client/table/remote_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ impl RemoteLogDownloader {
let fetcher = Arc::new(ProductionFetcher {
credentials_rx,
local_log_dir: Arc::new(local_log_dir),
remote_log_read_concurrency: remote_log_read_concurrency.max(1),
remote_log_read_concurrency,
});

Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads)
Expand Down
178 changes: 164 additions & 14 deletions crates/fluss/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,16 @@ impl Config {
}
Ok(())
}
pub fn validate_scanner_fetch(&self) -> Result<(), String> {
pub fn validate_scanner(&self) -> Result<(), String> {
if self.scanner_remote_log_prefetch_num == 0 {
return Err("scanner_remote_log_prefetch_num must be > 0".to_string());
}
if self.scanner_remote_log_read_concurrency == 0 {
return Err("scanner_remote_log_read_concurrency must be > 0".to_string());
}
if self.remote_file_download_thread_num == 0 {
return Err("remote_file_download_thread_num must be > 0".to_string());
}
if self.scanner_log_fetch_min_bytes <= 0 {
return Err("scanner_log_fetch_min_bytes must be > 0".to_string());
}
Expand All @@ -350,6 +359,57 @@ impl Config {
}
Ok(())
}

pub fn validate_writer(&self) -> Result<(), String> {
if self.writer_request_max_size <= 0 {
return Err("writer_request_max_size must be > 0".to_string());
}
if self.writer_batch_size <= 0 {
return Err("writer_batch_size must be > 0".to_string());
}
if self.writer_batch_timeout_ms < 0 {
return Err("writer_batch_timeout_ms must be >= 0".to_string());
}
if self.writer_max_inflight_requests_per_bucket == 0 {
return Err("writer_max_inflight_requests_per_bucket must be > 0".to_string());
}
if self.writer_buffer_memory_size == 0 {
return Err("writer_buffer_memory_size must be > 0".to_string());
}
if self.writer_batch_size > self.writer_request_max_size {
return Err("writer_batch_size must be <= writer_request_max_size".to_string());
}
if self.writer_batch_size as usize > self.writer_buffer_memory_size {
return Err("writer_batch_size must be <= writer_buffer_memory_size".to_string());
}
// idempotence checks
if !self.writer_enable_idempotence {
return Ok(());
}
let acks_is_all = self.writer_acks.eq_ignore_ascii_case("all") || self.writer_acks == "-1";
if !acks_is_all {
return Err(format!(
"Idempotent writes require acks='all' (-1), but got acks='{}'",
self.writer_acks
));
}
if self.writer_retries <= 0 {
return Err(format!(
"Idempotent writes require retries > 0, but got retries={}",
self.writer_retries
));
}
if self.writer_max_inflight_requests_per_bucket
> MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE
{
return Err(format!(
"Idempotent writes require max-inflight-requests-per-bucket <= {}, but got {}",
MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE,
self.writer_max_inflight_requests_per_bucket
));
}
Ok(())
Copy link
Copy Markdown
Contributor

@fresh-borzoni fresh-borzoni Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to have these checks as well:

  • writer_batch_size <= writer_request_max_size - otherwise batches never drain as they exceed max size defined for request
  • writer_batch_size <= writer_buffer_memory_size - or If a single full batch exceeds max_size, it doesn't fit buffer and just keeps piling, we have runtime check for it, but better to validate at config level

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the requested checks in config validation

}
}

#[cfg(test)]
Expand Down Expand Up @@ -419,13 +479,38 @@ mod tests {
};
assert!(config.validate_security().is_err());
}

#[test]
fn test_scanner_fetch_defaults_valid() {
fn test_scanner_defaults_valid() {
let config = Config::default();
assert!(config.validate_scanner_fetch().is_ok());
assert_eq!(config.scanner_log_fetch_max_bytes, 16 * 1024 * 1024);
assert_eq!(config.scanner_log_fetch_min_bytes, 1);
assert_eq!(config.scanner_log_fetch_wait_max_time_ms, 500);
assert!(config.validate_scanner().is_ok());
}

#[test]
fn test_scanner_remote_log_prefetch_num_zero() {
let config = Config {
scanner_remote_log_prefetch_num: 0,
..Config::default()
};
assert!(config.validate_scanner().is_err());
}

#[test]
fn test_scanner_remote_log_read_concurrency_zero() {
let config = Config {
scanner_remote_log_read_concurrency: 0,
..Config::default()
};
assert!(config.validate_scanner().is_err());
}

#[test]
fn test_remote_file_download_thread_num_zero() {
let config = Config {
remote_file_download_thread_num: 0,
..Config::default()
};
assert!(config.validate_scanner().is_err());
}

#[test]
Expand All @@ -435,7 +520,7 @@ mod tests {
scanner_log_fetch_max_bytes: 1,
..Config::default()
};
assert!(config.validate_scanner_fetch().is_err());
assert!(config.validate_scanner().is_err());
}

#[test]
Expand All @@ -444,13 +529,78 @@ mod tests {
scanner_log_fetch_wait_max_time_ms: -1,
..Config::default()
};
assert!(config.validate_scanner_fetch().is_err());
assert!(config.validate_scanner().is_err());
}

#[test]
fn test_idempotence_default_is_valid() {
fn test_writer_defaults_valid() {
let config = Config::default();
assert!(config.validate_idempotence().is_ok());
assert!(config.validate_writer().is_ok());
}

#[test]
fn test_writer_request_max_size_zero() {
let config = Config {
writer_request_max_size: 0,
..Config::default()
};
assert!(config.validate_writer().is_err());
}

#[test]
fn test_writer_batch_size_zero() {
let config = Config {
writer_batch_size: 0,
..Config::default()
};
assert!(config.validate_writer().is_err());
}

#[test]
fn test_writer_batch_timeout_negative() {
let config = Config {
writer_batch_timeout_ms: -1,
..Config::default()
};
assert!(config.validate_writer().is_err());
}

#[test]
fn test_writer_max_inflight_requests_per_bucket_zero() {
let config = Config {
writer_max_inflight_requests_per_bucket: 0,
..Config::default()
};
assert!(config.validate_writer().is_err());
}

#[test]
fn test_writer_buffer_memory_size_zero() {
let config = Config {
writer_buffer_memory_size: 0,
..Config::default()
};
assert!(config.validate_writer().is_err());
}

#[test]
fn test_writer_batch_size_exceeds_request_max_size() {
let config = Config {
writer_batch_size: 20 * 1024 * 1024,
writer_request_max_size: 10 * 1024 * 1024,
..Config::default()
};
assert!(config.validate_writer().is_err());
}

#[test]
fn test_writer_batch_size_exceeds_buffer_memory_size() {
let config = Config {
writer_batch_size: 128 * 1024 * 1024,
writer_buffer_memory_size: 64 * 1024 * 1024,
..Config::default()
};
assert!(config.validate_writer().is_err());
}

#[test]
Expand All @@ -462,7 +612,7 @@ mod tests {
writer_max_inflight_requests_per_bucket: 100,
..Config::default()
};
assert!(config.validate_idempotence().is_ok());
assert!(config.validate_writer().is_ok());
}

#[test]
Expand All @@ -472,7 +622,7 @@ mod tests {
writer_acks: "1".to_string(),
..Config::default()
};
assert!(config.validate_idempotence().is_err());
assert!(config.validate_writer().is_err());
}

#[test]
Expand All @@ -482,7 +632,7 @@ mod tests {
writer_retries: 0,
..Config::default()
};
assert!(config.validate_idempotence().is_err());
assert!(config.validate_writer().is_err());
}

#[test]
Expand All @@ -492,6 +642,6 @@ mod tests {
writer_max_inflight_requests_per_bucket: 10,
..Config::default()
};
assert!(config.validate_idempotence().is_err());
assert!(config.validate_writer().is_err());
}
}