cloud_topics: convert generator callsites to consume() / for_each_ref()#30430
ballard26 wants to merge 1 commit into
Conversation
Convert the remaining cloud_topics callsites away from the generator API; rvalue consume()/for_each_ref() ensure finally() runs on the underlying impl when iteration completes.
Pull request overview
This PR finishes migrating cloud_topics callsites from the coroutine generator API to record_batch_reader’s rvalue-qualified consume() / for_each_ref() APIs, ensuring impl::finally() is reliably invoked when iteration ends (including early-exit and exception paths). This closes a potential lifetime/UAF hazard when a generator is not fully exhausted.
Changes:
- Refactor reconciliation object-building to use `record_batch_reader::consume()` with a stateful consumer.
- Refactor L0 metadata reading to use `consume()` and return accumulated batches via `end_of_stream()`.
- Refactor frontend timequery paths to use `for_each_ref()`/`consume()` and early-stop via `ss::stop_iteration`.
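To illustrate the stateful-consumer shape these refactors use, here is a minimal synchronous model of the `consume()` pattern. This is not the real `record_batch_reader` API (which is asynchronous, seastar-future based, and rvalue-qualified); the `batch` struct and `accumulating_consumer` names are illustrative stand-ins only.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for a record batch (the real type lives elsewhere).
struct batch {
    int64_t base_offset;
    int64_t last_offset;
};

// Minimal synchronous model of consume(): feed every batch to a stateful
// consumer, then return whatever end_of_stream() yields. The real API is
// async and rvalue-qualified so the reader is consumed exactly once.
template <typename Consumer>
auto consume(std::vector<batch> batches, Consumer c) {
    for (auto& b : batches) {
        if (c(b)) { // consumer returns true to stop early
            break;
        }
    }
    return std::move(c).end_of_stream();
}

// Stateful consumer that accumulates batches, mirroring the L0 metadata
// reading refactor described above.
struct accumulating_consumer {
    std::vector<batch> collected;
    bool operator()(batch b) {
        collected.push_back(b);
        return false; // never stop early
    }
    std::vector<batch> end_of_stream() && { return std::move(collected); }
};
```

The consumer carries the accumulated state that a procedural `while (co_await gen())` loop would otherwise keep in local variables, and `end_of_stream()` is the single place the final result is produced.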
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/v/cloud_topics/reconciler/reconciliation_consumer.cc | Replace slice_generator() loop with consume() to guarantee finally() and avoid generator lifetime pitfalls. |
| src/v/cloud_topics/level_zero/frontend_reader/level_zero_reader.cc | Replace generator() loop with consume() while accumulating metadata batches in a consumer. |
| src/v/cloud_topics/frontend/frontend.cc | Replace timequery generator loops with for_each_ref()/consume() consumers that can early-stop and still trigger finally(). |
CI test results on build #84265
FYI, unless this is specifically an issue with readers that is being addressed?
There isn't an issue with seastar's generators in general; rather, it's the generator for the record batch readers that's problematic. The readers require an async close step.
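The hazard being discussed can be modeled in a few lines. The sketch below is a simplified synchronous stand-in (not the real seastar/Redpanda types): the reader impl owns resources released by `finally()`, and the point of `consume()` is that `finally()` runs unconditionally once iteration ends, including early-stop and exception paths where a plain generator loop could skip it.

```cpp
#include <cassert>
#include <vector>

// Illustrative model: the reader impl owns resources that must be
// released via finally(). With a coroutine generator, finally() only ran
// if the caller fully drained the stream.
struct reader_impl {
    bool finally_called = false;
    void finally() { finally_called = true; }
};

// consume() guarantees finally() runs whether the consumer stops early,
// drains everything, or throws.
template <typename Consumer>
void consume(reader_impl& impl, const std::vector<int>& batches, Consumer c) {
    try {
        for (int b : batches) {
            if (c(b)) { // early stop still reaches finally() below
                break;
            }
        }
    } catch (...) {
        impl.finally();
        throw;
    }
    impl.finally();
}
```

In the real (async) API the same guarantee has to hold across suspension points, which is why pushing the loop inside `consume()` is safer than asking every callsite to remember an explicit close.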
```cpp
if (batch.header().max_timestamp < time) {
    co_return ss::stop_iteration::no;
}
// NOTE: we can't just return this offset verbatim, since we
// don't record the same timestamp deltas inside batches for
// placeholder batches (this would require unpacking batches
// during produce).
result = coarse_grained_timequery_result{
  .time = time,
  .start_offset = model::offset_cast(
    ot_state->from_log_offset(batch.base_offset())),
  .last_offset = model::offset_cast(
    ot_state->from_log_offset(batch.last_offset())),
};
```
Nice fix. I was trying to think of other ways to keep the succinct procedural code, but ultimately it does end up adding a good amount of bloat while also introducing room for error. Was thinking something like the following, but I also don't love it:
```cpp
auto [gen, close] = std::move(reader).generator(timeout);
std::optional<result_t> res;
std::exception_ptr eptr;
try {
    while (auto batch = co_await gen()) {
        if (done) { res = result; break; }
        ...
    }
} catch (...) { eptr = std::current_exception(); }
co_await close();
if (eptr) std::rethrow_exception(eptr);
if (res) co_return std::move(*res);
co_return std::nullopt;
```
It's unfortunate we don't have schedule_at_exit capabilities in seastar yet. I'm fine with this going in as is.
Convert the remaining cloud_topics callsites away from the generator API; rvalue consume()/for_each_ref() ensure finally() runs on the underlying impl when iteration completes. This avoids a potential use-after-free (UAF) that occurs when finally() is not called for the generator, either because the iterator is not fully exhausted or because an exception is thrown.
Backports Required
Release Notes