ct/frontend: reserve producer_queue ticket before stage_write#30429
Conversation
The per-producer ordering ticket was reserved inside the .then_wrapped continuation that fires after stage_write resolves. stage_write is asynchronous (prepare_write has multiple yield points: serialize_batches and the data-plane memory/request semaphores), so its futures can resolve in a different order than the replicate() calls that produced them. The ticket order is what rm_stm ultimately sees as the batch order; reordering there breaks idempotent-producer sequence checks.

Specifically, a later batch with first_seq>0 from a producer that is unknown to that partition's rm_stm takes the kafka-compatible skip_sequence_checks path (rm_stm.cc:1156), advancing last_known_sequence past the earlier batch. The earlier batch is then permanently rejected with OUT_OF_ORDER_SEQUENCE_NUMBER and the kafka client retries it indefinitely while the broker keeps advancing on later batches. This surfaced as a steady-state OOOSN floor at workload start in cloud-mode benchmark runs.

Reserve the ticket synchronously before stage_write and capture it into the continuation. producer_queue::reserve is single-shard and synchronous, so call order determines ticket order unconditionally. Concurrent uploads are unchanged; the ticket only gates the rm_stm replicate step inside do_upload_and_replicate.

Regression test mocks stage_write so the second call's future resolves before the first's and asserts both batches still succeed. Without the fix it reproduces the production OOOSN: rm_stm logs "Accepting batch from unknown producer that likely got evicted" for the reordered second batch and then rejects the first with cluster::errc::sequence_out_of_order.
/ci-repeat 1
**Review summary**
The fix is correct, narrowly scoped, and well-explained. Reserving the

**Correctness checks I made**

**Behavior delta worth noting**
In the old code, a synchronous failure in

**Test**
The regression test correctly exercises the bug — forcing B2's

I left a small note inline about

**Backport**

LGTM modulo the small comment tweaks above.
Pull request overview
Fixes a cloud-topics produce-path ordering bug where per-producer ordering tickets were reserved only after data_plane_api::stage_write() completed, allowing out-of-order stage_write resolution to reorder tickets and break idempotent producer sequencing in rm_stm.
Changes:
- Reserve the `producer_queue` ordering ticket synchronously before initiating `stage_write`, and move the ticket into the async continuation.
- Add a regression test that forces `stage_write` futures to resolve out of order and asserts both idempotent batches still succeed.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/v/cloud_topics/frontend/frontend.cc | Reserves per-producer ordering ticket before stage_write and threads it through the continuation to preserve replicate call order. |
| src/v/cloud_topics/frontend/tests/frontend_test.cc | Adds regression coverage for out-of-order stage_write resolution while maintaining per-producer replicate ordering. |
```cpp
EXPECT_CALL(*_data_plane, stage_write(_))
  .WillOnce(Return(ByMove(b1_stage.get_future())))
  .WillOnce(Return(ss::as_ready_future(stage_result{})));
```
```cpp
// Both batches use the same cluster_epoch so neither fence step is
// rejected on epoch grounds. The order of `execute_write` calls is the
// same in both buggy and fixed code (B2 first because its stage_write
// resolves first; B1 second).
EXPECT_CALL(*_data_plane, execute_write(_, _, _, _))
  .WillOnce(Return(make_extent_fut(model::offset(0), cluster_epoch(1))))
  .WillOnce(Return(make_extent_fut(model::offset(1), cluster_epoch(1))));
```
```diff
@@ -942,6 +942,15 @@ raft::replicate_stages frontend::replicate(
 }

 auto ctp_stm_api = make_ctp_stm_api(_partition);
```
Pre-existing nit (unblocking): `frontend` already holds `_ctp_stm_api` as a member (initialized in the ctor) that wraps the same underlying `ctp_stm`. While you're reshuffling this, you could replace `auto ctp_stm_api = make_ctp_stm_api(_partition);` with `auto ctp_stm_api = _ctp_stm_api;` (or `auto ctp_stm = _ctp_stm_api;` captured into the lambda). It avoids an extra `lw_shared_ptr` alloc per replicate and the surprising fact that `make_ctp_stm_api` throws (despite the ctor being `noexcept`).

Happy to defer this to a follow-up — the producer-queue ordering fix is the load-bearing change here.
```cpp
// `skip_sequence_checks` path (rm_stm.cc:1156) because the producer is
// unknown to that partition. `last_known_sequence` advances to B2's
// last_seq, and the subsequent B1 (first_seq=0) is rejected with
// OUT_OF_ORDER_SEQUENCE_NUMBER (cluster::errc::sequence_out_of_order).
```
Nit: line references like `rm_stm.cc:1156` will rot fast — consider naming the function (e.g. "the `skip_sequence_checks` branch in `rm_stm::check_seq`") instead. Same for the duplicate reference at line 405.
```cpp
// waiting for B1's ticket release; that's still consistent with
// `request_enqueued` resolving (which only requires the
// `then_wrapped` callback to return).
stages2.request_enqueued.get();
```
Question on test robustness: `stages2.request_enqueued.get()` only requires B2's `then_wrapped` callback to return — it does not guarantee B2's `do_upload_and_replicate` has reached `ticket.redeem()` (or even `execute_write`). For the buggy-code scenario you describe in the comment, the subsequent `b1_stage.set_value(...)` plus the two `replicate_finished.get()` calls are what eventually converge things, so the assertion still holds either way. But the comment block above ("In the fixed code B2's chain runs through stage_write but blocks at ticket.redeem()...") is slightly misleading since `request_enqueued` resolving doesn't actually witness that.

Either tighten the comment, or — for an extra-strong guarantee in the buggy case — wait until B2 has reached its rm_stm replicate before unblocking B1 (e.g. expose a hook through the mock or yield with `ss::sleep`/`ss::later` a few times). Not strictly required if the bug-case assertion is reliable on CI today.
Retry command for Build#84248
please wait until all jobs are finished before running the slash command
CI test results
test results on build#84248
/ci-repeat 1
Backports Required
Release Notes
Bug Fixes