Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,8 @@ struct Configuration {
int64_t writer_batch_timeout_ms{100};
// Connect timeout in milliseconds for TCP transport connect
uint64_t connect_timeout_ms{120000};
// Request timeout in milliseconds for individual RPC calls
uint64_t request_timeout_ms{30000};
// Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth)
std::string security_protocol{"PLAINTEXT"};
// SASL mechanism (only "PLAIN" is supported)
Expand Down
1 change: 1 addition & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
ffi_config.connect_timeout_ms = config.connect_timeout_ms;
ffi_config.request_timeout_ms = config.request_timeout_ms;
ffi_config.security_protocol = rust::String(config.security_protocol);
ffi_config.security_sasl_mechanism = rust::String(config.security_sasl_mechanism);
ffi_config.security_sasl_username = rust::String(config.security_sasl_username);
Expand Down
2 changes: 2 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod ffi {
scanner_log_max_poll_records: usize,
writer_batch_timeout_ms: i64,
connect_timeout_ms: u64,
request_timeout_ms: u64,
security_protocol: String,
security_sasl_mechanism: String,
security_sasl_username: String,
Expand Down Expand Up @@ -654,6 +655,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency,
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
connect_timeout_ms: config.connect_timeout_ms,
request_timeout_ms: config.request_timeout_ms,
security_protocol: config.security_protocol.to_string(),
security_sasl_mechanism: config.security_sasl_mechanism.to_string(),
security_sasl_username: config.security_sasl_username.to_string(),
Expand Down
4 changes: 4 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ class Config:
def scanner_log_max_poll_records(self) -> int: ...
@scanner_log_max_poll_records.setter
def scanner_log_max_poll_records(self, num: int) -> None: ...
@property
def request_timeout_ms(self) -> int: ...
@request_timeout_ms.setter
def request_timeout_ms(self, timeout: int) -> None: ...

class FlussConnection:
@staticmethod
Expand Down
17 changes: 17 additions & 0 deletions bindings/python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ impl Config {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"request-timeout-ms" => {
config.request_timeout_ms = value.parse::<u64>().map_err(|e| {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"security.protocol" => {
config.security_protocol = value;
}
Expand Down Expand Up @@ -267,6 +272,18 @@ impl Config {
self.inner.connect_timeout_ms = timeout;
}

/// Get the request timeout in milliseconds
#[getter]
fn request_timeout_ms(&self) -> u64 {
self.inner.request_timeout_ms
}

/// Set the request timeout in milliseconds
#[setter]
fn set_request_timeout_ms(&mut self, timeout: u64) {
self.inner.request_timeout_ms = timeout;
}

/// Get the security protocol
#[getter]
fn security_protocol(&self) -> String {
Expand Down
10 changes: 8 additions & 2 deletions crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,23 @@ impl FlussConnection {
.map_err(|msg| Error::IllegalArgument { message: msg })?;

let timeout = Duration::from_millis(arg.connect_timeout_ms);
let request_timeout = Duration::from_millis(arg.request_timeout_ms);
let connections = if arg.is_sasl_enabled() {
Arc::new(
RpcClient::new()
.with_sasl(
arg.security_sasl_username.clone(),
arg.security_sasl_password.clone(),
)
.with_timeout(timeout),
.with_timeout(timeout)
.with_request_timeout(request_timeout),
)
} else {
Arc::new(RpcClient::new().with_timeout(timeout))
Arc::new(
RpcClient::new()
.with_timeout(timeout)
.with_request_timeout(request_timeout),
)
};
let metadata = Metadata::new(arg.bootstrap_servers.as_str(), connections.clone()).await?;

Expand Down
8 changes: 8 additions & 0 deletions crates/fluss/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;

const DEFAULT_ACKS: &str = "all";
const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000;
const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000;
const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT";
const DEFAULT_SASL_MECHANISM: &str = "PLAIN";

Expand Down Expand Up @@ -105,6 +106,11 @@ pub struct Config {
#[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)]
pub connect_timeout_ms: u64,

/// Request timeout in milliseconds for individual RPC calls.
/// Default: 30000 (30 seconds).
#[arg(long, default_value_t = DEFAULT_REQUEST_TIMEOUT_MS)]
pub request_timeout_ms: u64,

#[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))]
pub security_protocol: String,

Expand Down Expand Up @@ -145,6 +151,7 @@ impl std::fmt::Debug for Config {
)
.field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms)
.field("connect_timeout_ms", &self.connect_timeout_ms)
.field("request_timeout_ms", &self.request_timeout_ms)
.field("security_protocol", &self.security_protocol)
.field("security_sasl_mechanism", &self.security_sasl_mechanism)
.field("security_sasl_username", &self.security_sasl_username)
Expand All @@ -168,6 +175,7 @@ impl Default for Config {
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS,
request_timeout_ms: DEFAULT_REQUEST_TIMEOUT_MS,
security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL),
security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM),
security_sasl_username: String::new(),
Expand Down
4 changes: 4 additions & 0 deletions crates/fluss/src/rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
use prost::DecodeError;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -51,4 +52,7 @@ pub enum RpcError {
api_key: ApiKey,
api_version: ApiVersion,
},

#[error("Request timed out after {timeout:?} for api_key={api_key:?}")]
RequestTimeout { timeout: Duration, api_key: ApiKey },
}
91 changes: 86 additions & 5 deletions crates/fluss/src/rpc/server_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct RpcClient {
connections: RwLock<HashMap<String, ServerConnection>>,
client_id: Arc<str>,
timeout: Option<Duration>,
request_timeout: Option<Duration>,
max_message_size: usize,
sasl_config: Option<SaslConfig>,
}
Expand All @@ -81,6 +82,7 @@ impl RpcClient {
connections: Default::default(),
client_id: Arc::from(""),
timeout: None,
request_timeout: None,
max_message_size: usize::MAX,
sasl_config: None,
}
Expand All @@ -91,6 +93,11 @@ impl RpcClient {
self
}

pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = Some(timeout);
self
}

pub fn with_sasl(mut self, username: String, password: String) -> Self {
self.sasl_config = Some(SaslConfig { username, password });
self
Expand Down Expand Up @@ -133,6 +140,7 @@ impl RpcClient {
BufStream::new(transport),
self.max_message_size,
self.client_id.clone(),
self.request_timeout,
);
let connection = ServerConnection::new(messenger);

Expand Down Expand Up @@ -266,14 +274,21 @@ pub struct ServerConnectionInner<RW> {

state: Arc<Mutex<ConnectionState>>,

request_timeout: Option<Duration>,

join_handle: JoinHandle<()>,
}

impl<RW> ServerConnectionInner<RW>
where
RW: AsyncRead + AsyncWrite + Send + 'static,
{
pub fn new(stream: RW, max_message_size: usize, client_id: Arc<str>) -> Self {
pub fn new(
stream: RW,
max_message_size: usize,
client_id: Arc<str>,
request_timeout: Option<Duration>,
) -> Self {
let (stream_read, stream_write) = tokio::io::split(stream);
let state = Arc::new(Mutex::new(ConnectionState::RequestMap(HashMap::default())));
let state_captured = Arc::clone(&state);
Expand Down Expand Up @@ -337,6 +352,7 @@ where
client_id,
request_id: AtomicI32::new(0),
state,
request_timeout,
join_handle,
}
}
Expand Down Expand Up @@ -388,10 +404,28 @@ where

self.send_message(buf).await?;
_cleanup_on_cancel.message_sent();
let mut response = rx.await.map_err(|e| Error::UnexpectedError {
message: "Got recvError, some one close the channel".to_string(),
source: Some(Box::new(e)),
})??;
let mut response = match self.request_timeout {
Some(timeout) => match tokio::time::timeout(timeout, rx).await {
Ok(result) => result.map_err(|e| Error::UnexpectedError {
message: "Got recvError, some one close the channel".to_string(),
source: Some(Box::new(e)),
})??,
Err(_elapsed) => {
if let ConnectionState::RequestMap(map) = self.state.lock().deref_mut() {
map.remove(&request_id);
}
return Err(RpcError::RequestTimeout {
timeout,
api_key: R::API_KEY,
}
.into());
}
},
None => rx.await.map_err(|e| Error::UnexpectedError {
message: "Got recvError, some one close the channel".to_string(),
source: Some(Box::new(e)),
})??,
};

if let Some(error_response) = response.header.error_response {
return Err(Error::FlussAPIError {
Expand Down Expand Up @@ -561,3 +595,50 @@ impl Drop for CleanupRequestStateOnCancel {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::TablePath;
use crate::rpc::message::TableExistsRequest;

#[tokio::test]
async fn test_request_timeout() {
// Create a duplex stream where the "server" side never responds.
let (client_stream, _server_stream) = tokio::io::duplex(1024);

let conn = ServerConnectionInner::new(
BufStream::new(client_stream),
usize::MAX,
Arc::from("test"),
Some(Duration::from_millis(50)),
);

let table_path = TablePath::new("db", "table");
let request = TableExistsRequest::new(&table_path);
let result = conn.request(request).await;

assert!(result.is_err());
let err = result.unwrap_err();
let err_msg = err.to_string();
assert!(
err_msg.contains("timed out"),
"expected timeout error, got: {err_msg}"
);
}

#[tokio::test]
async fn test_request_no_timeout() {
// With None timeout, verify a connection can still be constructed without panics.
let (client_stream, _server_stream) = tokio::io::duplex(1024);

let conn = ServerConnectionInner::new(
BufStream::new(client_stream),
usize::MAX,
Arc::from("test"),
None,
);

assert!(!conn.is_poisoned());
}
}
Loading