17 changes: 12 additions & 5 deletions src/v/cloud_topics/frontend/frontend.cc
@@ -942,6 +942,15 @@ raft::replicate_stages frontend::replicate(
}

auto ctp_stm_api = make_ctp_stm_api(_partition);
Review comment:
Pre-existing nit (unblocking): frontend already holds _ctp_stm_api as a member (initialized in the ctor) that wraps the same underlying ctp_stm. While you're reshuffling this, you could replace auto ctp_stm_api = make_ctp_stm_api(_partition); with auto ctp_stm_api = _ctp_stm_api; (or auto ctp_stm = _ctp_stm_api; captured into the lambda). It avoids an extra lw_shared_ptr alloc per replicate and the surprising fact that make_ctp_stm_api throws (despite the ctor being noexcept).

Happy to defer this to a follow-up — the producer-queue ordering fix is the load-bearing change here.
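
A minimal stand-alone sketch of the suggested swap (illustrative only: the real types live in cloud_topics, and std::shared_ptr here stands in for seastar's lw_shared_ptr; only the per-call allocation difference is modeled):

#include <cassert>
#include <memory>

// Hypothetical stand-ins for ctp_stm and its api wrapper.
struct ctp_stm {};
struct stm_api { std::shared_ptr<ctp_stm> stm; };

struct frontend_like {
    std::shared_ptr<ctp_stm> _stm = std::make_shared<ctp_stm>();
    // Cached once at construction, as the frontend ctor already does
    // with its _ctp_stm_api member.
    std::shared_ptr<stm_api> _ctp_stm_api
      = std::make_shared<stm_api>(stm_api{_stm});

    // What make_ctp_stm_api(_partition) effectively costs per
    // replicate(): a fresh wrapper allocation (which can throw) per call.
    std::shared_ptr<stm_api> make_wrapper() {
        return std::make_shared<stm_api>(stm_api{_stm});
    }
};

int main() {
    frontend_like f;
    auto cached = f._ctp_stm_api;  // copy the member: no new wrapper alloc
    auto fresh = f.make_wrapper(); // the per-call alloc the nit wants gone
    assert(cached->stm == fresh->stm); // both wrap the same underlying stm
}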

+ // Reserve the per-producer ordering ticket synchronously, before
+ // stage_write. The ticket order is what `rm_stm` ultimately sees as the
+ // batch order, so it must match the order of replicate() calls — not
+ // the order in which the asynchronous stage_write futures resolve.
+ // Reserving inside the then_wrapped continuation would let stage_write's
+ // resolution order drive the ticket order, which can cause kafka
+ // idempotent-producer sequences to be replicated out of order under
+ // cold-start memory contention.
+ auto ticket = ctp_stm_api->producer_queue().reserve(batch_id.pid.get_id());
auto header = batch.header();
chunked_vector<model::record_batch> batch_vec, to_cache;
batch_vec.push_back(std::move(batch));
@@ -954,6 +963,8 @@ raft::replicate_stages frontend::replicate(
out.request_enqueued = _data_plane->stage_write(std::move(batch_vec))
.then_wrapped([this,
p = std::move(result),
+ ticket = std::move(ticket),
+ ctp_stm = std::move(ctp_stm_api),
cloned = std::move(to_cache),
batch_id,
header,
@@ -967,14 +978,10 @@ raft::replicate_stages frontend::replicate(
p.set_value(raft::errc::timeout);
return;
}
- auto ctp_stm = make_ctp_stm_api(_partition);
- auto ticket
-   = ctp_stm->producer_queue().reserve(
-     batch_id.pid.get_id());
do_upload_and_replicate(
_data_plane,
_partition,
- ctp_stm,
+ std::move(ctp_stm),
std::move(ticket),
batch_id,
header,
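The ordering argument in the comment added above can be made concrete with a small stand-alone model of a per-producer ticket queue (illustrative only: the real producer_queue lives behind the ctp_stm API and is seastar-based; this sketch uses std::thread to play the role of out-of-order async completion):

#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Toy ordering queue: reserve() hands out monotonically increasing
// tickets; redeem(t) blocks until every earlier ticket is released. The
// order of reserve() calls, not the order in which async work finishes,
// therefore fixes the order work items pass this gate.
class ticket_queue {
    std::mutex _m;
    std::condition_variable _cv;
    uint64_t _next_to_issue = 0;
    uint64_t _next_to_serve = 0;

public:
    uint64_t reserve() {
        std::lock_guard g(_m);
        return _next_to_issue++;
    }
    void redeem(uint64_t t) { // wait for our turn
        std::unique_lock l(_m);
        _cv.wait(l, [&] { return _next_to_serve == t; });
    }
    void release(uint64_t) { // done: admit the next ticket
        std::lock_guard g(_m);
        ++_next_to_serve;
        _cv.notify_all();
    }
};

int main() {
    ticket_queue q;
    std::vector<int> order;
    std::mutex om;

    // The fix in a nutshell: reserve in replicate() call order, before
    // any asynchronous staging work runs.
    auto t1 = q.reserve(); // B1
    auto t2 = q.reserve(); // B2

    std::thread b2([&] { // B2's staging "resolves" first...
        q.redeem(t2);    // ...but it still waits its turn behind B1 here
        { std::lock_guard g(om); order.push_back(2); }
        q.release(t2);
    });
    std::thread b1([&] {
        q.redeem(t1);
        { std::lock_guard g(om); order.push_back(1); }
        q.release(t1);
    });
    b1.join();
    b2.join();
    assert((order == std::vector<int>{1, 2})); // call order preserved
}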
122 changes: 122 additions & 0 deletions src/v/cloud_topics/frontend/tests/frontend_test.cc
@@ -388,3 +388,125 @@ TEST_F(frontend_fixture, test_advance_epoch) {
EXPECT_EQ(final_info.estimated_inactive_epoch, cluster_epoch(9));
EXPECT_EQ(final_info, frontend.get_epoch_info());
}

// Regression test for the produce-path reorder bug.
//
// Two consecutive `frontend::replicate` calls for the same
// (producer-id, partition) must replicate through `rm_stm` in call order,
// even if the underlying `data_plane_api::stage_write` futures resolve out
// of order. The buggy implementation reserved the per-producer ordering
// ticket *inside* the `then_wrapped` continuation that fires after
// stage_write resolves, so reordering stage_write resolution reordered
// ticket reservation, defeating the ordering guarantee the
// `producer_queue` is supposed to provide.
//
// Concrete failure on buggy code: when B2 (first_seq>0) wins the race and
// reaches rm_stm first, it is accepted via the kafka-compatible
// `skip_sequence_checks` path (rm_stm.cc:1156) because the producer is
// unknown to that partition. `last_known_sequence` advances to B2's
// last_seq, and the subsequent B1 (first_seq=0) is rejected with
// OUT_OF_ORDER_SEQUENCE_NUMBER (cluster::errc::sequence_out_of_order).
Review comment:
Nit: line references like rm_stm.cc:1156 will rot fast — consider naming the function (e.g. "the skip_sequence_checks branch in rm_stm::check_seq") instead. Same for the duplicate reference at line 405.
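
As an editorial aside, the failure sequence described in the test header is easy to see in a tiny stand-alone model of the sequence check (illustrative only, not rm_stm's actual code; it keeps just the unknown-producer accept and the last_seq + 1 continuation rule):

#include <cassert>
#include <cstdint>
#include <map>

// Toy sequence checker: unknown producers are accepted regardless of
// first_seq (the kafka-compatible "skip sequence checks" behavior);
// known producers must continue from the last accepted seq + 1.
enum class errc { success, sequence_out_of_order };

struct seq_checker {
    std::map<int64_t, int32_t> last_seq; // producer id -> last accepted seq

    errc check(int64_t pid, int32_t first, int32_t last) {
        auto it = last_seq.find(pid);
        if (it == last_seq.end()) { // unknown producer: accept as-is
            last_seq.emplace(pid, last);
            return errc::success;
        }
        if (first != it->second + 1) {
            return errc::sequence_out_of_order;
        }
        it->second = last;
        return errc::success;
    }
};

int main() {
    // Buggy ordering: B2 (seqs 5..9) reaches the checker before B1 (0..4).
    seq_checker buggy;
    assert(buggy.check(42, 5, 9) == errc::success); // producer unknown: accepted
    assert(buggy.check(42, 0, 4) == errc::sequence_out_of_order); // B1 rejected

    // Fixed ordering: B1 then B2, both accepted.
    seq_checker fixed;
    assert(fixed.check(42, 0, 4) == errc::success);
    assert(fixed.check(42, 5, 9) == errc::success);
}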

TEST_F(
frontend_fixture,
test_replicate_preserves_per_producer_order_under_stage_write_reorder) {
const model::topic topic_name("reorder_regression");
model::ntp ntp(model::kafka_namespace, topic_name, 0);

cluster::topic_properties props;
props.storage_mode = model::redpanda_storage_mode::cloud;
props.shadow_indexing = model::shadow_indexing_mode::disabled;

add_topic({model::kafka_namespace, topic_name}, 1, props).get();
wait_for_leader(ntp).get();

auto partition = app.partition_manager.local().get(ntp);
ASSERT_TRUE(
partition->raft()->stm_manager()->get<cloud_topics::ctp_stm>()
!= nullptr);

cloud_topics::frontend frontend(std::move(partition), _data_plane.get());

// Force stage_write resolution order: the first call (for B1) is held
// unresolved on a promise; the second call (for B2) returns an
// already-resolved future. This simulates the cold-start race in
// production where `prepare_write`'s yield points let B2's stage_write
// resolve before B1's.
using stage_result = std::expected<staged_write, std::error_code>;
ss::promise<stage_result> b1_stage;

EXPECT_CALL(*_data_plane, stage_write(_))
.WillOnce(Return(ByMove(b1_stage.get_future())))
.WillOnce(Return(ss::as_ready_future(stage_result{})));

// Both batches use the same cluster_epoch so neither fence step is
// rejected on epoch grounds. The order of `execute_write` calls is the
// same in both buggy and fixed code (B2 first because its stage_write
// resolves first; B1 second).
EXPECT_CALL(*_data_plane, execute_write(_, _, _, _))
.WillOnce(Return(make_extent_fut(model::offset(0), cluster_epoch(1))))
.WillOnce(Return(make_extent_fut(model::offset(1), cluster_epoch(1))));

EXPECT_CALL(*_data_plane, invalidate_epoch_below(_))
.Times(AnyNumber())
.WillRepeatedly([](cloud_topics::cluster_epoch) { return ss::now(); });

ON_CALL(*_data_plane, cache_put_ordered(_, _))
.WillByDefault([](const auto&, auto) {});

auto make_idempotent_bid = [](int32_t first_seq, int32_t record_count) {
return model::batch_identity{
.pid = model::producer_identity{42, 0},
.first_seq = first_seq,
.last_seq = first_seq + record_count - 1,
.record_count = record_count,
.max_timestamp = model::timestamp::min(),
.is_transactional = false,
};
};

auto batch1 = model::test::make_random_batch(model::offset{0}, false);
auto batch1_count = batch1.record_count();
auto batch2 = model::test::make_random_batch(model::offset{1}, false);
auto batch2_count = batch2.record_count();

// Issue B1 (stage_write blocked on promise) and B2 (stage_write ready).
// In buggy code, B2's `then_wrapped` continuation reserves its ticket
// first and reaches rm_stm first. In fixed code, B1's ticket is
// reserved synchronously before stage_write, so B1 reaches rm_stm
// first (after we unblock its stage_write) and B2 waits behind it on
// ticket.redeem().
auto stages1 = frontend.replicate(
make_idempotent_bid(0, batch1_count),
std::move(batch1),
raft::replicate_options(raft::consistency_level::quorum_ack));
auto stages2 = frontend.replicate(
make_idempotent_bid(batch1_count, batch2_count),
std::move(batch2),
raft::replicate_options(raft::consistency_level::quorum_ack));

// Drive the reactor: in the buggy code this gives B2's chain enough
// runway to reach rm_stm before we unblock B1. In the fixed code B2's
// chain runs through stage_write but blocks at `ticket.redeem()`
// waiting for B1's ticket release; that's still consistent with
// `request_enqueued` resolving (which only requires the
// `then_wrapped` callback to return).
stages2.request_enqueued.get();
Review comment:
Question on test robustness: stages2.request_enqueued.get() only requires B2's then_wrapped callback to return — it does not guarantee B2's do_upload_and_replicate has reached ticket.redeem() (or even execute_write). For the buggy-code scenario you describe in the comment, the subsequent b1_stage.set_value(...) plus the two replicate_finished.get() calls are what eventually converge things, so the assertion still holds either way. But the comment block above ("In the fixed code B2's chain runs through stage_write but blocks at ticket.redeem()...") is slightly misleading since request_enqueued resolving doesn't actually witness that.

Either tighten the comment, or — for an extra-strong guarantee in the buggy case — wait until B2 has reached its rm_stm replicate before unblocking B1 (e.g. expose a hook through the mock or yield with ss::sleep/ss::later a few times). Not strictly required if the bug-case assertion is reliable on CI today.
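
If the extra-strong guarantee is wanted, one possible shape (a sketch, assuming this test file can use seastar coroutines and the usual ss namespace alias with ss::yield(); the helper name drain_reactor_a_bit is hypothetical):

#include <seastar/core/coroutine.hh>
#include <seastar/core/later.hh>

// Hypothetical helper: a bounded number of reactor yields, giving B2's
// continuation chain a chance to run up to ticket.redeem() before the
// test unblocks B1. Not a hard guarantee, but much stronger than a
// single then_wrapped hop.
ss::future<> drain_reactor_a_bit(int rounds = 64) {
    for (int i = 0; i < rounds; ++i) {
        co_await ss::yield(); // run continuations already queued ahead of us
    }
}

// Usage in the test, between stages2.request_enqueued.get() and
// b1_stage.set_value(...):
//
//     drain_reactor_a_bit().get();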


// Now release B1's stage_write. From here both batches' chains can
// run to completion. In fixed code, B1 reaches rm_stm first
// (first_seq=0, normal accept), then B2 (first_seq matches expected).
// In buggy code, B2 already reached rm_stm via skip_sequence_checks
// and B1 is now rejected with OUT_OF_ORDER_SEQUENCE_NUMBER.
b1_stage.set_value(stage_result{});

auto r1 = std::move(stages1.replicate_finished).get();
auto r2 = std::move(stages2.replicate_finished).get();

ASSERT_TRUE(r1.has_value())
<< "B1 (first_seq=0) should be accepted by rm_stm; got error: "
<< r1.error().message()
<< " — this indicates stage_write reorder caused B2 to reach rm_stm "
"first, advancing last_known_sequence past B1.";
ASSERT_TRUE(r2.has_value())
<< "B2 should be accepted by rm_stm; got error: " << r2.error().message();
}