fix: include http_request_id in request-wise priming event IDs#799
fix: include http_request_id in request-wise priming event IDs#799
Conversation
df297e0 to
b881409
Compare
b881409 to
f15b728
Compare
There was a problem hiding this comment.
Hi @DaleSeo,
Thanks for putting in these changes.
Keeping the cache after completion works.
But there is an edge case where if there are 2 overlapping parallel calls things don't work. In fact, the client may go into a loop waiting for a response because when a stream is not found we fallback to the resume_or_shadow_common.
I've created a test that demonstrates this at: https://github.com/binahm/rust-sdk/blob/fix/priming-event-ids/crates/rmcp/tests/test_streamable_http_priming.rs . I added to your branch 2 tests:
- test_long_running_tool_single_via_mcp_client (passes)
- test_long_running_tool_parallel_via_mcp_client (fails)
I've also added comments to the PR code that try to explain what I've seen.
| "Request-wise channel completed, falling back to common channel" | ||
| "Request-wise channel not found, falling back to common channel" | ||
| ); | ||
| self.resume_or_shadow_common(last_event_id.index).await |
There was a problem hiding this comment.
Is this correct to fallback to resume_or_shadow_common? If there was a http_request_id provided and it is not found, shouldn't we provide an error? Seems to me that this will lead to providing messages from a different stream than the one the client expects.
From the spec:
The server MUST NOT replay messages that would have been delivered on a different stream.
There was a problem hiding this comment.
Good catch, @glicht. Resume now returns SessionError::ChannelClosed when the http_request_id is provided but not found in tx_router. The tower handler catches the error and creates a fresh standalone stream.
| async fn establish_request_wise_channel( | ||
| &mut self, | ||
| ) -> Result<StreamableHttpMessageReceiver, SessionError> { | ||
| self.tx_router.retain(|_, rw| !rw.tx.tx.is_closed()); |
There was a problem hiding this comment.
If I understand correctly, the code assumes that once a new stream is created we can discard streams that were closed (including those that experienced a disconnect). This can lead to a scenario that the stream is discarded before it was fully consumed. For example if while the client was waiting before doing the resume GET request, it performed another request. Example flow:
Time 0: Client issues req A (long running task that takes for example 10 seconds)
Time 9: Client receives disconnect and now will wait 3 second before performing GET resume request
Time 11: Client issues req B. Req A is discarded as `tx.is_closed()`
Time 12: Cilent sends GET request to resume stream from req A. But stream is `not found`.
Additionally, there is a memory risk here. If a client doesn't create a new stream the router is not cleaned out. May lead to unnecessary extra memory consumption when there are many clients which maintain a session but are not active.
Maybe there is need for a different approach, such as cleaning out HttpRequestWise after a timeout period after it completed.
There was a problem hiding this comment.
You're right, the retain on new channel creation was too aggressive. Replaced it with timeout-based eviction. Let me know if this version would work.
| // Resume existing request-wise channel | ||
| let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); | ||
| let (tx, rx) = channel; | ||
| let was_completed = request_wise.tx.tx.is_closed(); |
There was a problem hiding this comment.
If I understand correctly,tx.is_closed() doesn't necessarily mean the request completed. A disconnect from the client will cause tx.is_closed(). If there was a disconnect and the request was not completed, the stream should be left active to send messages as the request continues processing. I think there is need to add an additional completed field to HttpRequestWise which will be set at unregister_resource.
There was a problem hiding this comment.
I agree. Added completed_at: Option<Instant> to HttpRequestWise, set explicitly in unregister_resource when the response is delivered.
750332d to
3e67b4f
Compare
3e67b4f to
559120d
Compare
|
Thanks for the thorough review, @glicht. I've pushed updates addressing all three points. Your long-running tool tests also pass locally and I also wanted to add them to this PR but they kept failing in CI due to a client reconnect issue on Ubuntu runners. The server-side behavior should be covered by the existing tests and manual validation. |
glicht
left a comment
There was a problem hiding this comment.
It looks very good.
I added a comment regarding the fall through to a standalone stream when the resume stream is not found.
| // EventSource to retry with the same Last-Event-ID in an | ||
| // infinite loop. Logging at warn so malformed IDs or | ||
| // unexpected failures remain visible. | ||
| tracing::warn!("Resume failed ({e}), creating standalone stream"); |
There was a problem hiding this comment.
Not sure I understand why we leave this connection open and fall through to the standalone stream. I think we should just return a success and close if an error causes an infinite retry loop. Maybe check fi it is an error of type ChannelClosed and then return success. For a different error probably want to return an error to the client. Returning the standalone stream leads to a case where client is asking for messages of a specific stream but gets messages from the standalone common channel (not sure this is what the client is expecting).
There was a problem hiding this comment.
Thanks for calling this out, @glicht. This was actually the root cause of our CI test failures. 🤦♂️ .text().await was hanging because the standalone stream never closed.
Fixes #791
Motivation and Context
Priming events on POST request-wise SSE streams used a hardcoded event ID of
"0", making it impossible for clients to identify which stream to resume after disconnection. The MCP spec requires event IDs to encode enough information to correlate aLast-Event-IDback to the originating stream. This moves priming event generation for request-wise streams into the session layer (LocalSessionManager::create_stream), where thehttp_request_idis available, so the priming event ID is now correctly formatted as0/<http_request_id>(e.g.0/0,0/1). GET standalone and initialize priming remain unchanged at"0"since they have no per-request stream identity.Additionally, the event cache for a request-wise channel was discarded as soon as the response was delivered, so a client that disconnected and tried to resume after the tool call finished would find nothing to replay. The cache is now retained after completion, allowing late resume requests to replay cached events. Completed entries are evicted based on a configurable
completed_cache_ttl(default 60s).How Has This Been Tested?
Added
test_request_wise_priming_includes_http_request_idwhich verifies consecutive tool calls get correct priming and response event IDs. All existing priming, SSE concurrent streams, stale session, and custom header tests continue to pass.Breaking Changes
SessionConfighas two new fields:sse_retry: Option<Duration>(defaults toSome(3s)) andcompleted_cache_ttl: Duration(defaults to60s). Since the struct is#[non_exhaustive], this is not a breaking change for downstream consumers.Types of changes
Checklist