Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 75 additions & 41 deletions src/v/cloud_topics/frontend/frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,28 +493,44 @@ frontend::l0_timequery(storage::timequery_config cfg) {
model::record_batch_type::raft_data,
model::record_batch_type::ctp_placeholder,
});
auto gen = std::move(reader).generator(model::no_timeout);
while (auto batch_opt = co_await gen()) {
auto& batch = batch_opt->get();
if (!std::ranges::contains(type_filter, batch.header().type)) {
continue;

struct coarse_consumer {
model::timestamp time;
ss::lw_shared_ptr<const storage::offset_translator_state> ot_state;
std::optional<coarse_grained_timequery_result> result;

ss::future<ss::stop_iteration> operator()(model::record_batch& batch) {
if (!std::ranges::contains(type_filter, batch.header().type)) {
co_return ss::stop_iteration::no;
}
if (batch.header().max_timestamp < time) {
co_return ss::stop_iteration::no;
}
// NOTE: we can't just return this offset verbatim, since we
// don't record the same timestamp deltas inside batches for
// placeholder batches (this would require unpacking batches
// during produce).
result = coarse_grained_timequery_result{
.time = time,
.start_offset = model::offset_cast(
ot_state->from_log_offset(batch.base_offset())),
.last_offset = model::offset_cast(
ot_state->from_log_offset(batch.last_offset())),
};
Comment on lines +506 to +519
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice fix. I was trying to think of other ways to keep the succinct procedural code, but ultimately it does end up adding a good amount of bloat while also introducing room for error. Was thinking something like the following, but I also don't love it:

auto [gen, close] = std::move(reader).generator(timeout);
std::optional<result_t> res;
std::exception_ptr eptr;
try {
    while (auto batch = co_await gen()) {
        if (done) { res = result; break; }
        ...
    }
} catch (...) { eptr = std::current_exception(); }
co_await close();
if (eptr) std::rethrow_exception(eptr);
if (res) co_return std::move(*res);
co_return std::nullopt;

It's unfortunate we don't have schedule_at_exit capabilities in seastar yet. I'm fine with this going in as is.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeh, the generator makes things look fantastic. I think we should try to come up with a way to make it work (without the try-catch) but it'll require rethinking how the batch reader works.

co_return ss::stop_iteration::yes;
}
if (batch.header().max_timestamp < cfg.time) {
continue;

std::optional<coarse_grained_timequery_result> end_of_stream() {
return result;
}
// NOTE: we can't just return this offset verbatim, since we don't
// record the same timestamp deltas inside batches for placeholder
// batches (this would require unpacking batches during produce).
auto ot_state = _partition->get_offset_translator_state();
co_return coarse_grained_timequery_result{
.time = cfg.time,
.start_offset = model::offset_cast(
ot_state->from_log_offset(batch.base_offset())),
.last_offset = model::offset_cast(
ot_state->from_log_offset(batch.last_offset())),
};
}
co_return std::nullopt;
};

co_return co_await std::move(reader).for_each_ref(
coarse_consumer{
.time = cfg.time,
.ot_state = _partition->get_offset_translator_state(),
},
model::no_timeout);
}
ss::future<std::optional<storage::timequery_result>>
frontend::refine_timequery_result(
Expand All @@ -532,30 +548,48 @@ frontend::refine_timequery_result(
/*as=*/abort_source,
/*client_addr=*/std::nullopt);
auto reader = co_await make_reader(reader_cfg);
auto generator = std::move(reader.reader).generator(model::no_timeout);
auto query_interval = model::bounded_offset_interval::checked(
kafka::offset_cast(input.start_offset),
kafka::offset_cast(input.last_offset));
while (auto batch_opt = co_await generator()) {
auto& batch = batch_opt->get();
auto batch_interval = model::bounded_offset_interval::checked(
batch.base_offset(), batch.last_offset());
if (!query_interval.overlaps(batch_interval)) {
if (batch_interval.min() > query_interval.max()) {
break;

struct timequery_consumer {
model::offset start_offset;
model::offset last_offset;
model::timestamp time;
model::bounded_offset_interval query_interval;
std::optional<storage::timequery_result> result;

ss::future<ss::stop_iteration> operator()(model::record_batch batch) {
auto batch_interval = model::bounded_offset_interval::checked(
batch.base_offset(), batch.last_offset());
if (!query_interval.overlaps(batch_interval)) {
if (batch_interval.min() > query_interval.max()) {
co_return ss::stop_iteration::yes;
}
co_return ss::stop_iteration::no;
}
continue;
if (time > batch.header().max_timestamp) {
co_return ss::stop_iteration::no;
}
result = co_await storage::batch_timequery(
std::move(batch), start_offset, time, last_offset);
co_return ss::stop_iteration::yes;
}
if (input.time > batch.header().max_timestamp) {
continue;

std::optional<storage::timequery_result> end_of_stream() {
return result;
}
co_return co_await storage::batch_timequery(
std::move(batch),
kafka::offset_cast(input.start_offset),
input.time,
kafka::offset_cast(input.last_offset));
}
co_return std::nullopt;
};

auto start_offset = kafka::offset_cast(input.start_offset);
auto last_offset = kafka::offset_cast(input.last_offset);
co_return co_await std::move(reader.reader)
.consume(
timequery_consumer{
.start_offset = start_offset,
.last_offset = last_offset,
.time = input.time,
.query_interval = model::bounded_offset_interval::checked(
start_offset, last_offset),
},
model::no_timeout);
}

namespace {
Expand Down
52 changes: 29 additions & 23 deletions src/v/cloud_topics/level_zero/frontend_reader/level_zero_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,35 +287,41 @@ ss::future<chunked_circular_buffer<level_zero_log_reader_impl::local_log_batch>>
level_zero_log_reader_impl::fetch_metadata(
storage::local_log_reader_config cfg,
model::timeout_clock::time_point deadline) const {
chunked_circular_buffer<local_log_batch> ret;
auto reader = co_await _ctp->make_local_reader(cfg);
auto batches = std::move(reader).generator(deadline);

// Convert L0 meta batches to extent_meta structures.
while (auto maybe_batch = co_await batches()) {
auto batch = std::move(maybe_batch->get());
auto& header = batch.header();
if (header.type == model::record_batch_type::raft_data) {
local_log_batch local_batch{.header = header};
local_batch.data = std::move(batch).release_data();
ret.push_back(std::move(local_batch));
continue;
struct metadata_consumer {
chunked_circular_buffer<local_log_batch> ret;

ss::future<ss::stop_iteration> operator()(model::record_batch batch) {
auto header = batch.header();
if (header.type == model::record_batch_type::raft_data) {
local_log_batch local_batch{.header = header};
local_batch.data = std::move(batch).release_data();
ret.push_back(std::move(local_batch));
co_return ss::stop_iteration::no;
}
if (header.type != model::record_batch_type::ctp_placeholder) {
co_return ss::stop_iteration::no;
}
cloud_topics::extent_meta e{
.base_offset = model::offset_cast(batch.base_offset()),
.last_offset = model::offset_cast(batch.last_offset()),
};
auto placeholder = parse_placeholder_batch(std::move(batch));
e.id = placeholder.id;
e.first_byte_offset = placeholder.offset;
e.byte_range_size = placeholder.size_bytes;
ret.push_back(local_log_batch{.header = header, .data = e});
co_return ss::stop_iteration::no;
}
if (header.type != model::record_batch_type::ctp_placeholder) {
continue;

chunked_circular_buffer<local_log_batch> end_of_stream() {
return std::move(ret);
}
cloud_topics::extent_meta e{
.base_offset = model::offset_cast(batch.base_offset()),
.last_offset = model::offset_cast(batch.last_offset()),
};
auto placeholder = parse_placeholder_batch(std::move(batch));
e.id = placeholder.id;
e.first_byte_offset = placeholder.offset;
e.byte_range_size = placeholder.size_bytes;
ret.push_back(local_log_batch{.header = header, .data = e});
}
};

co_return ret;
co_return co_await std::move(reader).consume(metadata_consumer{}, deadline);
}

ss::future<std::expected<chunked_circular_buffer<model::record_batch>, errc>>
Expand Down
42 changes: 29 additions & 13 deletions src/v/cloud_topics/reconciler/reconciliation_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ ss::future<std::optional<consumer_metadata>> build_from_reader(
model::record_batch_reader reader,
l1::object_builder* builder,
reconciler_probe* probe) {
auto gen = std::move(reader).slice_generator(model::no_timeout);
auto build_duration = probe->measure_object_build_duration();
co_await builder->start_partition(tidp);
build_duration->stop();
auto read_duration = probe->measure_l0_read_duration();
consumer_metadata metadata;
while (auto batches = co_await gen()) {
read_duration->stop();
build_duration->start();
for (auto& batch : batches->get()) {

struct reconciler_consumer {
consumer_metadata metadata;
l1::object_builder* builder;
std::unique_ptr<reconciler_probe::hist_t::measurement> read_duration;
std::unique_ptr<reconciler_probe::hist_t::measurement> build_duration;

ss::future<ss::stop_iteration> operator()(model::record_batch batch) {
read_duration->stop();
build_duration->start();
if (metadata.base_offset == kafka::offset::min()) {
metadata.base_offset = model::offset_cast(batch.base_offset());
}
Expand All @@ -40,13 +43,26 @@ ss::future<std::optional<consumer_metadata>> build_from_reader(
}
++metadata.batch_count;
co_await builder->add_batch(std::move(batch));
build_duration->stop();
read_duration->start();
co_return ss::stop_iteration::no;
}

std::optional<consumer_metadata> end_of_stream() {
if (metadata.batch_count == 0) {
return std::nullopt;
}
return std::move(metadata);
}
build_duration->stop();
read_duration->start();
}
co_return metadata.batch_count == 0
? std::nullopt
: std::make_optional(std::move(metadata));
};

co_return co_await std::move(reader).consume(
reconciler_consumer{
.builder = builder,
.read_duration = probe->measure_l0_read_duration(),
.build_duration = std::move(build_duration),
},
model::no_timeout);
}

} // namespace cloud_topics::reconciler
Loading