Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions src/v/lsm/block/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ class iter final : public internal::iterator {

bool valid() const final { return _current < _restarts_offset; }
// Positions the iterator on the first entry of the block.
// All work here is synchronous; the returned future is ss::now().
ss::future<> seek_to_first() final {
// Mark the iterator !valid() before touching any state, so it stays
// invalid if decoding the entry below throws.
invalidate();
// Rewind to restart point 0, then decode the entry found there.
seek_to_restart_point(0);
parse_next_key();
return ss::now();
}
ss::future<> seek_to_last() final {
invalidate();
seek_to_restart_point(_num_restarts - 1);
while (parse_next_key() && next_entry_offset() < _restarts_offset) {
// Keep skipping
Expand Down Expand Up @@ -91,6 +93,8 @@ class iter final : public internal::iterator {
}
}

invalidate();

while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = get_restart_point(mid);
Expand Down Expand Up @@ -118,7 +122,6 @@ class iter final : public internal::iterator {
// We might be able to use our current position within the restart
// block. This is true if we determined the key we desire is in the
// current block and is after the current key.
assert(current_key_compare == 0 || valid());
bool skip_seek = left == _restart_index && current_key_compare < 0;
if (!skip_seek) {
seek_to_restart_point(left);
Expand All @@ -141,10 +144,10 @@ class iter final : public internal::iterator {
ss::future<> prev() final {
// Scan backwards to a restart point before current_
const uint32_t original = _current;
invalidate();
while (get_restart_point(_restart_index) >= original) {
if (_restart_index == 0) {
// No more entries
_current = _restarts_offset;
_restart_index = _num_restarts;
return ss::now();
}
Expand Down Expand Up @@ -184,36 +187,39 @@ class iter final : public internal::iterator {
_value = {.offset = get_restart_point(index), .length = 0};
}

// Reset to !valid() so a thrown corruption_exception doesn't leave
// stale _key / _value bytes from the previous entry.
// valid() is `_current < _restarts_offset`, so parking _current at
// _restarts_offset is what makes the iterator report invalid.
void invalidate() { _current = _restarts_offset; }

bool parse_next_key() {
_current = next_entry_offset();
const uint32_t next = next_entry_offset();
invalidate();
// Restarts come right after data
if (_current >= _restarts_offset) {
// No more entries to return. Mark as invalid.
_current = _restarts_offset;
if (next >= _restarts_offset) {
_restart_index = _num_restarts;
return false;
}
// Decode next entry
auto [shared, non_shared, value_length] = decode_entry(
*_data, _current, _restarts_offset);
uint32_t p = _current + (3 * sizeof(uint32_t));
*_data, next, _restarts_offset);
uint32_t p = next + (3 * sizeof(uint32_t));
if (_key.size() < shared) {
throw corruption_exception(
"corruption: shared key too short: {} < {}", _key.size(), shared);
} else {
_key.resize(shared + non_shared);
auto it = _key.begin();
std::advance(it, shared);
for (std::string_view s : _data->read_string(p, non_shared)) {
it = std::ranges::copy(s, it).out;
}
_value = range{.offset = p + non_shared, .length = value_length};
while (_restart_index + 1 < _num_restarts
&& get_restart_point(_restart_index + 1) < _current) {
++_restart_index;
}
return true;
}
_key.resize(shared + non_shared);
auto it = _key.begin();
std::advance(it, shared);
for (std::string_view s : _data->read_string(p, non_shared)) {
it = std::ranges::copy(s, it).out;
}
_value = range{.offset = p + non_shared, .length = value_length};
while (_restart_index + 1 < _num_restarts
&& get_restart_point(_restart_index + 1) < next) {
++_restart_index;
}
_current = next;
return true;
}

ss::lw_shared_ptr<contents> _data;
Expand Down
26 changes: 19 additions & 7 deletions src/v/lsm/core/internal/merging_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,23 @@ class merging_iterator : public iterator {

bool valid() const override { return _current != nullptr; }
// Seeks every child iterator to its first entry, then lets
// find_smallest() choose the current child.
ss::future<> seek_to_first() override {
// Clear _current up front so the iterator reports !valid() if any
// child's seek_to_first() throws while being awaited.
invalidate();
for (auto& child : _children) {
co_await child->seek_to_first();
}
find_smallest();
// Record the scan direction; next()/prev() reposition the other
// children when the direction they need differs from _dir.
_dir = direction::forward;
}
// Seeks every child iterator to its last entry, then lets
// find_largest() choose the current child.
ss::future<> seek_to_last() override {
// Clear _current up front so the iterator reports !valid() if any
// child's seek_to_last() throws while being awaited.
invalidate();
for (auto& child : _children) {
co_await child->seek_to_last();
}
find_largest();
// Record the scan direction; next()/prev() reposition the other
// children when the direction they need differs from _dir.
_dir = direction::backward;
}
ss::future<> seek(key_view target) override {
invalidate();
for (auto& child : _children) {
co_await child->seek(target);
}
Expand All @@ -52,36 +55,42 @@ class merging_iterator : public iterator {

ss::future<> next() override {
dassert(valid(), "next must be called on a valid iterator");
// Stash the current child before clearing _current so we can resume
// the forward step from the right position.
iterator* current = _current;
invalidate();
// Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already
// true for all of the non-current_ children since current_ is
// the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (_dir != direction::forward) {
for (auto& child : _children) {
if (child.get() != _current) {
co_await child->seek(key());
if (child->valid() && key() == child->key()) {
if (child.get() != current) {
co_await child->seek(current->key());
if (child->valid() && current->key() == child->key()) {
co_await child->next();
}
}
}
_dir = direction::forward;
}
co_await _current->next();
co_await current->next();
find_smallest();
}
ss::future<> prev() override {
dassert(valid(), "prev must be called on a valid iterator");
iterator* current = _current;
invalidate();
// Ensure that all children are positioned before key().
// If we are moving in the backward direction, it is already
// true for all of the non-current_ children since current_ is
// the largest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (_dir != direction::backward) {
for (auto& child : _children) {
if (child.get() != _current) {
co_await child->seek(key());
if (child.get() != current) {
co_await child->seek(current->key());
if (child->valid()) {
// Child is at first entry >= key. Step back one to be <
// key().
Expand All @@ -95,7 +104,7 @@ class merging_iterator : public iterator {
}
_dir = direction::backward;
}
co_await _current->prev();
co_await current->prev();
find_largest();
}
key_view key() override {
Expand All @@ -108,6 +117,9 @@ class merging_iterator : public iterator {
}

private:
// Reset to !valid() so a thrown await doesn't leave stale state.
// valid() is `_current != nullptr`, so clearing the pointer makes the
// iterator report invalid until a child is selected again.
void invalidate() { _current = nullptr; }

void find_smallest() {
iterator* smallest = nullptr;
for (auto& child : _children) {
Expand Down
32 changes: 32 additions & 0 deletions src/v/lsm/core/internal/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ redpanda_test_cc_library(
],
)

# Header-only test helper (throwing_iterator.h, no srcs), visible to
# every package under //src/v/lsm.
redpanda_test_cc_library(
name = "throwing_iterator",
hdrs = ["throwing_iterator.h"],
visibility = ["//src/v/lsm:__subpackages__"],
deps = [
"//src/v/bytes:iobuf",
"//src/v/lsm/core/internal:iterator",
"//src/v/lsm/core/internal:keys",
"@seastar",
],
)

redpanda_cc_gtest(
name = "merging_iterator_test",
timeout = "short",
Expand All @@ -63,6 +75,7 @@ redpanda_cc_gtest(
],
deps = [
":iterator_test_harness",
":throwing_iterator",
"//src/v/base",
"//src/v/lsm/core/internal:iterator",
"//src/v/lsm/core/internal:keys",
Expand All @@ -72,3 +85,22 @@ redpanda_cc_gtest(
"@seastar",
],
)

# gtest target built from two_level_iterator_test.cc; it links the
# two_level_iterator library and the :throwing_iterator test helper.
redpanda_cc_gtest(
name = "two_level_iterator_test",
timeout = "short",
srcs = [
"two_level_iterator_test.cc",
],
deps = [
":throwing_iterator",
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/lsm/core/internal:iterator",
"//src/v/lsm/core/internal:keys",
"//src/v/lsm/core/internal:two_level_iterator",
"//src/v/test_utils:gtest",
"@googletest//:gtest",
"@seastar",
],
)
Loading
Loading