From a5c72f7ed85b4a6acf94428f1e59f0417220dd9e Mon Sep 17 00:00:00 2001 From: Luit van Drongelen Date: Wed, 28 Jan 2026 11:16:52 +0100 Subject: [PATCH 1/4] fix limit scanner use in sweeper The LimitScanner's use of a sentinel error inside an LMDB transaction aborted the transaction. I instead moved the "limit reached" marker to the cursor returned by Last(). --- lmdbenv/limitscanner/scanner.go | 38 +++++++++++++--------------- lmdbenv/limitscanner/scanner_test.go | 33 +++++++++++++++--------- syncer/sweeper/sweeper.go | 8 ++++-- syncer/sweeper/sweeper_test.go | 18 ++++++++++--- 4 files changed, 59 insertions(+), 38 deletions(-) diff --git a/lmdbenv/limitscanner/scanner.go b/lmdbenv/limitscanner/scanner.go index 868881f..6ee38bd 100644 --- a/lmdbenv/limitscanner/scanner.go +++ b/lmdbenv/limitscanner/scanner.go @@ -9,15 +9,12 @@ import ( "github.com/PowerDNS/lmdb-go/lmdbscan" ) -// ErrLimitReached is returned when the LimitScanner reaches the configured limit -var ErrLimitReached = errors.New("limit reached") - func NewLimitScanner(opt Options) (*LimitScanner, error) { if opt.Txn == nil { - panic("limit scanner requires Options.Txn") + return nil, errors.New("limit scanner requires Options.Txn") } if opt.DBI == 0 { - panic("limit scanner requires Options.DBI") + return nil, errors.New("limit scanner requires Options.DBI") } if opt.LimitDurationCheckEvery <= 0 { opt.LimitDurationCheckEvery = LimitDurationCheckEveryDefault @@ -64,15 +61,15 @@ func (c LimitCursor) IsZero() bool { // LimitScanner allows iteration over chunks of the LMDB for processing. // The chunk size can either be given as a number or as a time limit. type LimitScanner struct { - opt Options - sc *lmdbscan.Scanner - count int - deadline time.Time - err error + opt Options + sc *lmdbscan.Scanner + count int + deadline time.Time + limitReached bool } func (s *LimitScanner) Scan() bool { - if s.err != nil { + if s.limitReached { return false } @@ -81,7 +78,7 @@ func (s *LimitScanner) Scan() bool { // If the entry still exists, it will be the first one (basically // rescanning the last entry). If it is gone, we will start from the // next one after that. - //s.sc.SetNext(s.opt.Last.key, s.opt.Last.val, lmdb.SetRange, lmdb.Next) + // s.sc.SetNext(s.opt.Last.key, s.opt.Last.val, lmdb.SetRange, lmdb.Next) s.sc.Set(s.opt.Last.key, s.opt.Last.val, lmdb.SetRange) if bytes.Equal(s.Key(), s.opt.Last.key) && bytes.Equal(s.Val(), s.opt.Last.val) { // Advance one @@ -91,7 +88,7 @@ func (s *LimitScanner) Scan() bool { // Check number-of-records limit if s.opt.LimitRecords > 0 && s.count >= s.opt.LimitRecords { - s.err = ErrLimitReached + s.limitReached = true return false } @@ -99,7 +96,7 @@ func (s *LimitScanner) Scan() bool { checkEvery := s.opt.LimitDurationCheckEvery if checkEvery > 0 && s.count > 0 && s.count%checkEvery == 0 && !s.deadline.IsZero() { if time.Now().After(s.deadline) { - s.err = ErrLimitReached + s.limitReached = true return false } } @@ -108,17 +105,19 @@ func (s *LimitScanner) Scan() bool { return s.sc.Scan() } +// Last returns the LimitCursor for the next LimitScanner to use in +// (Options).Last. If this scanner didn't run into any limits Last will return a +// zero LimitCursor. func (s *LimitScanner) Last() LimitCursor { + if !s.limitReached { + return LimitCursor{} + } return LimitCursor{ key: s.Key(), val: s.Val(), } } -func (s *LimitScanner) Cursor() *lmdb.Cursor { - return s.sc.Cursor() -} - func (s *LimitScanner) Key() []byte { return s.sc.Key() } @@ -128,9 +127,6 @@ func (s *LimitScanner) Val() []byte { } func (s *LimitScanner) Err() error { - if s.err != nil { - return s.err - } return s.sc.Err() } diff --git a/lmdbenv/limitscanner/scanner_test.go b/lmdbenv/limitscanner/scanner_test.go index 4d3fadd..068924b 100644 --- a/lmdbenv/limitscanner/scanner_test.go +++ b/lmdbenv/limitscanner/scanner_test.go @@ -1,7 +1,6 @@ package limitscanner import ( - "errors" "fmt" "testing" "time" @@ -40,7 +39,8 @@ func TestLimitScanner(t *testing.T) { DBI: dbi, LimitRecords: 100, }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -54,7 +54,8 @@ func TestLimitScanner(t *testing.T) { return ls.Err() }) - require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached") + require.NoError(t, err) + require.False(t, last.IsZero(), "expected a limited scan") }) t.Run("limited-scan-continued", func(t *testing.T) { @@ -66,7 +67,8 @@ func TestLimitScanner(t *testing.T) { LimitRecords: 100, Last: last, // Note this added cursor }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -80,7 +82,7 @@ func TestLimitScanner(t *testing.T) { return ls.Err() }) - require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached") + require.False(t, last.IsZero(), "expected a limited scan") }) t.Run("limited-scan-continued-deleted", func(t *testing.T) { @@ -96,7 +98,8 @@ func TestLimitScanner(t *testing.T) { LimitRecords: 10, Last: last, }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -110,7 +113,8 @@ func TestLimitScanner(t *testing.T) { return ls.Err() }) - require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached") + require.NoError(t, err) + require.False(t, last.IsZero(), "expected a limited scan") }) t.Run("limited-scan-final", func(t *testing.T) { @@ -122,7 +126,8 @@ func TestLimitScanner(t *testing.T) { LimitRecords: 100, Last: last, }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -137,6 +142,7 @@ func TestLimitScanner(t *testing.T) { return ls.Err() }) require.NoError(t, err) + require.True(t, last.IsZero(), "unexpected limited scan") }) t.Run("limited-by-time", func(t *testing.T) { @@ -148,7 +154,8 @@ func TestLimitScanner(t *testing.T) { LimitDuration: time.Nanosecond, // very short LimitDurationCheckEvery: 50, // check time every 50 records }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -164,7 +171,8 @@ func TestLimitScanner(t *testing.T) { return ls.Err() }) - require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached") + require.NoError(t, err) + require.False(t, last.IsZero(), "expected a limited scan") }) t.Run("limited-by-plenty-of-time", func(t *testing.T) { @@ -176,7 +184,8 @@ func TestLimitScanner(t *testing.T) { LimitDuration: time.Second, // an eternity LimitDurationCheckEvery: 50, // check time every 50 records }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -193,10 +202,10 @@ func TestLimitScanner(t *testing.T) { return ls.Err() }) require.NoError(t, err) + require.True(t, last.IsZero(), "unexpected limited scan") }) return nil }) require.NoError(t, err) - } diff --git a/syncer/sweeper/sweeper.go b/syncer/sweeper/sweeper.go index 596e3ef..5990afa 100644 --- a/syncer/sweeper/sweeper.go +++ b/syncer/sweeper/sweeper.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "runtime" "strings" "time" @@ -49,7 +50,7 @@ type Sweeper struct { } // Run runs the sweeper according to the configured schedule. -// It only runs when an error occurs or the context is closed. +// It only runs until an error occurs or the context is closed. func (s *Sweeper) Run(ctx context.Context) error { wait := s.conf.FirstInterval for { @@ -72,6 +73,8 @@ func (s *Sweeper) Run(ctx context.Context) error { // sweep performs a single full database sweep. func (s *Sweeper) sweep(ctx context.Context) error { + runtime.LockOSThread() + defer runtime.UnlockOSThread() t0 := time.Now() retention := s.conf.RetentionDuration() @@ -127,6 +130,7 @@ func (s *Sweeper) sweep(ctx context.Context) error { if err != nil { return err // configuration error } + defer ls.Close() // Actual cleaning for ls.Scan() { @@ -159,7 +163,7 @@ func (s *Sweeper) sweep(ctx context.Context) error { last = ls.Last() return ls.Err() }) - if errors.Is(err, limitscanner.ErrLimitReached) { + if !last.IsZero() { l.Debug("Sweep limit reached, continuing after pause") // Give the app some room to get a write lock before continuing if err := utils.SleepContext(ctx, s.conf.ReleaseDuration); err != nil { diff --git a/syncer/sweeper/sweeper_test.go b/syncer/sweeper/sweeper_test.go index 45bf50a..52bd421 100644 --- a/syncer/sweeper/sweeper_test.go +++ b/syncer/sweeper/sweeper_test.go @@ -103,7 +103,7 @@ func BenchmarkSweeper(b *testing.B) { RetentionDays: 2, Interval: time.Second, FirstInterval: 0, - LockDuration: 100 * time.Millisecond, // Forces split operation + LockDuration: 10 * time.Millisecond, // Forces split operation ReleaseDuration: 0, } @@ -116,7 +116,7 @@ func BenchmarkSweeper(b *testing.B) { var dbi lmdb.DBI err := env.Update(func(txn *lmdb.Txn) error { var err error - dbi, err = txn.CreateDBI("test1") + dbi, err = txn.CreateDBI(name) return err }) assert.NoError(b, err) @@ -124,7 +124,7 @@ func BenchmarkSweeper(b *testing.B) { } now := time.Now() - past := now.Add(-50 * time.Hour) // longer than RetentionDays=2 + past := now.Add(-3 * 24 * time.Hour) // longer than RetentionDays=2 nowTS := header.TimestampFromTime(now) pastTS := header.TimestampFromTime(past) @@ -155,6 +155,18 @@ func BenchmarkSweeper(b *testing.B) { b.ResetTimer() err := sweeper.sweep(b.Context()) b.StopTimer() + + if b.N > 2 { + assert.NoError(b, env.Update(func(txn *lmdb.Txn) error { + stat, err := txn.Stat(mix) + if err != nil { + return err + } + assert.Greater(b, b.N-int(stat.Entries), b.N/4, "less than a 1/4 entries were cleaned") + return nil + })) + } + // This is the most interesting metric b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "entries/s") From 4007ea6d8b8bdeb06186c7ad19c7555addff1a04 Mon Sep 17 00:00:00 2001 From: Luit van Drongelen Date: Wed, 28 Jan 2026 16:50:51 +0100 Subject: [PATCH 2/4] improve passing the LimitCursor with limitReached bool --- lmdbenv/limitscanner/scanner.go | 13 +++++-------- lmdbenv/limitscanner/scanner_test.go | 25 +++++++++++++------------ syncer/sweeper/sweeper.go | 5 +++-- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/lmdbenv/limitscanner/scanner.go b/lmdbenv/limitscanner/scanner.go index 6ee38bd..9e533a1 100644 --- a/lmdbenv/limitscanner/scanner.go +++ b/lmdbenv/limitscanner/scanner.go @@ -105,17 +105,14 @@ func (s *LimitScanner) Scan() bool { return s.sc.Scan() } -// Last returns the LimitCursor for the next LimitScanner to use in -// (Options).Last. If this scanner didn't run into any limits Last will return a -// zero LimitCursor. -func (s *LimitScanner) Last() LimitCursor { - if !s.limitReached { - return LimitCursor{} - } +// Cursor returns the LimitCursor for the next LimitScanner to use in +// (Options).Cursor, and whether the limit was reached by this LimitScanner +// (meaning this cursor is actually useful). +func (s *LimitScanner) Cursor() (c LimitCursor, limitReached bool) { return LimitCursor{ key: s.Key(), val: s.Val(), - } + }, s.limitReached } func (s *LimitScanner) Key() []byte { diff --git a/lmdbenv/limitscanner/scanner_test.go b/lmdbenv/limitscanner/scanner_test.go index 068924b..a784077 100644 --- a/lmdbenv/limitscanner/scanner_test.go +++ b/lmdbenv/limitscanner/scanner_test.go @@ -32,6 +32,7 @@ func TestLimitScanner(t *testing.T) { require.NoError(t, err) var last LimitCursor + var limitReached bool t.Run("limited-scan", func(t *testing.T) { err = env.View(func(txn *lmdb.Txn) error { ls, err := NewLimitScanner(Options{ @@ -48,14 +49,14 @@ func TestLimitScanner(t *testing.T) { } assert.Equal(t, 100, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Equal(t, "key-00100", string(last.key)) assert.Equal(t, "val-00100", string(last.val)) return ls.Err() }) require.NoError(t, err) - require.False(t, last.IsZero(), "expected a limited scan") + require.True(t, limitReached, "expected a limited scan") }) t.Run("limited-scan-continued", func(t *testing.T) { @@ -76,13 +77,13 @@ func TestLimitScanner(t *testing.T) { } assert.Equal(t, 100, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Equal(t, "key-00200", string(last.key)) assert.Equal(t, "val-00200", string(last.val)) return ls.Err() }) - require.False(t, last.IsZero(), "expected a limited scan") + require.True(t, limitReached, "expected a limited scan") }) t.Run("limited-scan-continued-deleted", func(t *testing.T) { @@ -107,14 +108,14 @@ func TestLimitScanner(t *testing.T) { } assert.Equal(t, 10, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Equal(t, "key-00210", string(last.key)) assert.Equal(t, "val-00210", string(last.val)) return ls.Err() }) require.NoError(t, err) - require.False(t, last.IsZero(), "expected a limited scan") + require.True(t, limitReached, "expected a limited scan") }) t.Run("limited-scan-final", func(t *testing.T) { @@ -135,14 +136,14 @@ func TestLimitScanner(t *testing.T) { } assert.Equal(t, 40, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Nil(t, last.key) assert.Nil(t, last.val) return ls.Err() }) require.NoError(t, err) - require.True(t, last.IsZero(), "unexpected limited scan") + require.False(t, limitReached, "unexpected limited scan") }) t.Run("limited-by-time", func(t *testing.T) { @@ -165,14 +166,14 @@ func TestLimitScanner(t *testing.T) { // before we realize we passed the short deadline. assert.Equal(t, 50, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Equal(t, "key-00050", string(last.key)) assert.Equal(t, "val-00050", string(last.val)) return ls.Err() }) require.NoError(t, err) - require.False(t, last.IsZero(), "expected a limited scan") + require.True(t, limitReached, "expected a limited scan") }) t.Run("limited-by-plenty-of-time", func(t *testing.T) { @@ -195,14 +196,14 @@ func TestLimitScanner(t *testing.T) { // (note that we deleted one before) assert.Equal(t, 249, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Nil(t, last.key) assert.Nil(t, last.val) return ls.Err() }) require.NoError(t, err) - require.True(t, last.IsZero(), "unexpected limited scan") + require.False(t, limitReached, "unexpected limited scan") }) return nil diff --git a/syncer/sweeper/sweeper.go b/syncer/sweeper/sweeper.go index 5990afa..b992085 100644 --- a/syncer/sweeper/sweeper.go +++ b/syncer/sweeper/sweeper.go @@ -111,6 +111,7 @@ func (s *Sweeper) sweep(ctx context.Context) error { l.Debug("Sweep DBI") var last limitscanner.LimitCursor + var limitReached bool for { err := s.env.Update(func(txn *lmdb.Txn) error { st.nTxn++ @@ -160,10 +161,10 @@ func (s *Sweeper) sweep(ctx context.Context) error { } } - last = ls.Last() + last, limitReached = ls.Cursor() return ls.Err() }) - if !last.IsZero() { + if limitReached { l.Debug("Sweep limit reached, continuing after pause") // Give the app some room to get a write lock before continuing if err := utils.SleepContext(ctx, s.conf.ReleaseDuration); err != nil { From dc5571cb919f339745e15a96c21e665479036b33 Mon Sep 17 00:00:00 2001 From: Luit van Drongelen Date: Thu, 29 Jan 2026 09:36:35 +0100 Subject: [PATCH 3/4] below b.N == 9 the expected cleaned entries can still be < 25% --- syncer/sweeper/sweeper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/sweeper/sweeper_test.go b/syncer/sweeper/sweeper_test.go index 52bd421..9a73d86 100644 --- a/syncer/sweeper/sweeper_test.go +++ b/syncer/sweeper/sweeper_test.go @@ -156,7 +156,7 @@ func BenchmarkSweeper(b *testing.B) { err := sweeper.sweep(b.Context()) b.StopTimer() - if b.N > 2 { + if b.N > 8 { assert.NoError(b, env.Update(func(txn *lmdb.Txn) error { stat, err := txn.Stat(mix) if err != nil { From 0141333c2eeb9b44a70d0de81ff30d956401e63a Mon Sep 17 00:00:00 2001 From: Luit van Drongelen Date: Thu, 29 Jan 2026 15:04:08 +0100 Subject: [PATCH 4/4] remove Lock-/UnlockOSThread --- syncer/sweeper/sweeper.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/syncer/sweeper/sweeper.go b/syncer/sweeper/sweeper.go index b992085..8094bcf 100644 --- a/syncer/sweeper/sweeper.go +++ b/syncer/sweeper/sweeper.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "runtime" "strings" "time" @@ -73,8 +72,6 @@ func (s *Sweeper) Run(ctx context.Context) error { // sweep performs a single full database sweep. func (s *Sweeper) sweep(ctx context.Context) error { - runtime.LockOSThread() - defer runtime.UnlockOSThread() t0 := time.Now() retention := s.conf.RetentionDuration()