diff --git a/db.go b/db.go index 5d3e26496..2ac0ffefc 100644 --- a/db.go +++ b/db.go @@ -143,7 +143,7 @@ type DB struct { batch *batch rwlock sync.Mutex // Allows only one writer at a time. - metalock sync.Mutex // Protects meta page access. + metalock sync.RWMutex // Protects meta page access. mmaplock sync.RWMutex // Protects mmap access during remapping. statlock sync.RWMutex // Protects stats access. @@ -422,6 +422,7 @@ func (db *DB) getPageSizeFromSecondMeta() (int, bool, error) { func (db *DB) loadFreelist() { db.freelistLoad.Do(func() { db.freelist = newFreelist(db.FreelistType) + db.freelist.AddCurrentTXID(db.meta().Txid()) if !db.hasSyncedFreelist() { // Reconstruct free list by scanning the DB. db.freelist.Init(db.freepages()) @@ -776,7 +777,7 @@ func (db *DB) beginTx() (*Tx, error) { // Lock the meta pages while we initialize the transaction. We obtain // the meta lock before the mmap lock because that's the order that the // write transaction will obtain them. - db.metalock.Lock() + db.metalock.RLock() // Obtain a read-only lock on the mmap. When the mmap is remapped it will // obtain a write lock so all transactions must finish before it can be @@ -786,14 +787,14 @@ func (db *DB) beginTx() (*Tx, error) { // Exit if the database is not open yet. if !db.opened { db.mmaplock.RUnlock() - db.metalock.Unlock() + db.metalock.RUnlock() return nil, berrors.ErrDatabaseNotOpen } // Exit if the database is not correctly mapped. if db.data == nil { db.mmaplock.RUnlock() - db.metalock.Unlock() + db.metalock.RUnlock() return nil, berrors.ErrInvalidMapping } @@ -806,7 +807,7 @@ func (db *DB) beginTx() (*Tx, error) { } // Unlock the meta pages. - db.metalock.Unlock() + db.metalock.RUnlock() // Update the transaction stats. if db.stats != nil { @@ -831,8 +832,8 @@ func (db *DB) beginRWTx() (*Tx, error) { // Once we have the writer lock then we can lock the meta pages so that // we can set up the transaction. - db.metalock.Lock() - defer db.metalock.Unlock() + db.metalock.RLock() + defer db.metalock.RUnlock() // Exit if the database is not open yet. if !db.opened { @@ -860,14 +861,14 @@ func (db *DB) removeTx(tx *Tx) { db.mmaplock.RUnlock() // Use the meta lock to restrict access to the DB object. - db.metalock.Lock() + db.metalock.RLock() if db.freelist != nil { db.freelist.RemoveReadonlyTXID(tx.meta.Txid()) } // Unlock the meta pages. - db.metalock.Unlock() + db.metalock.RUnlock() // Merge statistics. if db.stats != nil { diff --git a/internal/freelist/freelist.go b/internal/freelist/freelist.go index 2b819506b..5c2fba1e3 100644 --- a/internal/freelist/freelist.go +++ b/internal/freelist/freelist.go @@ -22,6 +22,9 @@ type Interface interface { // Init initializes this freelist with the given list of pages. Init(ids common.Pgids) + // AddCurrentTXID adds the latest known ID to the list. + AddCurrentTXID(tid common.Txid) + // Allocate tries to allocate the given number of contiguous pages // from the free list pages. It returns the starting page ID if // available; otherwise, it returns 0. diff --git a/internal/freelist/shared.go b/internal/freelist/shared.go index f2d113008..879763905 100644 --- a/internal/freelist/shared.go +++ b/internal/freelist/shared.go @@ -3,7 +3,9 @@ package freelist import ( "fmt" "math" + "slices" "sort" + "sync/atomic" "unsafe" "go.etcd.io/bbolt/internal/common" @@ -15,13 +17,18 @@ type txPending struct { lastReleaseBegin common.Txid // beginning txid of last matching releaseRange } +type txIdReference struct { + txid common.Txid + refs atomic.Int32 +} + type shared struct { Interface - readonlyTXIDs []common.Txid // all readonly transaction IDs. - allocs map[common.Pgid]common.Txid // mapping of Txid that allocated a pgid. - cache map[common.Pgid]struct{} // fast lookup of all free and pending page ids. - pending map[common.Txid]*txPending // mapping of soon-to-be free page ids by tx. + readerRefs []*txIdReference + allocs map[common.Pgid]common.Txid // mapping of Txid that allocated a pgid. + cache map[common.Pgid]struct{} // fast lookup of all free and pending page ids. + pending map[common.Txid]*txPending // mapping of soon-to-be free page ids by tx. } func newShared() *shared { @@ -118,15 +125,18 @@ func (t *shared) Rollback(txid common.Txid) { } func (t *shared) AddReadonlyTXID(tid common.Txid) { - t.readonlyTXIDs = append(t.readonlyTXIDs, tid) + for _, r := range t.readerRefs { + if r.txid == tid { + r.refs.Add(1) + break + } + } } func (t *shared) RemoveReadonlyTXID(tid common.Txid) { - for i := range t.readonlyTXIDs { - if t.readonlyTXIDs[i] == tid { - last := len(t.readonlyTXIDs) - 1 - t.readonlyTXIDs[i] = t.readonlyTXIDs[last] - t.readonlyTXIDs = t.readonlyTXIDs[:last] + for _, r := range t.readerRefs { + if r.txid == tid { + r.refs.Add(-1) break } } @@ -140,23 +150,37 @@ func (t txIDx) Less(i, j int) bool { return t[i] < t[j] } func (t *shared) ReleasePendingPages() { // Free all pending pages prior to the earliest open transaction. - sort.Sort(txIDx(t.readonlyTXIDs)) minid := common.Txid(math.MaxUint64) - if len(t.readonlyTXIDs) > 0 { - minid = t.readonlyTXIDs[0] + + for i := range t.readerRefs { + if t.readerRefs[i].refs.Load() != 0 && minid > t.readerRefs[i].txid { + minid = t.readerRefs[i].txid + } } if minid > 0 { t.release(minid - 1) } // Release unused txid extents. - for _, tid := range t.readonlyTXIDs { - t.releaseRange(minid, tid-1) - minid = tid + 1 + for _, e := range t.readerRefs { + t.releaseRange(minid, e.txid-1) + minid = e.txid + 1 } t.releaseRange(minid, common.Txid(math.MaxUint64)) // Any page both allocated and freed in an extent is safe to release. } +func (t *shared) AddCurrentTXID(tid common.Txid) { + inUseID := tid - 1 // New readers can still use it till meta pages are updated. + + t.readerRefs = slices.DeleteFunc(t.readerRefs, func(e *txIdReference) bool { + // tid can be left by previous unsuccessful transaction, there + // can't be any readers using it, but we'll append it anyway + // below (this just simplifies code). + return (e.txid < inUseID && e.refs.Load() == 0) || e.txid == tid + }) + t.readerRefs = append(t.readerRefs, &txIdReference{txid: tid}) +} + func (t *shared) release(txid common.Txid) { m := make(common.Pgids, 0) for tid, txp := range t.pending { diff --git a/tx.go b/tx.go index f32a20931..8c4567f46 100644 --- a/tx.go +++ b/tx.go @@ -569,6 +569,7 @@ func (tx *Tx) writeMeta() error { lg.Errorf("writeAt failed, pgid: %d, pageSize: %d, error: %v", p.Id(), tx.db.pageSize, err) return err } + tx.db.freelist.AddCurrentTXID(tx.meta.Txid()) tx.db.metalock.Unlock() if !tx.db.NoSync || common.IgnoreNoSync { // gofail: var beforeSyncMetaPage struct{}