diff --git a/src/v/lsm/block/reader.cc b/src/v/lsm/block/reader.cc index 59a77dd854048..8a6bded453624 100644 --- a/src/v/lsm/block/reader.cc +++ b/src/v/lsm/block/reader.cc @@ -57,11 +57,13 @@ class iter final : public internal::iterator { bool valid() const final { return _current < _restarts_offset; } ss::future<> seek_to_first() final { + invalidate(); seek_to_restart_point(0); parse_next_key(); return ss::now(); } ss::future<> seek_to_last() final { + invalidate(); seek_to_restart_point(_num_restarts - 1); while (parse_next_key() && next_entry_offset() < _restarts_offset) { // Keep skipping @@ -91,6 +93,8 @@ class iter final : public internal::iterator { } } + invalidate(); + while (left < right) { uint32_t mid = (left + right + 1) / 2; uint32_t region_offset = get_restart_point(mid); @@ -118,7 +122,6 @@ class iter final : public internal::iterator { // We might be able to use our current position within the restart // block. This is true if we determined the key we desire is in the // current block and is after than the current key. - assert(current_key_compare == 0 || valid()); bool skip_seek = left == _restart_index && current_key_compare < 0; if (!skip_seek) { seek_to_restart_point(left); @@ -141,10 +144,10 @@ class iter final : public internal::iterator { ss::future<> prev() final { // Scan backwards to a restart point before current_ const uint32_t original = _current; + invalidate(); while (get_restart_point(_restart_index) >= original) { if (_restart_index == 0) { // No more entries - _current = _restarts_offset; _restart_index = _num_restarts; return ss::now(); } @@ -184,36 +187,39 @@ class iter final : public internal::iterator { _value = {.offset = get_restart_point(index), .length = 0}; } + // Reset to !valid() so a thrown corruption_exception doesn't leave + // stale _key / _value bytes from the previous entry. + void invalidate() { _current = _restarts_offset; } + bool parse_next_key() { - _current = next_entry_offset(); + const uint32_t next = next_entry_offset(); + invalidate(); // Restarts come right after data - if (_current >= _restarts_offset) { - // No more entries to return. Mark as invalid. - _current = _restarts_offset; + if (next >= _restarts_offset) { _restart_index = _num_restarts; return false; } // Decode next entry auto [shared, non_shared, value_length] = decode_entry( - *_data, _current, _restarts_offset); - uint32_t p = _current + (3 * sizeof(uint32_t)); + *_data, next, _restarts_offset); + uint32_t p = next + (3 * sizeof(uint32_t)); if (_key.size() < shared) { throw corruption_exception( "corruption: shared key too short: {} < {}", _key.size(), shared); - } else { - _key.resize(shared + non_shared); - auto it = _key.begin(); - std::advance(it, shared); - for (std::string_view s : _data->read_string(p, non_shared)) { - it = std::ranges::copy(s, it).out; - } - _value = range{.offset = p + non_shared, .length = value_length}; - while (_restart_index + 1 < _num_restarts - && get_restart_point(_restart_index + 1) < _current) { - ++_restart_index; - } - return true; } + _key.resize(shared + non_shared); + auto it = _key.begin(); + std::advance(it, shared); + for (std::string_view s : _data->read_string(p, non_shared)) { + it = std::ranges::copy(s, it).out; + } + _value = range{.offset = p + non_shared, .length = value_length}; + while (_restart_index + 1 < _num_restarts + && get_restart_point(_restart_index + 1) < next) { + ++_restart_index; + } + _current = next; + return true; } ss::lw_shared_ptr _data; diff --git a/src/v/lsm/core/internal/merging_iterator.cc b/src/v/lsm/core/internal/merging_iterator.cc index ca2fc702ccac7..69f4d5847904a 100644 --- a/src/v/lsm/core/internal/merging_iterator.cc +++ b/src/v/lsm/core/internal/merging_iterator.cc @@ -29,6 +29,7 @@ class merging_iterator : public iterator { bool valid() const override { return _current != nullptr; } ss::future<> seek_to_first() override { + invalidate(); for (auto& child : _children) { co_await child->seek_to_first(); } @@ -36,6 +37,7 @@ class merging_iterator : public iterator { _dir = direction::forward; } ss::future<> seek_to_last() override { + invalidate(); for (auto& child : _children) { co_await child->seek_to_last(); } @@ -43,6 +45,7 @@ class merging_iterator : public iterator { _dir = direction::backward; } ss::future<> seek(key_view target) override { + invalidate(); for (auto& child : _children) { co_await child->seek(target); } @@ -52,6 +55,10 @@ class merging_iterator : public iterator { ss::future<> next() override { dassert(valid(), "next must be called on a valid iterator"); + // Stash the current child before clearing _current so we can resume + // the forward step from the right position. + iterator* current = _current; + invalidate(); // Ensure that all children are positioned after key(). // If we are moving in the forward direction, it is already // true for all of the non-current_ children since current_ is @@ -59,20 +66,22 @@ class merging_iterator : public iterator { // we explicitly position the non-current_ children. if (_dir != direction::forward) { for (auto& child : _children) { - if (child.get() != _current) { - co_await child->seek(key()); - if (child->valid() && key() == child->key()) { + if (child.get() != current) { + co_await child->seek(current->key()); + if (child->valid() && current->key() == child->key()) { co_await child->next(); } } } _dir = direction::forward; } - co_await _current->next(); + co_await current->next(); find_smallest(); } ss::future<> prev() override { dassert(valid(), "prev must be called on a valid iterator"); + iterator* current = _current; + invalidate(); // Ensure that all children are positioned after key(). // If we are moving in the forward direction, it is already // true for all of the non-current_ children since current_ is @@ -80,8 +89,8 @@ class merging_iterator : public iterator { // we explicitly position the non-current_ children. if (_dir != direction::backward) { for (auto& child : _children) { - if (child.get() != _current) { - co_await child->seek(key()); + if (child.get() != current) { + co_await child->seek(current->key()); if (child->valid()) { // Child is at first entry >= key. Step back one to be < // key(). @@ -95,7 +104,7 @@ class merging_iterator : public iterator { } _dir = direction::backward; } - co_await _current->prev(); + co_await current->prev(); find_largest(); } key_view key() override { @@ -108,6 +117,9 @@ class merging_iterator : public iterator { } private: + // Reset to !valid() so a thrown await doesn't leave stale state. + void invalidate() { _current = nullptr; } + void find_smallest() { iterator* smallest = nullptr; for (auto& child : _children) { diff --git a/src/v/lsm/core/internal/tests/BUILD b/src/v/lsm/core/internal/tests/BUILD index ec5877f815064..70bfb3105daea 100644 --- a/src/v/lsm/core/internal/tests/BUILD +++ b/src/v/lsm/core/internal/tests/BUILD @@ -55,6 +55,18 @@ redpanda_test_cc_library( ], ) +redpanda_test_cc_library( + name = "throwing_iterator", + hdrs = ["throwing_iterator.h"], + visibility = ["//src/v/lsm:__subpackages__"], + deps = [ + "//src/v/bytes:iobuf", + "//src/v/lsm/core/internal:iterator", + "//src/v/lsm/core/internal:keys", + "@seastar", + ], +) + redpanda_cc_gtest( name = "merging_iterator_test", timeout = "short", @@ -63,6 +75,7 @@ redpanda_cc_gtest( ], deps = [ ":iterator_test_harness", + ":throwing_iterator", "//src/v/base", "//src/v/lsm/core/internal:iterator", "//src/v/lsm/core/internal:keys", @@ -72,3 +85,22 @@ redpanda_cc_gtest( "@seastar", ], ) + +redpanda_cc_gtest( + name = "two_level_iterator_test", + timeout = "short", + srcs = [ + "two_level_iterator_test.cc", + ], + deps = [ + ":throwing_iterator", + "//src/v/base", + "//src/v/bytes:iobuf", + "//src/v/lsm/core/internal:iterator", + "//src/v/lsm/core/internal:keys", + "//src/v/lsm/core/internal:two_level_iterator", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) diff --git a/src/v/lsm/core/internal/tests/merging_iterator_test.cc b/src/v/lsm/core/internal/tests/merging_iterator_test.cc index e76d77534d0d6..c4ffab8cc2b70 100644 --- a/src/v/lsm/core/internal/tests/merging_iterator_test.cc +++ b/src/v/lsm/core/internal/tests/merging_iterator_test.cc @@ -9,6 +9,7 @@ #include "lsm/core/internal/keys.h" #include "lsm/core/internal/merging_iterator.h" #include "lsm/core/internal/tests/iterator_test_harness.h" +#include "lsm/core/internal/tests/throwing_iterator.h" #include #include @@ -18,54 +19,7 @@ using ::testing::Pair; namespace { -class simple_iterator : public lsm::internal::iterator { -public: - explicit simple_iterator(std::map data) - : _data(std::move(data)) {} - - bool valid() const override { return _it != _data.end(); } - - ss::future<> seek_to_first() override { - _it = _data.begin(); - return ss::now(); - } - - ss::future<> seek_to_last() override { - _it = _data.empty() ? _data.end() : std::prev(_data.end()); - return ss::now(); - } - - ss::future<> seek(lsm::internal::key_view target) override { - _it = _data.lower_bound(lsm::internal::key(target)); - return ss::now(); - } - - ss::future<> next() override { - if (_it != _data.end()) { - ++_it; - } - return ss::now(); - } - - ss::future<> prev() override { - if (_it == _data.begin()) { - _it = _data.end(); - } else if (_it != _data.end()) { - --_it; - } else if (!_data.empty()) { - _it = std::prev(_data.end()); - } - return ss::now(); - } - - lsm::internal::key_view key() override { return _it->first; } - - iobuf value() override { return _it->second.copy(); } - -private: - std::map _data; - std::map::iterator _it; -}; +using lsm::internal::testing::throwing_iterator; class merging_iterator_factory { public: @@ -73,7 +27,7 @@ class merging_iterator_factory { make_iterator(std::map map) { chunked_vector> children; children.push_back(lsm::internal::iterator::create_empty()); - children.push_back(std::make_unique(std::move(map))); + children.push_back(std::make_unique(std::move(map))); children.push_back(lsm::internal::iterator::create_empty()); return lsm::internal::create_merging_iterator(std::move(children)); } @@ -132,8 +86,8 @@ TEST(MergingIteratorTest, MergeTwoIterators) { auto data2 = make_test_data({{"b", "2"}, {"d", "4"}, {"f", "6"}}); chunked_vector> children; - children.push_back(std::make_unique(std::move(data1))); - children.push_back(std::make_unique(std::move(data2))); + children.push_back(std::make_unique(std::move(data1))); + children.push_back(std::make_unique(std::move(data2))); auto it = lsm::internal::create_merging_iterator(std::move(children)); auto results = collect_all_pairs(it); @@ -153,8 +107,8 @@ TEST(MergingIteratorTest, DuplicateKeys) { auto data2 = make_test_data({{"a", "10"}, {"b", "20"}, {"d", "40"}}); chunked_vector> children; - children.push_back(std::make_unique(std::move(data1))); - children.push_back(std::make_unique(std::move(data2))); + children.push_back(std::make_unique(std::move(data1))); + children.push_back(std::make_unique(std::move(data2))); auto it = lsm::internal::create_merging_iterator(std::move(children)); // Verify that duplicates are yielded (should see "a" twice, "b" twice) @@ -187,7 +141,7 @@ TEST(MergingIteratorTest, SingleChild) { auto data = make_test_data({{"a", "1"}, {"b", "2"}}); chunked_vector> children; - children.push_back(std::make_unique(std::move(data))); + children.push_back(std::make_unique(std::move(data))); auto it = lsm::internal::create_merging_iterator(std::move(children)); it->seek_to_first().get(); @@ -209,8 +163,8 @@ TEST(MergingIteratorTest, BackwardIteration) { auto data2 = make_test_data({{"b", "2"}, {"d", "4"}}); chunked_vector> children; - children.push_back(std::make_unique(std::move(data1))); - children.push_back(std::make_unique(std::move(data2))); + children.push_back(std::make_unique(std::move(data1))); + children.push_back(std::make_unique(std::move(data2))); auto it = lsm::internal::create_merging_iterator(std::move(children)); auto results = collect_all_pairs_reverse(it); @@ -219,3 +173,82 @@ TEST(MergingIteratorTest, BackwardIteration) { ElementsAre( Pair("d", "4"), Pair("c", "3"), Pair("b", "2"), Pair("a", "1"))); } + +namespace { + +class MergingIteratorExceptionSafetyTest : public ::testing::Test { +public: + void SetUp() override { + auto good_data = make_test_data({{"a", "1"}, {"c", "3"}, {"e", "5"}}); + auto bad_data = make_test_data({{"b", "2"}, {"d", "4"}, {"f", "6"}}); + auto bad_iter = std::make_unique( + std::move(bad_data)); + _bad = bad_iter.get(); + chunked_vector> children; + children.push_back( + std::make_unique(std::move(good_data))); + children.push_back(std::move(bad_iter)); + _it = lsm::internal::create_merging_iterator(std::move(children)); + } + +protected: + throwing_iterator* _bad = nullptr; + std::unique_ptr _it; +}; + +} // namespace + +// After any thrown await in a mutating method, valid() must report false. +TEST_F(MergingIteratorExceptionSafetyTest, SeekChildThrowLeavesInvalid) { + _it->seek_to_first().get(); + ASSERT_TRUE(_it->valid()); + + _bad->fail_next(); + auto target = lsm::internal::key::encode({.key = lsm::user_key_view("b")}); + EXPECT_THROW(_it->seek(target).get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +TEST_F(MergingIteratorExceptionSafetyTest, SeekToFirstChildThrowLeavesInvalid) { + _it->seek_to_first().get(); + ASSERT_TRUE(_it->valid()); + + _bad->fail_next(); + EXPECT_THROW(_it->seek_to_first().get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +TEST_F(MergingIteratorExceptionSafetyTest, SeekToLastChildThrowLeavesInvalid) { + _it->seek_to_last().get(); + ASSERT_TRUE(_it->valid()); + + _bad->fail_next(); + EXPECT_THROW(_it->seek_to_last().get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +TEST_F(MergingIteratorExceptionSafetyTest, NextChildThrowLeavesInvalid) { + _it->seek_to_first().get(); + // Walk forward until _current points at the throwing child so its + // next() is the call that throws. + while (_it->valid() && _it->key().user_key()() != "b") { + _it->next().get(); + } + ASSERT_TRUE(_it->valid()); + + _bad->fail_next(); + EXPECT_THROW(_it->next().get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +TEST_F(MergingIteratorExceptionSafetyTest, PrevChildThrowLeavesInvalid) { + _it->seek_to_last().get(); + while (_it->valid() && _it->key().user_key()() != "f") { + _it->prev().get(); + } + ASSERT_TRUE(_it->valid()); + + _bad->fail_next(); + EXPECT_THROW(_it->prev().get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} diff --git a/src/v/lsm/core/internal/tests/throwing_iterator.h b/src/v/lsm/core/internal/tests/throwing_iterator.h new file mode 100644 index 0000000000000..e8044a619dbc0 --- /dev/null +++ b/src/v/lsm/core/internal/tests/throwing_iterator.h @@ -0,0 +1,108 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "bytes/iobuf.h" +#include "lsm/core/internal/iterator.h" +#include "lsm/core/internal/keys.h" + +#include + +#include +#include +#include + +namespace lsm::internal::testing { + +// An in-memory iterator that can be primed to throw on its next mutating +// call. Used to drive the exception-safety paths of higher-level +// iterators (db_iter, two_level_iterator, merging_iterator) without +// needing a real cloud-backed SST. +class throwing_iterator : public lsm::internal::iterator { +public: + explicit throwing_iterator(std::map data) + : _data(std::move(data)) + , _it(_data.end()) {} + + // Arm the iterator to fail the next mutating call exactly once. + void fail_next() { _fail_pending = true; } + + bool valid() const override { return _it != _data.end(); } + + ss::future<> seek_to_first() override { + if (consume_failure()) { + return make_failure(); + } + _it = _data.begin(); + return ss::now(); + } + + ss::future<> seek_to_last() override { + if (consume_failure()) { + return make_failure(); + } + _it = _data.empty() ? _data.end() : std::prev(_data.end()); + return ss::now(); + } + + ss::future<> seek(lsm::internal::key_view target) override { + if (consume_failure()) { + return make_failure(); + } + _it = _data.lower_bound(lsm::internal::key(target)); + return ss::now(); + } + + ss::future<> next() override { + if (consume_failure()) { + return make_failure(); + } + if (_it != _data.end()) { + ++_it; + } + return ss::now(); + } + + ss::future<> prev() override { + if (consume_failure()) { + return make_failure(); + } + if (_it == _data.begin()) { + _it = _data.end(); + } else if (_it != _data.end()) { + --_it; + } else if (!_data.empty()) { + _it = std::prev(_data.end()); + } + return ss::now(); + } + + lsm::internal::key_view key() override { return _it->first; } + iobuf value() override { return _it->second.copy(); } + +private: + bool consume_failure() { + if (_fail_pending) { + _fail_pending = false; + return true; + } + return false; + } + static ss::future<> make_failure() { + return ss::make_exception_future<>( + std::runtime_error("simulated iterator failure")); + } + + std::map _data; + std::map::iterator _it; + bool _fail_pending = false; +}; + +} // namespace lsm::internal::testing diff --git a/src/v/lsm/core/internal/tests/two_level_iterator_test.cc b/src/v/lsm/core/internal/tests/two_level_iterator_test.cc new file mode 100644 index 0000000000000..a3525726d19e6 --- /dev/null +++ b/src/v/lsm/core/internal/tests/two_level_iterator_test.cc @@ -0,0 +1,155 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "lsm/core/internal/iterator.h" +#include "lsm/core/internal/keys.h" +#include "lsm/core/internal/tests/throwing_iterator.h" +#include "lsm/core/internal/two_level_iterator.h" + +#include + +#include +#include +#include +#include + +namespace { + +using lsm::internal::testing::throwing_iterator; + +class TwoLevelIteratorExceptionSafetyTest : public ::testing::Test { +public: + void SetUp() override { + // Index has one entry whose key is the block's largest user key. + std::map index_entries; + index_entries.emplace( + lsm::internal::key::encode({.key = lsm::user_key_view("c")}), + iobuf::from("block1")); + + auto idx = std::make_unique( + std::move(index_entries)); + _index = idx.get(); + + _it = lsm::internal::create_two_level_iterator( + std::move(idx), + [this]( + iobuf) -> ss::future> { + if (_data_fn_throws) { + _data_fn_throws = false; + return ss::make_exception_future< + std::unique_ptr>( + std::runtime_error("data_iter_fn fail")); + } + std::map block_data; + for (auto k : {"a", "b", "c"}) { + block_data.emplace( + lsm::internal::key::encode({.key = lsm::user_key_view(k)}), + iobuf::from(k)); + } + auto data_iter = std::make_unique( + std::move(block_data)); + _last_data = data_iter.get(); + if (_arm_data_on_create) { + _arm_data_on_create = false; + data_iter->fail_next(); + } + return ss::make_ready_future< + std::unique_ptr>(std::move(data_iter)); + }); + } + +protected: + throwing_iterator* _index = nullptr; + // Raw pointer to the most recently created data iterator. Owned by + // the two-level iterator under test. + throwing_iterator* _last_data = nullptr; + // If set, the next data_iter_fn invocation throws. + bool _data_fn_throws = false; + // If set, the next data_iter_fn invocation returns an iterator + // already armed to fail on its next mutating call. + bool _arm_data_on_create = false; + std::unique_ptr _it; +}; + +// After any thrown await in a mutating method, valid() must report false. +TEST_F(TwoLevelIteratorExceptionSafetyTest, IndexSeekThrowLeavesInvalid) { + auto target = lsm::internal::key::encode({.key = lsm::user_key_view("a")}); + _it->seek(target).get(); + ASSERT_TRUE(_it->valid()); + + _index->fail_next(); + EXPECT_THROW(_it->seek(target).get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +// data_iter_fn throwing during init_data_block leaves valid() == false. +TEST_F(TwoLevelIteratorExceptionSafetyTest, DataIterFnThrowLeavesInvalid) { + auto target = lsm::internal::key::encode({.key = lsm::user_key_view("a")}); + _it->seek(target).get(); + ASSERT_TRUE(_it->valid()); + + _data_fn_throws = true; + EXPECT_THROW(_it->seek(target).get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +// _data_iter is assigned by init_data_block, then its own seek throws. +TEST_F(TwoLevelIteratorExceptionSafetyTest, DataIterSeekThrowLeavesInvalid) { + auto target = lsm::internal::key::encode({.key = lsm::user_key_view("a")}); + _it->seek(target).get(); + ASSERT_TRUE(_it->valid()); + + _arm_data_on_create = true; + EXPECT_THROW(_it->seek(target).get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +// The data iterator throwing inside next() leaves valid() == false. +TEST_F(TwoLevelIteratorExceptionSafetyTest, DataIterNextThrowLeavesInvalid) { + auto target = lsm::internal::key::encode({.key = lsm::user_key_view("a")}); + _it->seek(target).get(); + ASSERT_TRUE(_it->valid()); + + _last_data->fail_next(); + EXPECT_THROW(_it->next().get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +// The data iterator throwing inside prev() leaves valid() == false. +TEST_F(TwoLevelIteratorExceptionSafetyTest, DataIterPrevThrowLeavesInvalid) { + _it->seek_to_last().get(); + ASSERT_TRUE(_it->valid()); + + _last_data->fail_next(); + EXPECT_THROW(_it->prev().get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +// seek_to_first with the index throwing leaves valid() == false. +TEST_F( + TwoLevelIteratorExceptionSafetyTest, SeekToFirstIndexThrowLeavesInvalid) { + _it->seek_to_first().get(); + ASSERT_TRUE(_it->valid()); + + _index->fail_next(); + EXPECT_THROW(_it->seek_to_first().get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +// seek_to_last with the index throwing leaves valid() == false. +TEST_F(TwoLevelIteratorExceptionSafetyTest, SeekToLastIndexThrowLeavesInvalid) { + _it->seek_to_last().get(); + ASSERT_TRUE(_it->valid()); + + _index->fail_next(); + EXPECT_THROW(_it->seek_to_last().get(), std::runtime_error); + EXPECT_FALSE(_it->valid()); +} + +} // namespace diff --git a/src/v/lsm/core/internal/two_level_iterator.cc b/src/v/lsm/core/internal/two_level_iterator.cc index 1872fe9e6eca1..e3434024fa319 100644 --- a/src/v/lsm/core/internal/two_level_iterator.cc +++ b/src/v/lsm/core/internal/two_level_iterator.cc @@ -23,45 +23,55 @@ class impl : public iterator { ~impl() override = default; - bool valid() const override { return _data_iter && _data_iter->valid(); } + bool valid() const override { return _valid; } ss::future<> seek_to_first() override { + invalidate(); co_await _index_iter->seek_to_first(); co_await init_data_block(); if (_data_iter) { co_await _data_iter->seek_to_first(); } co_await skip_empty_data_blocks_forward(); + revalidate(); } ss::future<> seek_to_last() override { + invalidate(); co_await _index_iter->seek_to_last(); co_await init_data_block(); if (_data_iter) { co_await _data_iter->seek_to_last(); } co_await skip_empty_data_blocks_backward(); + revalidate(); } ss::future<> seek(key_view target) override { + invalidate(); co_await _index_iter->seek(target); co_await init_data_block(); if (_data_iter) { co_await _data_iter->seek(target); } co_await skip_empty_data_blocks_forward(); + revalidate(); } ss::future<> next() override { assert(valid()); + invalidate(); co_await _data_iter->next(); co_await skip_empty_data_blocks_forward(); + revalidate(); } ss::future<> prev() override { assert(valid()); + invalidate(); co_await _data_iter->prev(); co_await skip_empty_data_blocks_backward(); + revalidate(); } key_view key() override { @@ -75,6 +85,13 @@ class impl : public iterator { } private: + // Reset to !valid() so a thrown await doesn't leave stale state. + void invalidate() { _valid = false; } + // skip_empty_data_blocks_* maintains the invariant that _data_iter is + // either nullptr or positioned at a valid entry, so a non-null + // _data_iter implies validity at this point. + void revalidate() { _valid = _data_iter != nullptr; } + ss::future<> init_data_block() { // Clear previous iterator first in case `_data_iter_fn` throws. _data_iter = nullptr; @@ -118,6 +135,7 @@ class impl : public iterator { // ss::optimized_optional, as that makes the code slightly harder // to read. std::unique_ptr _data_iter; + bool _valid = false; }; } // namespace diff --git a/src/v/lsm/db/tests/BUILD b/src/v/lsm/db/tests/BUILD index 45a053296898e..99970e023450c 100644 --- a/src/v/lsm/db/tests/BUILD +++ b/src/v/lsm/db/tests/BUILD @@ -151,6 +151,7 @@ redpanda_cc_gtest( "//src/v/lsm/core/internal:iterator", "//src/v/lsm/core/internal:keys", "//src/v/lsm/core/internal:options", + "//src/v/lsm/core/internal/tests:throwing_iterator", "//src/v/lsm/db:iter", "//src/v/lsm/db:memtable", "//src/v/test_utils:gtest", diff --git a/src/v/lsm/db/tests/iter_test.cc b/src/v/lsm/db/tests/iter_test.cc index 847cd138429d8..f823f92dc6406 100644 --- a/src/v/lsm/db/tests/iter_test.cc +++ b/src/v/lsm/db/tests/iter_test.cc @@ -9,12 +9,12 @@ #include "lsm/core/internal/iterator.h" #include "lsm/core/internal/keys.h" #include "lsm/core/internal/options.h" +#include "lsm/core/internal/tests/throwing_iterator.h" #include "lsm/db/iter.h" #include "lsm/db/memtable.h" #include -#include #include #include #include @@ -513,89 +513,7 @@ TEST_F(DBIteratorTest, TombstoneAfterValues) { EXPECT_FALSE(it->valid()); } -// An iterator over an in-memory map that can be primed to throw on the -// next mutating operation. Used to model the cloud_io failure path where -// a deeper iterator (e.g. a two-level iterator's data block) raises an -// exception mid-operation, as observed in the AT reactor-1 SIGSEGV. -class throwing_iterator : public lsm::internal::iterator { -public: - explicit throwing_iterator(std::map data) - : _data(std::move(data)) - , _it(_data.end()) {} - - // Arm the iterator to fail the next mutating call exactly once. - void fail_next() { _fail_pending = true; } - - bool valid() const override { return _it != _data.end(); } - - ss::future<> seek_to_first() override { - if (consume_failure()) { - return make_failure(); - } - _it = _data.begin(); - return ss::now(); - } - - ss::future<> seek_to_last() override { - if (consume_failure()) { - return make_failure(); - } - _it = _data.empty() ? _data.end() : std::prev(_data.end()); - return ss::now(); - } - - ss::future<> seek(lsm::internal::key_view target) override { - if (consume_failure()) { - return make_failure(); - } - _it = _data.lower_bound(lsm::internal::key(target)); - return ss::now(); - } - - ss::future<> next() override { - if (consume_failure()) { - return make_failure(); - } - if (_it != _data.end()) { - ++_it; - } - return ss::now(); - } - - ss::future<> prev() override { - if (consume_failure()) { - return make_failure(); - } - if (_it == _data.begin()) { - _it = _data.end(); - } else if (_it != _data.end()) { - --_it; - } else if (!_data.empty()) { - _it = std::prev(_data.end()); - } - return ss::now(); - } - - lsm::internal::key_view key() override { return _it->first; } - iobuf value() override { return _it->second.copy(); } - -private: - bool consume_failure() { - if (_fail_pending) { - _fail_pending = false; - return true; - } - return false; - } - static ss::future<> make_failure() { - return ss::make_exception_future<>( - std::runtime_error("simulated cloud_io download failure")); - } - - std::map _data; - std::map::iterator _it; - bool _fail_pending = false; -}; +using lsm::internal::testing::throwing_iterator; class DBIteratorExceptionSafetyTest : public testing::Test { public: @@ -624,10 +542,7 @@ class DBIteratorExceptionSafetyTest : public testing::Test { std::unique_ptr _it; }; -// If seek throws partway through, valid() must report false. Previously -// _valid retained whatever it had been before the call, allowing a stale -// "true" to escape and a downstream key() to dereference a half-loaded -// underlying iterator (the AT reactor-1 SIGSEGV). +// After any thrown await in a mutating method, valid() must report false. TEST_F(DBIteratorExceptionSafetyTest, SeekThrowLeavesIteratorInvalid) { _it->seek("a"_seek_key).get(); ASSERT_TRUE(_it->valid());