diff --git a/src/v/cloud_topics/frontend/frontend.cc b/src/v/cloud_topics/frontend/frontend.cc
index a9df4e6206a82..ad820a7cc7c39 100644
--- a/src/v/cloud_topics/frontend/frontend.cc
+++ b/src/v/cloud_topics/frontend/frontend.cc
@@ -942,6 +942,15 @@ raft::replicate_stages frontend::replicate(
 
     }
     auto ctp_stm_api = make_ctp_stm_api(_partition);
+    // Reserve the per-producer ordering ticket synchronously, before
+    // stage_write. The ticket order is what `rm_stm` ultimately sees as the
+    // batch order, so it must match the order of replicate() calls — not
+    // the order in which the asynchronous stage_write futures resolve.
+    // Reserving inside the then_wrapped continuation, as this code
+    // previously did, lets stage_write's resolution order drive the ticket
+    // order, which can replicate Kafka idempotent-producer sequences out of
+    // order under cold-start memory contention.
+    auto ticket = ctp_stm_api->producer_queue().reserve(batch_id.pid.get_id());
     auto header = batch.header();
     chunked_vector<model::record_batch> batch_vec, to_cache;
     batch_vec.push_back(std::move(batch));
@@ -954,6 +963,8 @@ raft::replicate_stages frontend::replicate(
     out.request_enqueued = _data_plane->stage_write(std::move(batch_vec))
                              .then_wrapped([this,
                                             p = std::move(result),
+                                            ticket = std::move(ticket),
+                                            ctp_stm = std::move(ctp_stm_api),
                                             cloned = std::move(to_cache),
                                             batch_id,
                                             header,
@@ -967,14 +978,10 @@
                   p.set_value(raft::errc::timeout);
                   return;
               }
-              auto ctp_stm = make_ctp_stm_api(_partition);
-              auto ticket
-                = ctp_stm->producer_queue().reserve(
-                  batch_id.pid.get_id());
               do_upload_and_replicate(
                 _data_plane,
                 _partition,
-                ctp_stm,
+                std::move(ctp_stm),
                 std::move(ticket),
                 batch_id,
                 header,
diff --git a/src/v/cloud_topics/frontend/tests/frontend_test.cc b/src/v/cloud_topics/frontend/tests/frontend_test.cc
index 59c247d04fad5..77ae2dd73c4a2 100644
--- a/src/v/cloud_topics/frontend/tests/frontend_test.cc
+++ b/src/v/cloud_topics/frontend/tests/frontend_test.cc
@@ -388,3 +388,125 @@ TEST_F(frontend_fixture, test_advance_epoch) {
     EXPECT_EQ(final_info.estimated_inactive_epoch, cluster_epoch(9));
     EXPECT_EQ(final_info, frontend.get_epoch_info());
 }
+
+// Regression test for the produce-path reorder bug.
+//
+// Two consecutive `frontend::replicate` calls for the same
+// (producer-id, partition) must replicate through `rm_stm` in call order,
+// even if the underlying `data_plane_api::stage_write` futures resolve out
+// of order. The buggy implementation reserves the per-producer ordering
+// ticket *inside* the `then_wrapped` continuation that fires after
+// stage_write resolves, so reordered stage_write resolution reorders
+// ticket reservation, defeating the ordering guarantee the
+// `producer_queue` is supposed to provide.
+//
+// Concrete failure on buggy code: when B2 (first_seq > 0) wins the race
+// and reaches rm_stm first, it is accepted via the Kafka-compatible
+// `skip_sequence_checks` path (rm_stm.cc:1156) because the producer is
+// unknown to that partition. `last_known_sequence` advances to B2's
+// last_seq, and the subsequent B1 (first_seq = 0) is rejected with
+// OUT_OF_ORDER_SEQUENCE_NUMBER (cluster::errc::sequence_out_of_order).
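+//
+// The fix (frontend.cc hunks above) moves the producer_queue().reserve()
+// call out of the then_wrapped continuation and onto the synchronous
+// replicate() path, so the ticket order is fixed at call time.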
+TEST_F(
+  frontend_fixture,
+  test_replicate_preserves_per_producer_order_under_stage_write_reorder) {
+    const model::topic topic_name("reorder_regression");
+    model::ntp ntp(model::kafka_namespace, topic_name, 0);
+
+    cluster::topic_properties props;
+    props.storage_mode = model::redpanda_storage_mode::cloud;
+    props.shadow_indexing = model::shadow_indexing_mode::disabled;
+
+    add_topic({model::kafka_namespace, topic_name}, 1, props).get();
+    wait_for_leader(ntp).get();
+
+    auto partition = app.partition_manager.local().get(ntp);
+    ASSERT_TRUE(
+      partition->raft()->stm_manager()->get<cluster::rm_stm>() != nullptr);
+
+    cloud_topics::frontend frontend(std::move(partition), _data_plane.get());
+
+    // Force stage_write resolution order: the first call (for B1) is held
+    // unresolved on a promise; the second call (for B2) returns an
+    // already-resolved future. This simulates the cold-start race in
+    // production where `prepare_write`'s yield points let B2's stage_write
+    // resolve before B1's.
+    using stage_result = std::expected<void, raft::errc>;
+    ss::promise<stage_result> b1_stage;
+
+    EXPECT_CALL(*_data_plane, stage_write(_))
+      .WillOnce(Return(ByMove(b1_stage.get_future())))
+      .WillOnce(Return(
+        ByMove(ss::make_ready_future<stage_result>(stage_result{}))));
+
+    // Both batches use the same cluster_epoch so neither fence step is
+    // rejected on epoch grounds. The order of `execute_write` calls is the
+    // same in both buggy and fixed code (B2 first because its stage_write
+    // resolves first; B1 second).
+    EXPECT_CALL(*_data_plane, execute_write(_, _, _, _))
+      .WillOnce(Return(make_extent_fut(model::offset(0), cluster_epoch(1))))
+      .WillOnce(Return(make_extent_fut(model::offset(1), cluster_epoch(1))));
+
+    EXPECT_CALL(*_data_plane, invalidate_epoch_below(_))
+      .Times(AnyNumber())
+      .WillRepeatedly([](cloud_topics::cluster_epoch) { return ss::now(); });
+
+    ON_CALL(*_data_plane, cache_put_ordered(_, _))
+      .WillByDefault([](const auto&, auto) {});
+
+    auto make_idempotent_bid = [](int32_t first_seq, int32_t record_count) {
+        return model::batch_identity{
+          .pid = model::producer_identity{42, 0},
+          .first_seq = first_seq,
+          .last_seq = first_seq + record_count - 1,
+          .record_count = record_count,
+          .max_timestamp = model::timestamp::min(),
+          .is_transactional = false,
+        };
+    };
+
+    auto batch1 = model::test::make_random_batch(model::offset{0}, false);
+    auto batch1_count = batch1.record_count();
+    auto batch2 = model::test::make_random_batch(model::offset{1}, false);
+    auto batch2_count = batch2.record_count();
+
+    // Issue B1 (stage_write blocked on promise) and B2 (stage_write ready).
+    // In buggy code, B2's `then_wrapped` continuation reserves its ticket
+    // first and reaches rm_stm first. In fixed code, B1's ticket is
+    // reserved synchronously before stage_write, so B1 reaches rm_stm
+    // first (after we unblock its stage_write) and B2 waits behind it on
+    // ticket.redeem().
+    auto stages1 = frontend.replicate(
+      make_idempotent_bid(0, batch1_count),
+      std::move(batch1),
+      raft::replicate_options(raft::consistency_level::quorum_ack));
+    auto stages2 = frontend.replicate(
+      make_idempotent_bid(batch1_count, batch2_count),
+      std::move(batch2),
+      raft::replicate_options(raft::consistency_level::quorum_ack));
+
+    // Drive the reactor: in the buggy code this gives B2's chain enough
+    // runway to reach rm_stm before we unblock B1.
+    // In the fixed code, B2's chain runs through stage_write but blocks
+    // at `ticket.redeem()` waiting for B1's ticket release; that's still
+    // consistent with `request_enqueued` resolving (which only requires
+    // the `then_wrapped` callback to return).
+    stages2.request_enqueued.get();
+
+    // Now release B1's stage_write. From here both batches' chains can
+    // run to completion. In fixed code, B1 reaches rm_stm first
+    // (first_seq = 0, normal accept), then B2 (first_seq matches the
+    // expected sequence). In buggy code, B2 already reached rm_stm via
+    // skip_sequence_checks and B1 is now rejected with
+    // OUT_OF_ORDER_SEQUENCE_NUMBER.
+    b1_stage.set_value(stage_result{});
+
+    auto r1 = std::move(stages1.replicate_finished).get();
+    auto r2 = std::move(stages2.replicate_finished).get();
+
+    ASSERT_TRUE(r1.has_value())
+      << "B1 (first_seq=0) should be accepted by rm_stm; got error: "
+      << r1.error().message()
+      << " — this indicates stage_write reorder caused B2 to reach rm_stm "
+         "first, advancing last_known_sequence past B1.";
+    ASSERT_TRUE(r2.has_value())
+      << "B2 should be accepted by rm_stm; got error: "
+      << r2.error().message();
+}
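For context on the mechanism the patch and test both lean on, here is a minimal std-only sketch of the reserve-early/redeem-later ordering pattern. It is hypothetical illustration code, not Redpanda's `producer_queue` API: the `ordering_queue` class, its `drain()` plumbing, and the integer tickets are invented for this example, and the real code uses seastar futures rather than synchronous callbacks. What it demonstrates is the invariant the fix restores: tickets reserved synchronously in call order dictate dispatch order, even when the asynchronous prerequisite (stage_write in the patch) completes out of order.

// Hypothetical sketch: reserve a ticket synchronously, redeem it when the
// async prerequisite finishes; dispatch strictly in ticket order
// regardless of completion order.
#include <cassert>
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

class ordering_queue {
public:
    using task = std::function<void()>;

    // Synchronous: ticket order == call order. This mirrors the patch,
    // which reserves before stage_write rather than in its continuation.
    int reserve() { return next_ticket_++; }

    // Asynchronous side: parks the task until all earlier tickets ran.
    void redeem(int ticket, task t) {
        parked_.emplace_back(ticket, std::move(t));
        drain();
    }

private:
    void drain() {
        for (bool progressed = true; progressed;) {
            progressed = false;
            for (auto it = parked_.begin(); it != parked_.end(); ++it) {
                if (it->first == next_to_run_) {
                    auto t = std::move(it->second);
                    parked_.erase(it);
                    ++next_to_run_;
                    t();
                    progressed = true;
                    break;
                }
            }
        }
    }

    int next_ticket_ = 0;
    int next_to_run_ = 0;
    std::vector<std::pair<int, task>> parked_;
};

int main() {
    ordering_queue q;
    std::vector<int> order;

    // Two replicate() calls for one producer: tickets taken up front.
    int b1 = q.reserve(); // first_seq = 0
    int b2 = q.reserve(); // first_seq > 0

    // The async prerequisite resolves out of order: B2 before B1.
    q.redeem(b2, [&] { order.push_back(2); }); // parked behind B1
    q.redeem(b1, [&] { order.push_back(1); }); // runs B1, then B2

    assert((order == std::vector<int>{1, 2}));
    std::cout << "dispatched in call order despite reordered completion\n";
}

Calling reserve() from inside the completion callback instead, as the removed lines in the frontend.cc hunk did, assigns tickets in completion order and makes the queue's ordering guarantee vacuous; that is exactly the reorder the test above provokes.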