ct/frontend: reserve producer_queue ticket before stage_write #30429
base: dev

@@ -388,3 +388,125 @@ TEST_F(frontend_fixture, test_advance_epoch) {
    EXPECT_EQ(final_info.estimated_inactive_epoch, cluster_epoch(9));
    EXPECT_EQ(final_info, frontend.get_epoch_info());
}

// Regression test for the produce-path reorder bug.
//
// Two consecutive `frontend::replicate` calls for the same
// (producer-id, partition) must replicate through `rm_stm` in call order,
// even if the underlying `data_plane_api::stage_write` futures resolve out
// of order. The current implementation reserves the per-producer ordering
// ticket *inside* the `then_wrapped` continuation that fires after
// stage_write resolves, so reorder of stage_write resolution causes
// reorder of ticket reservation, defeating the ordering guarantee the
// `producer_queue` is supposed to provide.
//
// Concrete failure on buggy code: when B2 (first_seq>0) wins the race and
// reaches rm_stm first, it is accepted via the kafka-compatible
// `skip_sequence_checks` path (rm_stm.cc:1156) because the producer is
// unknown to that partition. `last_known_sequence` advances to B2's
// last_seq, and the subsequent B1 (first_seq=0) is rejected with
// OUT_OF_ORDER_SEQUENCE_NUMBER (cluster::errc::sequence_out_of_order).

Review comment (nit): line references like `rm_stm.cc:1156` …
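
An editorial aside, not part of the diff or the review: a minimal, self-contained toy in plain C++ that models why taking the per-producer ordering ticket before the asynchronous stage_write step preserves call order even when stage_write completions arrive out of order. All names here (producer_queue, reserve, may_proceed, release) are illustrative stand-ins, not the frontend's real API.

#include <cstdio>
#include <deque>

// Toy stand-in for the per-producer ordering queue: a ticket is a FIFO
// position, and a holder may proceed only when its ticket is at the head.
struct producer_queue {
    std::deque<int> tickets;
    int next = 0;
    int reserve() {
        tickets.push_back(next);
        return next++;
    }
    bool may_proceed(int t) const {
        return !tickets.empty() && tickets.front() == t;
    }
    void release(int) { tickets.pop_front(); }
};

int main() {
    producer_queue q;

    // Fixed shape: tickets are reserved synchronously, in replicate() call
    // order, before any stage_write is issued.
    int b1 = q.reserve();
    int b2 = q.reserve();

    // Simulate stage_write completions arriving out of order: B2 first.
    // B2 still cannot reach rm_stm; its ticket is not at the head.
    std::printf("B2 may proceed after its stage_write? %s\n",
                q.may_proceed(b2) ? "yes" : "no");            // no

    // B1's stage_write completes; B1 replicates and releases its ticket.
    std::printf("B1 may proceed? %s\n",
                q.may_proceed(b1) ? "yes" : "no");            // yes
    q.release(b1);

    // Only now is B2 admitted, so rm_stm sees first_seq=0 before first_seq>0.
    std::printf("B2 may proceed now? %s\n",
                q.may_proceed(b2) ? "yes" : "no");            // yes
}

In the buggy shape, reserve() happens only inside the continuation that runs after stage_write resolves, so queue positions mirror completion order rather than call order, which is exactly the reorder the test below provokes.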

TEST_F(
  frontend_fixture,
  test_replicate_preserves_per_producer_order_under_stage_write_reorder) {
    const model::topic topic_name("reorder_regression");
    model::ntp ntp(model::kafka_namespace, topic_name, 0);

    cluster::topic_properties props;
    props.storage_mode = model::redpanda_storage_mode::cloud;
    props.shadow_indexing = model::shadow_indexing_mode::disabled;

    add_topic({model::kafka_namespace, topic_name}, 1, props).get();
    wait_for_leader(ntp).get();

    auto partition = app.partition_manager.local().get(ntp);
    ASSERT_TRUE(
      partition->raft()->stm_manager()->get<cloud_topics::ctp_stm>()
      != nullptr);

    cloud_topics::frontend frontend(std::move(partition), _data_plane.get());

    // Force stage_write resolution order: the first call (for B1) is held
    // unresolved on a promise; the second call (for B2) returns an
    // already-resolved future. This simulates the cold-start race in
    // production where `prepare_write`'s yield points let B2's stage_write
    // resolve before B1's.
    using stage_result = std::expected<staged_write, std::error_code>;
    ss::promise<stage_result> b1_stage;

    EXPECT_CALL(*_data_plane, stage_write(_))
      .WillOnce(Return(ByMove(b1_stage.get_future())))
      .WillOnce(Return(ss::as_ready_future(stage_result{})));

    // Both batches use the same cluster_epoch so neither fence step is
    // rejected on epoch grounds. The order of `execute_write` calls is the
    // same in both buggy and fixed code (B2 first because its stage_write
    // resolves first; B1 second).
    EXPECT_CALL(*_data_plane, execute_write(_, _, _, _))
      .WillOnce(Return(make_extent_fut(model::offset(0), cluster_epoch(1))))
      .WillOnce(Return(make_extent_fut(model::offset(1), cluster_epoch(1))));

    EXPECT_CALL(*_data_plane, invalidate_epoch_below(_))
      .Times(AnyNumber())
      .WillRepeatedly([](cloud_topics::cluster_epoch) { return ss::now(); });

    ON_CALL(*_data_plane, cache_put_ordered(_, _))
      .WillByDefault([](const auto&, auto) {});

    auto make_idempotent_bid = [](int32_t first_seq, int32_t record_count) {
        return model::batch_identity{
          .pid = model::producer_identity{42, 0},
          .first_seq = first_seq,
          .last_seq = first_seq + record_count - 1,
          .record_count = record_count,
          .max_timestamp = model::timestamp::min(),
          .is_transactional = false,
        };
    };

    auto batch1 = model::test::make_random_batch(model::offset{0}, false);
    auto batch1_count = batch1.record_count();
    auto batch2 = model::test::make_random_batch(model::offset{1}, false);
    auto batch2_count = batch2.record_count();

    // Issue B1 (stage_write blocked on promise) and B2 (stage_write ready).
    // In buggy code, B2's `then_wrapped` continuation reserves its ticket
    // first and reaches rm_stm first. In fixed code, B1's ticket is
    // reserved synchronously before stage_write, so B1 reaches rm_stm
    // first (after we unblock its stage_write) and B2 waits behind it on
    // ticket.redeem().
    auto stages1 = frontend.replicate(
      make_idempotent_bid(0, batch1_count),
      std::move(batch1),
      raft::replicate_options(raft::consistency_level::quorum_ack));
    auto stages2 = frontend.replicate(
      make_idempotent_bid(batch1_count, batch2_count),
      std::move(batch2),
      raft::replicate_options(raft::consistency_level::quorum_ack));

    // Drive the reactor: in the buggy code this gives B2's chain enough
    // runway to reach rm_stm before we unblock B1. In the fixed code B2's
    // chain runs through stage_write but blocks at `ticket.redeem()`
    // waiting for B1's ticket release; that's still consistent with
    // `request_enqueued` resolving (which only requires the
    // `then_wrapped` callback to return).
    stages2.request_enqueued.get();

Review comment (question on test robustness): either tighten the comment, or, for an extra-strong guarantee in the buggy case, wait until B2 has reached its rm_stm replicate before unblocking B1 (e.g. expose a hook through the mock or yield with …
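
A sketch of the "yield in a loop" reading of that suggestion, not part of the diff; the comment above is truncated, so this is one possible interpretation rather than the reviewer's exact proposal. It assumes the test body runs in a seastar::thread context (it already calls .get() throughout), that ss::yield() is available via the file's ss alias, and an arbitrary iteration count.

    // Give B2's chain generous runway before unblocking B1. On buggy code
    // this is far more than B2 needs to reach rm_stm; on fixed code B2
    // simply stays parked on ticket.redeem(), so the extra yields are
    // harmless.
    for (int i = 0; i < 1000; ++i) {
        ss::yield().get();
    }
    b1_stage.set_value(stage_result{});

A hook that fires only once B2 has actually reached rm_stm would pin the buggy interleaving down harder, but the test would have to avoid waiting on it unconditionally, since on fixed code B2 only gets there after B1 completes.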

    // Now release B1's stage_write. From here both batches' chains can
    // run to completion. In fixed code, B1 reaches rm_stm first
    // (first_seq=0, normal accept), then B2 (first_seq matches expected).
    // In buggy code, B2 already reached rm_stm via skip_sequence_checks
    // and B1 is now rejected with OUT_OF_ORDER_SEQUENCE_NUMBER.
    b1_stage.set_value(stage_result{});

    auto r1 = std::move(stages1.replicate_finished).get();
    auto r2 = std::move(stages2.replicate_finished).get();

    ASSERT_TRUE(r1.has_value())
      << "B1 (first_seq=0) should be accepted by rm_stm; got error: "
      << r1.error().message()
      << " — this indicates stage_write reorder caused B2 to reach rm_stm "
         "first, advancing last_known_sequence past B1.";
    ASSERT_TRUE(r2.has_value())
      << "B2 should be accepted by rm_stm; got error: " << r2.error().message();
}

Review comment (pre-existing nit, unblocking): `frontend` already holds `_ctp_stm_api` as a member (initialized in the ctor) that wraps the same underlying `ctp_stm`. While you're reshuffling this, you could replace `auto ctp_stm_api = make_ctp_stm_api(_partition);` with `auto ctp_stm_api = _ctp_stm_api;` (or `auto ctp_stm = _ctp_stm_api;` captured into the lambda). It avoids an extra lw_shared_ptr alloc per replicate and the surprising fact that `make_ctp_stm_api` throws (despite the ctor being noexcept). Happy to defer this to a follow-up; the producer-queue ordering fix is the load-bearing change here.
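
For readability, the reviewer's substitution as a snippet; the surrounding `frontend::replicate` body is not part of this diff, so the context around the line is assumed.

    // Reviewer's suggestion: reuse the wrapper the frontend constructor
    // already initialized (it wraps the same underlying ctp_stm), instead
    // of calling the throwing make_ctp_stm_api and allocating another
    // lw_shared_ptr on every replicate.
    auto ctp_stm_api = _ctp_stm_api;   // was: make_ctp_stm_api(_partition)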