Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,8 @@ struct Configuration {
int64_t writer_batch_timeout_ms{100};
// Connect timeout in milliseconds for TCP transport connect
uint64_t connect_timeout_ms{120000};
// Request timeout in milliseconds for individual RPC calls
uint64_t request_timeout_ms{30000};
// Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth)
std::string security_protocol{"PLAINTEXT"};
// SASL mechanism (only "PLAIN" is supported)
Expand Down
1 change: 1 addition & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
ffi_config.connect_timeout_ms = config.connect_timeout_ms;
ffi_config.request_timeout_ms = config.request_timeout_ms;
ffi_config.security_protocol = rust::String(config.security_protocol);
ffi_config.security_sasl_mechanism = rust::String(config.security_sasl_mechanism);
ffi_config.security_sasl_username = rust::String(config.security_sasl_username);
Expand Down
2 changes: 2 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod ffi {
scanner_log_max_poll_records: usize,
writer_batch_timeout_ms: i64,
connect_timeout_ms: u64,
request_timeout_ms: u64,
security_protocol: String,
security_sasl_mechanism: String,
security_sasl_username: String,
Expand Down Expand Up @@ -671,6 +672,7 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult {
scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency,
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
connect_timeout_ms: config.connect_timeout_ms,
request_timeout_ms: config.request_timeout_ms,
security_protocol: config.security_protocol.to_string(),
security_sasl_mechanism: config.security_sasl_mechanism.to_string(),
security_sasl_username: config.security_sasl_username.to_string(),
Expand Down
3 changes: 3 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ class Config:
@scanner_log_max_poll_records.setter
def scanner_log_max_poll_records(self, num: int) -> None: ...
@property
def request_timeout_ms(self) -> int: ...
@request_timeout_ms.setter
def request_timeout_ms(self, timeout: int) -> None: ...
def writer_batch_timeout_ms(self) -> int: ...
@writer_batch_timeout_ms.setter
def writer_batch_timeout_ms(self, timeout: int) -> None: ...
Expand Down
17 changes: 17 additions & 0 deletions bindings/python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ impl Config {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"request-timeout" => {
config.request_timeout_ms = value.parse::<u64>().map_err(|e| {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"security.protocol" => {
config.security_protocol = value;
}
Expand Down Expand Up @@ -267,6 +272,18 @@ impl Config {
self.inner.connect_timeout_ms = timeout;
}

/// Get the request timeout in milliseconds
#[getter]
fn request_timeout_ms(&self) -> u64 {
self.inner.request_timeout_ms
}

/// Set the request timeout in milliseconds
#[setter]
fn set_request_timeout_ms(&mut self, timeout: u64) {
self.inner.request_timeout_ms = timeout;
}

/// Get the security protocol
#[getter]
fn security_protocol(&self) -> String {
Expand Down
10 changes: 8 additions & 2 deletions crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,23 @@ impl FlussConnection {
.map_err(|msg| Error::IllegalArgument { message: msg })?;

let timeout = Duration::from_millis(arg.connect_timeout_ms);
let request_timeout = Duration::from_millis(arg.request_timeout_ms);
let connections = if arg.is_sasl_enabled() {
Arc::new(
RpcClient::new()
.with_sasl(
arg.security_sasl_username.clone(),
arg.security_sasl_password.clone(),
)
.with_timeout(timeout),
.with_connect_timeout(timeout)
.with_request_timeout(request_timeout),
)
} else {
Arc::new(RpcClient::new().with_timeout(timeout))
Arc::new(
RpcClient::new()
.with_connect_timeout(timeout)
.with_request_timeout(request_timeout),
)
};
let metadata = Metadata::new(arg.bootstrap_servers.as_str(), connections.clone()).await?;

Expand Down
8 changes: 8 additions & 0 deletions crates/fluss/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;

const DEFAULT_ACKS: &str = "all";
const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000;
const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000;
const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT";
const DEFAULT_SASL_MECHANISM: &str = "PLAIN";

Expand Down Expand Up @@ -105,6 +106,11 @@ pub struct Config {
#[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)]
pub connect_timeout_ms: u64,

/// Request timeout in milliseconds for individual RPC calls.
/// Default: 30000 (30 seconds).
#[arg(long, default_value_t = DEFAULT_REQUEST_TIMEOUT_MS)]
pub request_timeout_ms: u64,

#[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))]
pub security_protocol: String,

Expand Down Expand Up @@ -145,6 +151,7 @@ impl std::fmt::Debug for Config {
)
.field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms)
.field("connect_timeout_ms", &self.connect_timeout_ms)
.field("request_timeout_ms", &self.request_timeout_ms)
.field("security_protocol", &self.security_protocol)
.field("security_sasl_mechanism", &self.security_sasl_mechanism)
.field("security_sasl_username", &self.security_sasl_username)
Expand All @@ -168,6 +175,7 @@ impl Default for Config {
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS,
request_timeout_ms: DEFAULT_REQUEST_TIMEOUT_MS,
security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL),
security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM),
security_sasl_username: String::new(),
Expand Down
4 changes: 4 additions & 0 deletions crates/fluss/src/rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
use prost::DecodeError;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -51,4 +52,7 @@ pub enum RpcError {
api_key: ApiKey,
api_version: ApiVersion,
},

#[error("Request timed out after {timeout:?} for api_key={api_key:?}")]
RequestTimeout { timeout: Duration, api_key: ApiKey },
}
130 changes: 120 additions & 10 deletions crates/fluss/src/rpc/server_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl fmt::Debug for SaslConfig {
pub struct RpcClient {
connections: RwLock<HashMap<String, ServerConnection>>,
client_id: Arc<str>,
timeout: Option<Duration>,
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
max_message_size: usize,
sasl_config: Option<SaslConfig>,
}
Expand All @@ -80,14 +81,20 @@ impl RpcClient {
RpcClient {
connections: Default::default(),
client_id: Arc::from(""),
timeout: None,
connect_timeout: None,
request_timeout: None,
max_message_size: usize::MAX,
sasl_config: None,
}
}

pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = Some(timeout);
self
}

pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = Some(timeout);
self
}

Expand Down Expand Up @@ -125,14 +132,15 @@ impl RpcClient {

async fn connect(&self, server_node: &ServerNode) -> Result<ServerConnection, Error> {
let url = server_node.url();
let transport = Transport::connect(&url, self.timeout)
let transport = Transport::connect(&url, self.connect_timeout)
.await
.map_err(|error| ConnectionError(error.to_string()))?;

let messenger = ServerConnectionInner::new(
BufStream::new(transport),
self.max_message_size,
self.client_id.clone(),
self.request_timeout,
);
let connection = ServerConnection::new(messenger);

Expand Down Expand Up @@ -266,14 +274,25 @@ pub struct ServerConnectionInner<RW> {

state: Arc<Mutex<ConnectionState>>,

/// Per-request timeout applied to the response-wait phase only.
/// The send (write) phase is not covered; a stalled `send_message` can
/// exceed this duration.
/// TODO: Full RPC deadline semantics are a potential future enhancement.
request_timeout: Option<Duration>,

join_handle: JoinHandle<()>,
}

impl<RW> ServerConnectionInner<RW>
where
RW: AsyncRead + AsyncWrite + Send + 'static,
{
pub fn new(stream: RW, max_message_size: usize, client_id: Arc<str>) -> Self {
pub fn new(
stream: RW,
max_message_size: usize,
client_id: Arc<str>,
request_timeout: Option<Duration>,
) -> Self {
let (stream_read, stream_write) = tokio::io::split(stream);
let state = Arc::new(Mutex::new(ConnectionState::RequestMap(HashMap::default())));
let state_captured = Arc::clone(&state);
Expand Down Expand Up @@ -301,9 +320,9 @@ where
match map.remove(&header.request_id) {
Some(active_request) => active_request,
_ => {
log::warn!(
log::debug!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why change to debug?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think debug fits better here: because this is a normal late-response race after timeout/cancel, other than an operational warning IMHO

so in production environment, we probably will use metric/counter which probably better than warn spamming. WDYT? @luoyuxia

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know what you think of this, in the meantime, i rebased and resolved conflicts, thanks a lot! @luoyuxia 🙏

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is a normal late-response race.

However, a request timeout still suggests some underlying issue or risk that users should be aware of. Metrics would help, but logs are often more immediately useful in practice.

Fluss's Java client logs this as warn. I'm not sure whether Kafka client does the same for a similar case, but Kafka client may be a better reference if we want to follow common client behavior.

So I’m slightly leaning toward keeping warn for now.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for response @luoyuxia

i checked Kafka client, it seems Kafka closes connection on timeout, so IIUC late responses can never arrive, aka, Kafka ignores response for request after timeout.

But it seems it treats as info level:

         log.info("Disconnecting from node {} due to request timeout.", nodeId);

so i think here i will switch to warn level then! 🙏

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!

PTAL @luoyuxia thank you!

request_id:% = header.request_id;
"Got response for unknown request",
"Ignoring response for unknown request (likely timed out or cancelled)",
);
continue;
}
Expand Down Expand Up @@ -337,6 +356,7 @@ where
client_id,
request_id: AtomicI32::new(0),
state,
request_timeout,
join_handle,
}
}
Expand Down Expand Up @@ -388,8 +408,29 @@ where

self.send_message(buf).await?;
_cleanup_on_cancel.message_sent();
let mut response = rx.await.map_err(|e| Error::UnexpectedError {
message: "Got recvError, some one close the channel".to_string(),
let recv_result = match self.request_timeout {
Some(timeout) => match tokio::time::timeout(timeout, rx).await {
Ok(result) => result,
Err(_elapsed) => {
if let ConnectionState::RequestMap(map) = self.state.lock().deref_mut() {
map.remove(&request_id);
}
return Err(RpcError::RequestTimeout {
timeout,
api_key: R::API_KEY,
}
.into());
}
},
None => rx.await,
};

let mut response = recv_result.map_err(|e| Error::UnexpectedError {
message: format!(
"Response channel closed for request_id={request_id} api_key={:?}; \
connection may be closed or poisoned",
R::API_KEY
),
source: Some(Box::new(e)),
})??;

Expand Down Expand Up @@ -561,3 +602,72 @@ impl Drop for CleanupRequestStateOnCancel {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::TablePath;
use crate::rpc::message::TableExistsRequest;

#[tokio::test]
async fn test_request_timeout() {
// Create a duplex stream where the "server" side never responds.
let (client_stream, _server_stream) = tokio::io::duplex(1024);

let conn = ServerConnectionInner::new(
BufStream::new(client_stream),
usize::MAX,
Arc::from("test"),
Some(Duration::from_millis(50)),
);

let table_path = TablePath::new("db", "table");
let request = TableExistsRequest::new(&table_path);
let result = conn.request(request).await;

assert!(result.is_err());
let err = result.unwrap_err();
assert!(
matches!(
err,
Error::RpcError {
source: RpcError::RequestTimeout { .. },
..
}
),
"expected RequestTimeout, got: {err}"
);

// Timeout must not poison the connection — other requests should still work.
assert!(!conn.is_poisoned());

// The timed-out request must be removed from the request map (no state leak).
if let ConnectionState::RequestMap(map) = conn.state.lock().deref_mut() {
assert!(map.is_empty(), "request map should be empty after timeout");
} else {
panic!("connection should not be poisoned after a timeout");
}
}

#[tokio::test]
async fn test_request_no_timeout() {
// With no request timeout configured, request should remain pending
// when the server does not respond.
let (client_stream, _server_stream) = tokio::io::duplex(1024);

let conn = ServerConnectionInner::new(
BufStream::new(client_stream),
usize::MAX,
Arc::from("test"),
None,
);

let table_path = TablePath::new("db", "table");
let request = TableExistsRequest::new(&table_path);
let pending = tokio::time::timeout(Duration::from_millis(50), conn.request(request)).await;
assert!(
pending.is_err(),
"expected request to remain pending without per-request timeout"
);
}
}
1 change: 1 addition & 0 deletions website/docs/user-guide/cpp/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Complete API reference for the Fluss C++ client.
| `scanner_remote_log_read_concurrency` | `size_t` | `4` | Streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `size_t` | `500` | Maximum number of records returned in a single Poll() |
| `connect_timeout_ms` | `uint64_t` | `120000` | TCP connect timeout in milliseconds |
| `request_timeout_ms` | `uint64_t` | `30000` | Per-request RPC timeout in milliseconds |
| `security_protocol` | `std::string` | `"PLAINTEXT"` | `"PLAINTEXT"` (default) or `"sasl"` for SASL auth |
| `security_sasl_mechanism` | `std::string` | `"PLAIN"` | SASL mechanism (only `"PLAIN"` is supported) |
| `security_sasl_username` | `std::string` | (empty) | SASL username (required when protocol is `"sasl"`) |
Expand Down
1 change: 1 addition & 0 deletions website/docs/user-guide/cpp/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ config.remote_file_download_thread_num = 3; // Download threa
config.scanner_remote_log_read_concurrency = 4; // In-file remote log read concurrency
config.scanner_log_max_poll_records = 500; // Max records per poll
config.connect_timeout_ms = 120000; // TCP connect timeout (ms)
config.request_timeout_ms = 30000; // Per-request RPC timeout (ms)
```

## SASL Authentication
Expand Down
1 change: 1 addition & 0 deletions website/docs/user-guide/python/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Complete API reference for the Fluss Python client.
| `scanner_remote_log_read_concurrency` | Get/set streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | Get/set max number of records returned in a single poll() |
| `connect_timeout_ms` | Get/set TCP connect timeout in milliseconds |
| `request_timeout_ms` | Get/set per-request RPC timeout in milliseconds |
| `security_protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) |
| `security_sasl_mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) |
| `security_sasl_username` | Get/set SASL username (required when protocol is `"sasl"`) |
Expand Down
1 change: 1 addition & 0 deletions website/docs/user-guide/python/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ with await fluss.FlussConnection.create(config) as conn:
| `scanner.remote-log.read-concurrency` | Streaming read concurrency within a remote log file | `4` |
| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |
| `connect-timeout` | TCP connect timeout in milliseconds | `120000` |
| `request-timeout` | Per-request RPC timeout in milliseconds | `30000` |
| `security.protocol` | `PLAINTEXT` (default) or `sasl` for SASL auth | `PLAINTEXT` |
| `security.sasl.mechanism` | SASL mechanism (only `PLAIN` is supported) | `PLAIN` |
| `security.sasl.username` | SASL username (required when protocol is `sasl`) | (empty) |
Expand Down