diff --git a/src/v/cloud_topics/BUILD b/src/v/cloud_topics/BUILD index 38092a3c2d304..17ee41fbfd877 100644 --- a/src/v/cloud_topics/BUILD +++ b/src/v/cloud_topics/BUILD @@ -94,11 +94,14 @@ redpanda_cc_library( ":data_plane_api", "//src/v/base", "//src/v/cloud_topics:state_accessors", + "//src/v/cloud_topics/level_one/common:file_io", "//src/v/cloud_topics/level_one/domain:domain_supervisor", "//src/v/cloud_topics/level_one/metastore:frontend", + "//src/v/cloud_topics/level_one/metastore:replicated_metastore", "//src/v/cloud_topics/level_zero/common:extent_meta", "//src/v/cloud_topics/reconciler", "//src/v/cluster", + "//src/v/config", "//src/v/container:chunked_vector", "//src/v/model", "//src/v/ssx:sharded_service_container", diff --git a/src/v/cloud_topics/app.cc b/src/v/cloud_topics/app.cc index 486b5d132497c..423f9c16451bf 100644 --- a/src/v/cloud_topics/app.cc +++ b/src/v/cloud_topics/app.cc @@ -13,8 +13,10 @@ #include "cloud_topics/cluster_services.h" #include "cloud_topics/data_plane_api.h" #include "cloud_topics/data_plane_impl.h" +#include "cloud_topics/level_one/metastore/replicated_metastore.h" #include "cluster/cluster_epoch_service.h" #include "cluster/controller.h" +#include "config/node_config.h" #include "ssx/sharded_service_container.h" #include @@ -49,8 +51,17 @@ ss::future<> app::construct( co_await construct_service(state, data_plane.get()); + // Ensure the L1 staging directory exists before creating the l1_io service + co_await ss::recursive_touch_directory( + config::node().l1_staging_path().string()); + co_await construct_service( - reconciler, partition_mgr, remote, data_plane.get(), bucket); + l1_io, + config::node().l1_staging_path(), + ss::sharded_parameter([&remote] { return &remote->local(); }), + bucket, + ss::sharded_parameter([&cloud_cache] { return &cloud_cache->local(); })); + co_await construct_service(domain_supervisor, controller); co_await construct_service( l1_metastore_fe, @@ -60,6 +71,18 @@ ss::future<> app::construct( 
shard_table, connection_cache, &domain_supervisor); + + co_await construct_service( + reconciler, + partition_mgr, + data_plane.get(), + ss::sharded_parameter([this] { return &l1_io.local(); }), + ss::sharded_parameter([this] { + return std::make_unique( + l1_metastore_fe.local()); + }), + ss::sharded_parameter( + [&metadata_cache] { return &metadata_cache->local(); })); } ss::future<> app::start() { diff --git a/src/v/cloud_topics/app.h b/src/v/cloud_topics/app.h index 7cfceef19f118..9096fc9b62fd1 100644 --- a/src/v/cloud_topics/app.h +++ b/src/v/cloud_topics/app.h @@ -10,6 +10,7 @@ #pragma once +#include "cloud_topics/level_one/common/file_io.h" #include "cloud_topics/level_one/domain/domain_supervisor.h" #include "cloud_topics/level_one/metastore/frontend.h" #include "cloud_topics/reconciler/reconciler.h" @@ -71,6 +72,7 @@ class app : public ssx::sharded_service_container { ss::sstring _logger_name; std::unique_ptr data_plane; ss::sharded state; + ss::sharded l1_io; ss::sharded reconciler; ss::sharded domain_supervisor; ss::sharded l1_metastore_fe; diff --git a/src/v/cloud_topics/level_one/common/BUILD b/src/v/cloud_topics/level_one/common/BUILD index 71a1ff0e641db..b9257ee433a74 100644 --- a/src/v/cloud_topics/level_one/common/BUILD +++ b/src/v/cloud_topics/level_one/common/BUILD @@ -4,13 +4,21 @@ package(default_visibility = ["//src/v/cloud_topics/level_one:__subpackages__"]) redpanda_cc_library( name = "object_id", + srcs = [ + "object_id.cc", + ], hdrs = [ "object_id.h", ], + visibility = [ + "//src/v/cloud_topics/level_one:__subpackages__", + "//src/v/cloud_topics/reconciler:__subpackages__", + ], deps = [ "//src/v/base", "//src/v/utils:named_type", "//src/v/utils:uuid", + "@fmt", ], ) @@ -48,6 +56,10 @@ redpanda_cc_library( "//src/v/serde:vector", "//src/v/storage:record_batch_builder", ], + visibility = [ + "//src/v/cloud_topics/level_one:__subpackages__", + "//src/v/cloud_topics/reconciler:__subpackages__", + ], deps = [ "//src/v/base", 
"//src/v/container:chunked_vector", @@ -63,6 +75,10 @@ redpanda_cc_library( name = "abstract_io", srcs = ["abstract_io.cc"], hdrs = ["abstract_io.h"], + visibility = [ + "//src/v/cloud_topics/level_one:__subpackages__", + "//src/v/cloud_topics/reconciler:__subpackages__", + ], deps = [ ":object_id", "//src/v/container:chunked_vector", @@ -75,6 +91,11 @@ redpanda_cc_library( name = "file_io", srcs = ["file_io.cc"], hdrs = ["file_io.h"], + visibility = [ + "//src/v/cloud_topics:__pkg__", + "//src/v/cloud_topics/level_one:__subpackages__", + "//src/v/cloud_topics/reconciler:__pkg__", + ], deps = [ ":abstract_io", ":object_id", diff --git a/src/v/cloud_topics/level_one/metastore/metastore.h b/src/v/cloud_topics/level_one/metastore/metastore.h index 16d464ae6422a..fd045cbd5597e 100644 --- a/src/v/cloud_topics/level_one/metastore/metastore.h +++ b/src/v/cloud_topics/level_one/metastore/metastore.h @@ -49,6 +49,8 @@ namespace cloud_topics::l1 { // side effects. As such, callers can think of this interface as thread safe. 
class metastore { public: + virtual ~metastore() = default; + enum class errc { missing_ntp, invalid_request, diff --git a/src/v/cloud_topics/reconciler/BUILD b/src/v/cloud_topics/reconciler/BUILD index 447dc831a0ba8..028f47084c9fa 100644 --- a/src/v/cloud_topics/reconciler/BUILD +++ b/src/v/cloud_topics/reconciler/BUILD @@ -3,16 +3,16 @@ load("//bazel:build.bzl", "redpanda_cc_library") package(default_visibility = [":__subpackages__"]) redpanda_cc_library( - name = "range_batch_consumer", + name = "reconciliation_consumer", srcs = [ - "range_batch_consumer.cc", + "reconciliation_consumer.cc", ], hdrs = [ - "range_batch_consumer.h", + "reconciliation_consumer.h", ], deps = [ "//src/v/base", - "//src/v/bytes:iobuf", + "//src/v/cloud_topics/level_one/common:object", "//src/v/model", "@abseil-cpp//absl/container:btree", "@seastar", @@ -33,14 +33,17 @@ redpanda_cc_library( ], visibility = ["//visibility:public"], deps = [ - ":range_batch_consumer", + ":reconciliation_consumer", "//src/v/base", - "//src/v/cloud_io:remote", - "//src/v/cloud_storage", "//src/v/cloud_storage_clients", "//src/v/cloud_topics:object_utils", "//src/v/cloud_topics:types", + "//src/v/cloud_topics/level_one/common:file_io", + "//src/v/cloud_topics/level_one/common:object", + "//src/v/cloud_topics/level_one/common:object_id", "//src/v/cloud_topics/level_one/common:object_utils", + "//src/v/cloud_topics/level_one/metastore", + "//src/v/cloud_topics/level_one/metastore:replicated_metastore", "//src/v/cloud_topics/level_zero/stm:ctp_stm_api", "//src/v/cluster", "//src/v/cluster:notification", @@ -49,6 +52,7 @@ redpanda_cc_library( "//src/v/model", "//src/v/random:generators", "@abseil-cpp//absl/container:btree", + "@abseil-cpp//absl/container:flat_hash_map", "@abseil-cpp//absl/container:node_hash_map", "@seastar", ], diff --git a/src/v/cloud_topics/reconciler/range_batch_consumer.cc b/src/v/cloud_topics/reconciler/range_batch_consumer.cc deleted file mode 100644 index 08f07e7e7e03a..0000000000000 --- 
a/src/v/cloud_topics/reconciler/range_batch_consumer.cc +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2025 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#include "cloud_topics/reconciler/range_batch_consumer.h" - -#include "model/timestamp.h" - -namespace cloud_topics::reconciler { - -ss::future -range_batch_consumer::operator()(model::record_batch batch) { - if (!_base_offset.has_value()) { - _base_offset = model::offset_cast(batch.base_offset()); - } - // NOTE: we only have data batches here so it's safe - // to use timestamps without checking the batch type - if (_range.info.base_timestamp == model::timestamp{}) { - _range.info.base_timestamp = batch.header().first_timestamp; - } else { - _range.info.base_timestamp = std::min( - batch.header().first_timestamp, _range.info.base_timestamp); - } - _range.info.last_timestamp = std::max( - batch.header().max_timestamp, _range.info.last_timestamp); - _range.info.last_offset = model::offset_cast(batch.last_offset()); - - bool add_term = false; - if (_range.info.terms.empty()) { - // Always add term of the first batch in the sequence. We don't really - // know if it's the first batch in the term so some filtering will be - // needed later. 
- add_term = true; - } else { - auto term = _range.info.terms.rbegin()->first; - if (term != batch.term()) { - add_term = true; - } - } - if (add_term) { - _range.info.terms.insert( - std::make_pair( - batch.term(), model::offset_cast(batch.base_offset()))); - } - - auto data = serde::to_iobuf(std::move(batch)); - _range.data.append(std::move(data)); - - co_return ss::stop_iteration::no; -} - -std::optional range_batch_consumer::end_of_stream() { - if (_base_offset.has_value()) { - _range.info.base_offset = _base_offset.value(); - return std::move(_range); - } - return std::nullopt; -} - -} // namespace cloud_topics::reconciler diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index 345fbfbe1821e..f861d09b34fb0 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -11,13 +11,9 @@ #include "cloud_topics/reconciler/reconciler.h" #include "base/vlog.h" -#include "cloud_storage/configuration.h" #include "cloud_topics/data_plane_api.h" #include "cloud_topics/frontend/frontend.h" -#include "cloud_topics/level_one/common/object_utils.h" -#include "cloud_topics/level_zero/stm/ctp_stm_api.h" -#include "cloud_topics/object_utils.h" -#include "cloud_topics/types.h" +#include "cloud_topics/reconciler/reconciliation_consumer.h" #include "cluster/partition.h" #include "kafka/utils/txn_reader.h" #include "model/namespace.h" @@ -26,13 +22,11 @@ #include #include +#include + namespace { -ss::logger lg("reconciler"); +ss::logger rlog("reconciler"); -/* - * Temporary hack for identifying cloud partitions (topic has "_ct" suffix). - * This can be removed once we teach Redpanda about this new type of topic. 
- */ bool is_cloud_partition( const ss::lw_shared_ptr& partition) { return partition->get_ntp_config().cloud_topic_enabled(); @@ -64,28 +58,17 @@ namespace cloud_topics::reconciler { reconciler::reconciler( ss::sharded* pm, - ss::sharded* cloud_io, data_plane_api* data_plane, - std::optional bucket) + l1::file_io* l1_io, + std::unique_ptr metastore, + cluster::metadata_cache* metadata_cache) : _partition_manager(pm) - , _cloud_io(cloud_io) - , _data_plane(data_plane) { - if (bucket.has_value()) { - _bucket = std::move(bucket.value()); - } else { - _bucket = cloud_storage_clients::bucket_name( - cloud_storage::configuration::get_bucket_config().value().value()); - } -} + , _data_plane(data_plane) + , _l1_io(l1_io) + , _metastore(std::move(metastore)) + , _metadata_cache(metadata_cache) {} ss::future<> reconciler::start() { - _manage_notify_handle - = _partition_manager->local().register_manage_notification( - model::kafka_namespace, - [this](ss::lw_shared_ptr p) { - attach_partition(std::move(p)); - }); - _unmanage_notify_handle = _partition_manager->local().register_unmanage_notification( model::kafka_namespace, [this](model::topic_partition_view tp_p) { @@ -93,6 +76,13 @@ ss::future<> reconciler::start() { model::ntp(model::kafka_namespace, tp_p.topic, tp_p.partition)); }); + _manage_notify_handle + = _partition_manager->local().register_manage_notification( + model::kafka_namespace, + [this](ss::lw_shared_ptr p) { + attach_partition(std::move(p)); + }); + ssx::spawn_with_gate(_gate, [this] { return reconciliation_loop(); }); co_return; @@ -117,7 +107,7 @@ void reconciler::attach_partition( return; } const auto& ntp = partition->ntp(); - vlog(lg.info, "Reconciler is attaching cloud partition {}", ntp); + vlog(rlog.debug, "Attaching partition {}", ntp); auto attached = ss::make_lw_shared(partition); auto res = _partitions.try_emplace(ntp, std::move(attached)); vassert(res.second, "Double registration of ntp {}", ntp); @@ -125,7 +115,7 @@ void 
reconciler::attach_partition( void reconciler::detach_partition(const model::ntp& ntp) { if (auto it = _partitions.find(ntp); it != _partitions.end()) { - vlog(lg.info, "Reconciler is detaching partition {}", ntp); + vlog(rlog.debug, "Detaching partition {}", ntp); /* * This upcall doesn't synchronize with the rest of the reconciler, * which means that once a reference to an attached partition is held, @@ -136,21 +126,12 @@ void reconciler::detach_partition(const model::ntp& ntp) { } } -void reconciler::object::add(range range, const attached_partition& partition) { - vassert(!range.data.empty(), "cannot add an empty range to object"); - const auto physical_offset_start = data.size_bytes(); - data.append(std::move(range.data)); - const auto physical_offset_end = data.size_bytes(); - - ranges.emplace_back( - partition, physical_offset_start, physical_offset_end, range.info); -} - ss::future<> reconciler::reconciliation_loop() { /* - * polling is not particularly efficient, and in practice, we'll probably + * Polling is not particularly efficient, and in practice, we'll probably * want to look into receiving upcalls from partitions announcing that new * data is available. + * TODO: Investigate performance of polling and alternatives to polling. */ constexpr std::chrono::seconds poll_frequency(10); @@ -159,16 +140,21 @@ ss::future<> reconciler::reconciliation_loop() { co_await _control_sem.wait( poll_frequency, std::max(_control_sem.current(), size_t(1))); } catch (const ss::semaphore_timed_out&) { - // time to do some work + // Time to do some work. } + vlog( + rlog.debug, + "Reconciliation loop tick with {} attached partitions", + _partitions.size()); + try { co_await reconcile(); } catch (...) { const auto is_shutdown = ssx::is_shutdown_exception( std::current_exception()); vlogl( - lg, + rlog, is_shutdown ? 
ss::log_level::debug : ss::log_level::info, "Recoverable error during reconciliation: {}", std::current_exception()); @@ -177,25 +163,47 @@ ss::future<> reconciler::reconciliation_loop() { } ss::future<> reconciler::reconcile() { - auto object = co_await build_object(); - if (!object.has_value()) { - co_return; - } - auto path = l1::object_path_factory::level_one_path(l1::create_object_id()); - auto result = co_await upload_object(path, std::move(object->data)); - if (result != cloud_io::upload_result::success) { - vlog(lg.info, "Failed to upload L1 object: {}", result); + auto metadata_builder = _metastore->object_builder(); + + auto objects = co_await build_objects(metadata_builder.get()); + if (objects.empty()) { + vlog(rlog.debug, "No objects to upload"); co_return; } - // commit for each partition represented in the uploaded object - for (const auto& range : object->ranges) { - co_await commit_object(path, range); + for (auto& object : objects) { + vlog( + rlog.debug, + "About to upload L1 object {} built from {} partitions", + object.object_id, + object.partitions.size()); + + // Upload. + auto upload_result = co_await _l1_io->put_object( + object.object_id, object.staging_file.get(), &_as); + co_await object.staging_file->remove(); + if (!upload_result.has_value()) { + vlog( + rlog.warn, + "Failed to upload L1 object: {}", + static_cast(upload_result.error())); + continue; + } + vlog(rlog.debug, "Uploaded L1 object {}", object.object_id); } + + // Commit all objects to metastore. 
+ l1::metastore::term_offset_map_t term_offset_map; + co_await populate_metastore_builder( + objects, metadata_builder.get(), term_offset_map); + co_await commit_to_metastore( + std::move(metadata_builder), term_offset_map, objects); } -ss::future> reconciler::build_object() { - // light-weight copy for stable iteration +ss::future> reconciler::build_objects( + l1::metastore::object_metadata_builder* metadata_builder) { + // Copy the leader partition information in case of + // mid-reconciliation unregistration. std::vector partitions; for (const auto& p : _partitions) { if (p.second->partition->is_leader()) { @@ -203,69 +211,72 @@ ss::future> reconciler::build_object() { } } - // avoid starving partitions + if (partitions.empty()) { + vlog(rlog.debug, "No leader partitions to reconcile"); + co_return chunked_vector{}; + } + + // Shuffle to avoid starving partitions. + // TODO: Investigate how to divide work between partitions with + // different throughput. std::shuffle( partitions.begin(), partitions.end(), random_generators::internal::gen); - object object; - auto size_budget = max_object_size; - for (const auto& partition : partitions) { - auto reader = co_await make_reader(partition, size_budget); - auto range = co_await std::move(reader).consume( - range_batch_consumer{}, model::no_timeout); - if (range.has_value()) { - object.add(std::move(*range), partition); - size_budget -= std::min(object.data.size_bytes(), size_budget); - } + auto contexts_result = co_await setup_contexts( + metadata_builder, partitions); + if (!contexts_result.has_value()) { + vlog( + rlog.error, "Failed to setup contexts: {}", contexts_result.error()); + co_return chunked_vector{}; } - if (object.data.empty()) { - co_return std::nullopt; - } + auto contexts = std::move(contexts_result.value()); - co_return object; -} + for (const auto& partition : partitions) { + auto tidp_it = contexts.ntp_to_tidp.find(partition->partition->ntp()); + vassert( + tidp_it != contexts.ntp_to_tidp.end(), 
+ "No topic_id_partition found for {}", + partition->partition->ntp()); + + auto [_, tidp] = *tidp_it; + auto object_id_it = contexts.tidp_to_object.find(tidp); + vassert( + object_id_it != contexts.tidp_to_object.end(), + "No object context found for {}", + tidp); + + auto [_unused, object_id] = *object_id_it; + auto& ctx = contexts.objects[object_id]; + auto metadata = co_await add_partition_to_object( + ctx.builder.get(), partition, ctx.remaining_budget); + + if (!metadata.has_value()) { + continue; + } -ss::future reconciler::upload_object( - cloud_storage_clients::object_key key, iobuf payload) { - retry_chain_node rtc( - _as, - ss::lowres_clock::now() + std::chrono::seconds(20), - std::chrono::seconds(1)); - - co_return co_await _cloud_io->local().upload_object({ - .transfer_details = { - .bucket = _bucket, - .key = key, - .parent_rtc = rtc, - }, - .display_str = "l1_object", - .payload = std::move(payload), - }); -} + ctx.result.partitions.emplace_back( + partition, std::move(metadata.value())); -ss::future<> reconciler::commit_object( - const cloud_storage_clients::object_key& key, - const object_range_info& range) { - /* - * TODO register the L1 object with L1 metastore. - */ - const auto& part = range.partition->partition; + auto current_size = co_await ctx.result.staging_file->size(); + ctx.remaining_budget = max_object_size - current_size; + } - range.partition->lro = range.info.last_offset + kafka::offset(1); + chunked_vector objects; + for (auto& [object_id, ctx] : contexts.objects) { + if (ctx.result.partitions.empty()) { + co_await ctx.builder->close(); + co_await ctx.result.staging_file->remove(); + continue; + } - vlog( - lg.info, - "Committed overlay to {} for {} phy {}~{} log {}~{}. 
New LRO {}", - key, - part->ntp(), - range.physical_offset_start, - range.physical_offset_end, - range.info.base_offset, - range.info.last_offset, - range.partition->lro); + ctx.result.object_info = co_await ctx.builder->finish(); + co_await ctx.builder->close(); - co_return; + objects.emplace_back(std::move(ctx.result)); + } + + co_return objects; } ss::future @@ -276,7 +287,7 @@ reconciler::make_reader(const attached_partition& partition, size_t max_bytes) { auto effective_start = co_await fe.sync_effective_start(5s); if (!effective_start.has_value()) { vlog( - lg.info, + rlog.warn, "Error querying partition start offset ({}): {}", cluster_partition->ntp(), effective_start.error()); @@ -289,7 +300,7 @@ reconciler::make_reader(const attached_partition& partition, size_t max_bytes) { auto maybe_lso = fe.last_stable_offset(); if (!maybe_lso.has_value()) { vlog( - lg.info, + rlog.warn, "Error querying partition LSO ({}): {}", cluster_partition->ntp(), maybe_lso.error()); @@ -322,4 +333,274 @@ reconciler::make_reader(const attached_partition& partition, size_t max_bytes) { std::move(tracker), std::move(reader.reader)); } +ss::future> +reconciler::add_partition_to_object( + l1::object_builder* builder, + const attached_partition& partition, + size_t size_budget) { + vlog( + rlog.debug, + "Processing partition {} with LRO {}", + partition->partition->ntp(), + partition->lro); + + auto reader = co_await make_reader(partition, size_budget); + reconciliation_consumer consumer(builder, partition->partition->ntp()); + auto metadata = co_await std::move(reader).consume( + std::move(consumer), model::no_timeout); + + if (!metadata.has_value()) { + vlog( + rlog.debug, + "No batches found for partition {}", + partition->partition->ntp()); + co_return std::nullopt; + } + + vlog( + rlog.debug, + "Adding partition {} to L1 object with offsets {}~{}", + partition->partition->ntp(), + metadata->base_offset, + metadata->last_offset); + + co_return metadata.value(); +} + +ss::future<> 
reconciler::populate_metastore_builder( + const chunked_vector& objects, + l1::metastore::object_metadata_builder* metadata_builder, + l1::metastore::term_offset_map_t& term_offset_map) { + for (const auto& object : objects) { + // Add all partitions for this object + for (const auto& partition_info : object.partitions) { + const auto& partition = partition_info.partition; + const auto& metadata = partition_info.metadata; + + // Get topic_id_partition for this partition + auto tidp_result = get_topic_id_partition( + partition->partition->ntp()); + if (!tidp_result.has_value()) { + vlog( + rlog.error, + "Failed to get topic_id_partition for {}: {}", + partition->partition->ntp(), + tidp_result.error()); + continue; + } + + auto tidp = tidp_result.value(); + + // Find the partition info in the footer for this NTP + const auto& footer = object.object_info.index; + auto [begin, end] = footer.partitions.equal_range( + partition->partition->ntp()); + + // Find the matching partition entry based on offset range + ssize_t pos = -1; + size_t size = 0; + auto matching_partition = std::ranges::find_if( + std::ranges::subrange(begin, end), + [&metadata](const auto& entry) { + const auto& footer_partition = entry.second; + return footer_partition.first_offset == metadata.base_offset + && footer_partition.last_offset + == metadata.last_offset; + }); + + vassert( + matching_partition != end, + "Failed to find partition {} with offsets [{}, {}] in footer", + partition->partition->ntp(), + metadata.base_offset, + metadata.last_offset); + + pos = matching_partition->second.file_position; + size = matching_partition->second.length; + + // Add partition metadata to metastore builder + l1::metastore::object_metadata::ntp_metadata ntp_meta; + ntp_meta.tidp = tidp; + ntp_meta.base_offset = metadata.base_offset; + ntp_meta.last_offset = metadata.last_offset; + ntp_meta.max_timestamp = metadata.last_timestamp; + ntp_meta.pos = pos; + ntp_meta.size = size; + + auto add_result = 
metadata_builder->add(object.object_id, ntp_meta); + if (!add_result.has_value()) { + vlog( + rlog.error, + "Failed to add partition {} to metastore builder: {}", + tidp, + add_result.error()); + continue; + } + + // Build term offset map for this partition + chunked_vector term_offsets; + for (const auto& [term, offset] : metadata.terms) { + term_offsets.emplace_back( + l1::metastore::term_offset{ + .term = term, .first_offset = offset}); + } + + if (!term_offsets.empty()) { + term_offset_map[tidp] = std::move(term_offsets); + } + } + + // Finish the object once after all partitions are added + auto finish_result = metadata_builder->finish( + object.object_id, object.object_info.footer_offset); + if (!finish_result.has_value()) { + vlog( + rlog.error, + "Failed to finish object {} in metastore builder: {}", + object.object_id, + finish_result.error()); + } + } + + co_return; +} + +ss::future<> reconciler::commit_to_metastore( + std::unique_ptr metadata_builder, + const l1::metastore::term_offset_map_t& term_offset_map, + const chunked_vector& objects) { + auto result = co_await _metastore->add_objects( + std::move(metadata_builder), term_offset_map); + if (!result.has_value()) { + vlog( + rlog.error, + "Failed to add objects to metastore: {}", + static_cast(result.error())); + co_return; + } + + // Update LRO for all partitions based on metastore commit results + for (const auto& object : objects) { + for (const auto& partition_info : object.partitions) { + const auto& partition = partition_info.partition; + const auto& metadata = partition_info.metadata; + + // Get topic_id_partition for this partition + auto tidp_result = get_topic_id_partition( + partition->partition->ntp()); + if (!tidp_result.has_value()) { + vlog( + rlog.error, + "Failed to get topic_id_partition for {}: {}", + partition->partition->ntp(), + tidp_result.error()); + continue; + } + + auto tidp = tidp_result.value(); + + // Check if metastore returned a corrected offset for this partition + 
kafka::offset new_lro; + auto corrected_it = result->corrected_next_offsets.find(tidp); + if (corrected_it != result->corrected_next_offsets.end()) { + new_lro = corrected_it->second; + vlog( + rlog.info, + "Using corrected next offset for {}: {}", + partition->partition->ntp(), + new_lro); + } else { + // No correction, use one past the last offset we uploaded + new_lro = metadata.last_offset + kafka::offset(1); + } + + // Update the partition's LRO + partition->lro = new_lro; + + vlog( + rlog.debug, + "Updated LRO for {} to {} (offsets {}~{} in object {})", + partition->partition->ntp(), + new_lro, + metadata.base_offset, + metadata.last_offset, + object.object_id); + } + } + + co_return; +} + +ss::future> +reconciler::setup_contexts( + l1::metastore::object_metadata_builder* metadata_builder, + const std::vector& partitions) { + build_contexts contexts; + + for (const auto& partition : partitions) { + // Get topic_id_partition for this partition + auto tidp_result = get_topic_id_partition(partition->partition->ntp()); + if (!tidp_result.has_value()) { + co_return std::unexpected( + fmt::format( + "Failed to get topic_id_partition for {}: {}", + partition->partition->ntp(), + tidp_result.error())); + } + + auto tidp = tidp_result.value(); + + // Get or create object for this partition + auto object_id = metadata_builder->get_or_create_object_for(tidp); + + // Store the mappings + contexts.ntp_to_tidp[partition->partition->ntp()] = tidp; + contexts.tidp_to_object[tidp] = object_id; + + // Create build context if this is a new object + if (contexts.objects.find(object_id) == contexts.objects.end()) { + auto staging_file_result = co_await _l1_io->create_tmp_file(); + if (!staging_file_result.has_value()) { + co_return std::unexpected( + fmt::format( + "Failed to create staging file: {}", + static_cast(staging_file_result.error()))); + } + + object_build_context ctx; + ctx.remaining_budget = max_object_size; + ctx.result.object_id = object_id; + 
ctx.result.staging_file = std::move(staging_file_result.value()); + + auto output_stream + = co_await ctx.result.staging_file->output_stream(); + ctx.builder = l1::object_builder::create( + std::move(output_stream), l1::object_builder::options{}); + + contexts.objects[object_id] = std::move(ctx); + } + } + + co_return contexts; +} + +std::expected +reconciler::get_topic_id_partition(const model::ntp& ntp) const { + auto topic_ns = model::topic_namespace{ntp.ns, ntp.tp.topic}; + auto topic_cfg = _metadata_cache->get_topic_cfg(topic_ns); + if (!topic_cfg.has_value()) { + return std::unexpected( + fmt::format( + "Failed to get topic configuration for topic {}", topic_ns)); + } + + if (!topic_cfg->tp_id.has_value()) { + return std::unexpected( + fmt::format("Topic {} does not have a topic_id", ntp.tp.topic)); + } + + model::topic_id tid = topic_cfg->tp_id.value(); + return model::topic_id_partition{tid, ntp.tp.partition}; +} + } // namespace cloud_topics::reconciler diff --git a/src/v/cloud_topics/reconciler/reconciler.h b/src/v/cloud_topics/reconciler/reconciler.h index e8e864eb5c327..ecbdedade0faf 100644 --- a/src/v/cloud_topics/reconciler/reconciler.h +++ b/src/v/cloud_topics/reconciler/reconciler.h @@ -10,11 +10,15 @@ #pragma once +#include "absl/container/flat_hash_map.h" #include "absl/container/node_hash_map.h" #include "base/seastarx.h" -#include "cloud_io/remote.h" -#include "cloud_storage_clients/types.h" -#include "cloud_topics/reconciler/range_batch_consumer.h" +#include "cloud_topics/level_one/common/file_io.h" +#include "cloud_topics/level_one/common/object.h" +#include "cloud_topics/level_one/common/object_id.h" +#include "cloud_topics/level_one/metastore/metastore.h" +#include "cloud_topics/reconciler/reconciliation_consumer.h" +#include "cluster/metadata_cache.h" #include "cluster/notification.h" #include "cluster/partition.h" #include "cluster/partition_manager.h" @@ -24,6 +28,7 @@ #include #include +#include #include namespace cloud_topics { @@ -42,9 
+47,10 @@ class reconciler { public: reconciler( ss::sharded*, - ss::sharded*, data_plane_api*, - std::optional = std::nullopt); + l1::file_io*, + std::unique_ptr, + cluster::metadata_cache*); reconciler(const reconciler&) = delete; reconciler& operator=(const reconciler&) = delete; @@ -57,9 +63,9 @@ class reconciler { private: /* - * an attached partition is a partition that the reconciler is tracking and - * periodically processing. partitions are attached/detatched via upcalls - * from the cluster module. the reconciler operates on the leaders of + * An attached partition is a partition that the reconciler is tracking and + * periodically processing. Partitions are attached/detached via upcalls + * from the cluster module. The reconciler operates on the leaders of * partitions with affinity to the local shard. */ struct attached_partition_info { @@ -70,16 +76,16 @@ class reconciler { ss::lw_shared_ptr partition; /* - * last reconciled offset. this forms the starting offset when querying + * Last reconciled offset. this forms the starting offset when querying * the partition for new data. In later versions of the system this will * be stored in and queried from the partition itself. + * TODO: Rename this, and set it using the L0 LRO and the L1 metastore. */ kafka::offset lro; }; using attached_partition = ss::lw_shared_ptr; - // currently attached partitions absl::node_hash_map _partitions; void attach_partition(ss::lw_shared_ptr); @@ -89,72 +95,119 @@ class reconciler { cluster::notification_id_type _unmanage_notify_handle; private: - static constexpr size_t max_object_size = 4_MiB; + static constexpr size_t max_object_size = 64_MiB; /* - * metadata about a materialized range of batches stored in an L1 object. - * after an object is created and uploaded, this metadata is used to drive - * the creation and replication of overlay batches to each partition. 
- * - * partition - the source partition - * physical extent - position within the object - * range info - additional metadata (e.g. kafka offset extent) + * Metadata about a partition in an L1 object, used for committing. + * TODO: Update to commit using the L1 metastore. */ - struct object_range_info { + struct partition_commit_info { attached_partition partition; - uint64_t physical_offset_start; - uint64_t physical_offset_end; - range_info info; + partition_metadata metadata; }; /* - * a staged / materialized L1 object. - * - * data - the payload - * ranges - metadata about each range in the payload + * An L1 object built using object_builder with associated partition + * metadata. */ - struct object { - iobuf data; - chunked_vector ranges; + struct built_object { + l1::object_id object_id; + l1::object_builder::object_info object_info; + std::unique_ptr staging_file; + chunked_vector partitions; + }; + + /* + * Context for building a single L1 object, tracked during the + * build_objects process. + */ + struct object_build_context { + l1::object_id object_id; + std::unique_ptr builder; + size_t remaining_budget; + built_object result; + }; - // add a range from the given partition - void add(range, const attached_partition&); + /* + * Overall context for building multiple L1 objects grouped by + * metastore partition. + */ + struct build_contexts { + absl::flat_hash_map objects; + absl::flat_hash_map + tidp_to_object; + absl::flat_hash_map ntp_to_tidp; }; - // top-level background worker that drives reconciliation + // Top-level background worker that drives reconciliation. ss::future<> reconciliation_loop(); ssx::semaphore _control_sem{0, "reconciler::semaphore"}; /* - * one round of reconciliation in which data from one or more partitions may - * be reconciled into an L1 object. operates on the set of currently + * One round of reconciliation in which data from one or more partitions may + * be reconciled into an L1 object. 
Operates on the set of currently * attached partitions. */ ss::future<> reconcile(); /* - * reconciliation is a three step process. first an L1 object is built, then - * it is uploaded to cloud storage, and finally its committed. + * Reconciliation is a three step process. First, an L1 object is built, + * then it is uploaded to cloud storage, and finally it is committed. + * TODO: This process occurs for each domain, once using the metastore. + */ + ss::future> + build_objects(l1::metastore::object_metadata_builder*); + + /* + * Populate the metastore builder with metadata from all built objects + * and build the term offset map for committing to the metastore. */ - ss::future> build_object(); - ss::future - upload_object(cloud_storage_clients::object_key, iobuf); - ss::future<> commit_object( - const cloud_storage_clients::object_key&, const object_range_info&); + ss::future<> populate_metastore_builder( + const chunked_vector&, + l1::metastore::object_metadata_builder*, + l1::metastore::term_offset_map_t&); /* - * build a partition reader that returns batches to be reconciled. reading - * will start from the last reconcilied offset. if there is no data that + * Commit all objects to the metastore using the populated builder + * and term offset map, then update partition LROs based on the response. + */ + ss::future<> commit_to_metastore( + std::unique_ptr, + const l1::metastore::term_offset_map_t&, + const chunked_vector&); + + /* + * Build a partition reader that returns batches to be reconciled. Reading + * will start from the last reconciled offset. If there is no data that * needs to be reconciled then an empty reader is returned. */ ss::future make_reader(const attached_partition&, size_t); + /* + * Add partition data to an L1 object builder. Returns the partition + * metadata if any batches were consumed, nullopt otherwise. 
+ */ + ss::future> add_partition_to_object( + l1::object_builder*, const attached_partition&, size_t); + + /* + * Set up build contexts for all partitions, creating object builders + * and mapping partitions to objects via the metastore. + */ + ss::future> setup_contexts( + l1::metastore::object_metadata_builder*, + const std::vector&); + + std::expected + get_topic_id_partition(const model::ntp&) const; + private: ss::sharded* _partition_manager; - ss::sharded* _cloud_io; data_plane_api* _data_plane; - cloud_storage_clients::bucket_name _bucket; + l1::file_io* _l1_io; + std::unique_ptr _metastore; + cluster::metadata_cache* _metadata_cache; ss::gate _gate; ss::abort_source _as; }; diff --git a/src/v/cloud_topics/reconciler/reconciliation_consumer.cc b/src/v/cloud_topics/reconciler/reconciliation_consumer.cc new file mode 100644 index 0000000000000..adda2aaa4e0f2 --- /dev/null +++ b/src/v/cloud_topics/reconciler/reconciliation_consumer.cc @@ -0,0 +1,62 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cloud_topics/reconciler/reconciliation_consumer.h" + +#include "model/timestamp.h" + +namespace cloud_topics::reconciler { + +constexpr kafka::offset offset_unset{-1}; + +reconciliation_consumer::reconciliation_consumer( + l1::object_builder* builder, model::ntp ntp) + : _builder(builder) + , _ntp(std::move(ntp)) + , _metadata{ + .base_offset = offset_unset, + .last_offset = offset_unset, + .base_timestamp = model::timestamp::max(), + .last_timestamp = model::timestamp::min()} {} + +ss::future +reconciliation_consumer::operator()(model::record_batch batch) { + if (_metadata.base_offset == offset_unset) { + _metadata.base_offset = model::offset_cast(batch.base_offset()); + co_await _builder->start_partition(_ntp); + } + + // NOTE: we only have data batches here so it's safe + // to use timestamps without checking the batch type. + _metadata.base_timestamp = std::min( + batch.header().first_timestamp, _metadata.base_timestamp); + _metadata.last_timestamp = std::max( + batch.header().max_timestamp, _metadata.last_timestamp); + _metadata.last_offset = model::offset_cast(batch.last_offset()); + + if (!_metadata.terms.contains(batch.term())) { + _metadata.terms.insert( + std::make_pair( + batch.term(), model::offset_cast(batch.base_offset()))); + } + + co_await _builder->add_batch(std::move(batch)); + + co_return ss::stop_iteration::no; +} + +std::optional reconciliation_consumer::end_of_stream() { + if (_metadata.base_offset != offset_unset) { + return _metadata; + } + return std::nullopt; +} + +} // namespace cloud_topics::reconciler diff --git a/src/v/cloud_topics/reconciler/range_batch_consumer.h b/src/v/cloud_topics/reconciler/reconciliation_consumer.h similarity index 64% rename from src/v/cloud_topics/reconciler/range_batch_consumer.h rename to src/v/cloud_topics/reconciler/reconciliation_consumer.h index 
f172f653af177..b5f5ee2b862a7 100644 --- a/src/v/cloud_topics/reconciler/range_batch_consumer.h +++ b/src/v/cloud_topics/reconciler/reconciliation_consumer.h @@ -11,7 +11,7 @@ #pragma once #include "absl/container/btree_map.h" -#include "bytes/iobuf.h" +#include "cloud_topics/level_one/common/object.h" #include "model/fundamental.h" #include "model/record.h" #include "model/timestamp.h" @@ -23,38 +23,28 @@ namespace cloud_topics::reconciler { -/* - * metadata about a range of batches. - */ -struct range_info { +struct partition_metadata { kafka::offset base_offset; kafka::offset last_offset; model::timestamp base_timestamp; model::timestamp last_timestamp; - // 'range_info' is not aligned by term boundary so this - // map is used to track term changes absl::btree_map terms; }; -/* - * a materialized range of batches. - */ -struct range { - iobuf data; - range_info info; -}; - -/* - * Consumer that builds a range from a record batch reader. - */ -class range_batch_consumer { +/// Consumes record batches from a partition and writes them to an L1 object. +/// Produces metadata about the consumed range including offsets, timestamps, +/// and term transitions. 
+class reconciliation_consumer { public: + reconciliation_consumer(l1::object_builder* builder, model::ntp ntp); + ss::future operator()(model::record_batch); - std::optional end_of_stream(); + std::optional end_of_stream(); private: - range _range; - std::optional _base_offset; + l1::object_builder* _builder; + model::ntp _ntp; + partition_metadata _metadata; }; } // namespace cloud_topics::reconciler diff --git a/src/v/cloud_topics/reconciler/tests/BUILD b/src/v/cloud_topics/reconciler/tests/BUILD index ad40ad81db05d..283f7c93a4d93 100644 --- a/src/v/cloud_topics/reconciler/tests/BUILD +++ b/src/v/cloud_topics/reconciler/tests/BUILD @@ -1,17 +1,20 @@ load("//bazel:test.bzl", "redpanda_cc_gtest") redpanda_cc_gtest( - name = "range_batch_consumer_test", + name = "reconciliation_consumer_test", timeout = "short", srcs = [ - "range_batch_consumer_test.cc", + "reconciliation_consumer_test.cc", ], deps = [ - "//src/v/cloud_topics/reconciler:range_batch_consumer", + "//src/v/bytes:iostream", + "//src/v/cloud_topics/level_one/common:object", + "//src/v/cloud_topics/reconciler:reconciliation_consumer", "//src/v/model", "//src/v/storage:record_batch_builder", "//src/v/test_utils:gtest", "//src/v/test_utils:random_bytes", "@googletest//:gtest", + "@seastar", ], ) diff --git a/src/v/cloud_topics/reconciler/tests/range_batch_consumer_test.cc b/src/v/cloud_topics/reconciler/tests/range_batch_consumer_test.cc deleted file mode 100644 index b8806f2ffecae..0000000000000 --- a/src/v/cloud_topics/reconciler/tests/range_batch_consumer_test.cc +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2024 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ -#include "cloud_topics/reconciler/range_batch_consumer.h" -#include "model/record_batch_reader.h" -#include "storage/record_batch_builder.h" -#include "test_utils/random_bytes.h" - -#include - -using consumer = cloud_topics::reconciler::range_batch_consumer; - -model::record_batch_reader make_reader( - int offset, int record_size, int num_batches, int records_per_batch) { - ss::chunked_fifo batches; - for (int i = 0; i < num_batches; i++) { - storage::record_batch_builder b( - model::record_batch_type::raft_data, model::offset(offset)); - for (int j = 0; j < records_per_batch; j++) { - b.add_raw_kv(tests::random_iobuf(record_size), iobuf()); - offset += 1; - } - batches.push_back(std::move(b).build()); - } - return model::make_chunked_memory_record_batch_reader(std::move(batches)); -} - -TEST(RangeBatchConsumer, EmptyReader) { - auto reader = model::make_empty_record_batch_reader(); - auto range = std::move(reader).consume(consumer{}, model::no_timeout).get(); - ASSERT_FALSE(range.has_value()); -} - -TEST(RangeBatchConsumer, MultipleBatchesRecords) { - size_t record_size = 100; - size_t base_offset = 11; - for (int i = 1; i < 4; i++) { - for (int j = 1; j < 2; j++) { - auto reader = make_reader(base_offset, record_size, i, j); - auto range - = std::move(reader).consume(consumer{}, model::no_timeout).get(); - ASSERT_TRUE(range.has_value()); - ASSERT_EQ(range->info.base_offset(), base_offset); - ASSERT_EQ(range->info.last_offset(), base_offset + (i * j) - 1); - } - } -} diff --git a/src/v/cloud_topics/reconciler/tests/reconciliation_consumer_test.cc b/src/v/cloud_topics/reconciler/tests/reconciliation_consumer_test.cc new file mode 100644 index 0000000000000..8e960d3754a15 --- /dev/null +++ b/src/v/cloud_topics/reconciler/tests/reconciliation_consumer_test.cc @@ -0,0 +1,84 @@ +/* + * Copyright 2024 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "bytes/iostream.h" +#include "cloud_topics/level_one/common/object.h" +#include "cloud_topics/reconciler/reconciliation_consumer.h" +#include "model/namespace.h" +#include "model/record_batch_reader.h" +#include "storage/record_batch_builder.h" +#include "test_utils/random_bytes.h" + +#include + +#include + +using consumer = cloud_topics::reconciler::reconciliation_consumer; +using namespace cloud_topics::l1; + +model::record_batch_reader make_reader( + int offset, int record_size, int num_batches, int records_per_batch) { + ss::chunked_fifo batches; + for (int i = 0; i < num_batches; i++) { + storage::record_batch_builder b( + model::record_batch_type::raft_data, model::offset(offset)); + for (int j = 0; j < records_per_batch; j++) { + b.add_raw_kv(tests::random_iobuf(record_size), iobuf()); + offset += 1; + } + batches.push_back(std::move(b).build()); + } + return model::make_chunked_memory_record_batch_reader(std::move(batches)); +} + +TEST(RangeBatchConsumer, EmptyReader) { + auto reader = model::make_empty_record_batch_reader(); + iobuf output; + auto builder = object_builder::create( + make_iobuf_ref_output_stream(output), object_builder::options{}); + auto _ = ss::defer([&builder] { builder->close().get(); }); + + model::ntp ntp( + model::ns("kimchi"), model::topic("taco"), model::partition_id(0)); + consumer c(builder.get(), ntp); + auto metadata + = std::move(reader).consume(std::move(c), model::no_timeout).get(); + ASSERT_FALSE(metadata.has_value()); +} + +TEST(RangeBatchConsumer, MultipleBatchesRecords) { + size_t record_size = 100; + size_t base_offset = 11; + for (int i = 1; i < 4; i++) { + for (int j = 1; j < 2; j++) { + auto reader = 
make_reader(base_offset, record_size, i, j); + iobuf output; + auto builder = object_builder::create( + make_iobuf_ref_output_stream(output), object_builder::options{}); + auto _ = ss::defer([&builder] { builder->close().get(); }); + + model::ntp ntp( + model::ns("kimchi"), + model::topic("taco"), + model::partition_id(0)); + consumer c(builder.get(), ntp); + auto metadata = std::move(reader) + .consume(std::move(c), model::no_timeout) + .get(); + ASSERT_TRUE(metadata.has_value()); + ASSERT_EQ(metadata->base_offset(), base_offset); + ASSERT_EQ(metadata->last_offset(), base_offset + (i * j) - 1); + + // Verify object was built + auto info = builder->finish().get(); + ASSERT_EQ(info.index.partitions.size(), 1); + ASSERT_EQ(info.index.partitions.begin()->first, ntp); + } + } +} diff --git a/src/v/config/node_config.h b/src/v/config/node_config.h index dd3795d36ed7d..580cbf44f6cac 100644 --- a/src/v/config/node_config.h +++ b/src/v/config/node_config.h @@ -150,6 +150,10 @@ struct node_config final : public config_store { return data_directory().path / "datalake_staging"; } + std::filesystem::path l1_staging_path() const { + return data_directory().path / "l1_staging"; + } + std::vector advertised_kafka_api() const { if (_advertised_kafka_api().empty()) { std::vector eps;