diff --git a/src/Index/ReadOnlyIndex.js b/src/Index/ReadOnlyIndex.js index fa1db0d..bd85307 100644 --- a/src/Index/ReadOnlyIndex.js +++ b/src/Index/ReadOnlyIndex.js @@ -24,9 +24,9 @@ class ReadOnlyIndex extends watchesFile(ReadableIndex) { if (!this.fd) { return; } - const prevLength = this.data.length; + const prevLength = this.cache.length; const newLength = this.readFileLength(); - this.data.length = newLength; + this.cache.truncate(newLength); if (newLength > prevLength) { this.emit('append', prevLength, newLength); } diff --git a/src/Index/ReadableIndex.js b/src/Index/ReadableIndex.js index c998d4d..fc82ef6 100644 --- a/src/Index/ReadableIndex.js +++ b/src/Index/ReadableIndex.js @@ -3,6 +3,7 @@ const path = require('path'); const events = require('events'); const Entry = require('../IndexEntry'); const { assert, wrapAndCheck, binarySearch } = require('../util'); +const RingBuffer = require('./RingBuffer'); // node-event-store-index V01 const HEADER_MAGIC = "nesidx01"; @@ -38,6 +39,7 @@ class ReadableIndex extends events.EventEmitter { * @param {object} [options] An object with additional index options. * @param {typeof EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods. * @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'. + * @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024. * @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096. * @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100. * @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings. @@ -65,8 +67,8 @@ class ReadableIndex extends events.EventEmitter { * @param {object} options */ initialize(options) { - /* @type Array */ - this.data = []; + const cacheSize = options.cacheSize !== undefined ? options.cacheSize : 1024; + this.cache = new RingBuffer(cacheSize); this.fd = null; this.fileMode = 'r'; this.EntryClass = options.EntryClass; @@ -100,7 +102,7 @@ class ReadableIndex extends events.EventEmitter { * @returns {number} */ get length() { - return this.data.length; + return this.cache.length; } /** @@ -155,7 +157,7 @@ class ReadableIndex extends events.EventEmitter { const length = this.readFileLength(); if (length > 0) { - this.data = new Array(length); + this.cache.truncate(length); // Read last item to get the index started this.read(length); } @@ -232,7 +234,7 @@ class ReadableIndex extends events.EventEmitter { * @api */ close() { - this.data = []; + this.cache.reset(); this.readUntil = -1; this.readBuffer.fill(0); if (this.fd) { @@ -250,21 +252,24 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry} The index entry at the given position. */ read(index) { - index = Number(index) - 1; + const zeroBasedIndex = Number(index) - 1; - fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + index * this.EntryClass.size); - if (index === this.readUntil + 1) { + fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + zeroBasedIndex * this.EntryClass.size); + if (zeroBasedIndex === this.readUntil + 1) { this.readUntil++; } - this.data[index] = this.EntryClass.fromBuffer(this.readBuffer); - - return this.data[index]; + const entry = this.EntryClass.fromBuffer(this.readBuffer); + this.cache.set(zeroBasedIndex, entry); + return entry; } /** * Read a range of entries from disk. This method will not do any range checks. * It will however optimize to prevent reading entries that have already been read sequentially from start. * + * Entries within the cache window are stored in the cache; entries outside the window + * (older than cacheSize) are read from disk for the return value but not cached. + * * @private * @param {number} from The 1-based index position from where to read from (inclusive). * @param {number} until The 1-based index position until which to read to (inclusive). @@ -275,25 +280,58 @@ class ReadableIndex extends events.EventEmitter { return [this.read(from)]; } - from--; - until--; - - const readFrom = Math.max(this.readUntil + 1, from); - const amount = (until - readFrom + 1); - - const readBuffer = Buffer.allocUnsafe(amount * this.EntryClass.size); - let readSize = fs.readSync(this.fd, readBuffer, 0, readBuffer.byteLength, this.headerSize + readFrom * this.EntryClass.size); - let index = 0; - while (index < amount && readSize > 0) { - this.data[index + readFrom] = this.EntryClass.fromBuffer(readBuffer, index * this.EntryClass.size); - readSize -= this.EntryClass.size; - index++; + const zeroBasedFrom = from - 1; + const zeroBasedUntil = until - 1; + const cacheStart = this.cache.windowStart; + + // Build the result array up front + const result = new Array(zeroBasedUntil - zeroBasedFrom + 1); + + // Part 1: Out-of-window entries [zeroBasedFrom, min(cacheStart-1, zeroBasedUntil)] — read from disk, do not cache + const outEnd = Math.min(cacheStart - 1, zeroBasedUntil); + if (zeroBasedFrom < cacheStart && outEnd >= zeroBasedFrom) { + const count = outEnd - zeroBasedFrom + 1; + const outBuf = Buffer.allocUnsafe(count * this.EntryClass.size); + const bytesRead = fs.readSync(this.fd, outBuf, 0, outBuf.byteLength, this.headerSize + zeroBasedFrom * this.EntryClass.size); + const entries = Math.floor(bytesRead / this.EntryClass.size); + for (let idx = 0; idx < entries; idx++) { + result[idx] = this.EntryClass.fromBuffer(outBuf, idx * this.EntryClass.size); + } } - if (from <= this.readUntil + 1) { - this.readUntil = Math.max(this.readUntil, until); + + // Part 2: In-window entries [max(cacheStart, zeroBasedFrom), zeroBasedUntil] — use cache + disk for uncached ones + const inStart = Math.max(cacheStart, zeroBasedFrom); + if (inStart <= zeroBasedUntil) { + // Optimisation: skip entries already loaded sequentially into the cache + const readFrom = Math.max(this.readUntil + 1, inStart); + + // Trim trailing entries already present in the cache + let readUntilPos = zeroBasedUntil; + while (readUntilPos >= readFrom && this.cache.get(readUntilPos)) { + readUntilPos--; + } + + if (readFrom <= readUntilPos) { + const count = readUntilPos - readFrom + 1; + const inBuf = Buffer.allocUnsafe(count * this.EntryClass.size); + const bytesRead = fs.readSync(this.fd, inBuf, 0, inBuf.byteLength, this.headerSize + readFrom * this.EntryClass.size); + const entries = Math.floor(bytesRead / this.EntryClass.size); + for (let idx = 0; idx < entries; idx++) { + const index = readFrom + idx; + this.cache.set(index, this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size)); + } + if (inStart <= this.readUntil + 1) { + this.readUntil = Math.max(this.readUntil, readUntilPos); + } + } + + // Fill the result from the cache for the in-window portion + for (let index = inStart; index <= zeroBasedUntil; index++) { + result[index - zeroBasedFrom] = this.cache.get(index); + } } - return this.data.slice(from, until + 1); + return result; } /** @@ -318,14 +356,13 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry|boolean} The entry at the given index position or false if out of bounds. */ get(index) { - index = wrapAndCheck(index, this.length); + index = wrapAndCheck(index, this.cache.length); if (index <= 0) { return false; } - if (this.data[index - 1]) { - return this.data[index - 1]; - } + const cached = this.cache.get(index - 1); + if (cached) return cached; return this.read(index); } @@ -354,24 +391,37 @@ class ReadableIndex extends events.EventEmitter { * @returns {Array|boolean} An array of entries for the given range or false on error. */ range(from, until = -1) { - from = wrapAndCheck(from, this.length); - until = wrapAndCheck(until, this.length); + from = wrapAndCheck(from, this.cache.length); + until = wrapAndCheck(until, this.cache.length); if (from <= 0 || until < from) { return false; } - const readFrom = Math.max(this.readUntil + 1, from); - let readUntil = until; - while (readUntil >= readFrom && this.data[readUntil - 1]) { - readUntil--; + const zeroBasedFrom = from - 1; + const zeroBasedUntil = until - 1; + const cacheStart = this.cache.windowStart; + + // Determine if any disk reads are required + const hasOutOfWindow = zeroBasedFrom < cacheStart; + const inStart = Math.max(cacheStart, zeroBasedFrom); + const readFrom = Math.max(this.readUntil + 1, inStart); + let needsDiskRead = hasOutOfWindow; + if (!needsDiskRead && inStart <= zeroBasedUntil) { + // Scan backwards for uncached in-window tail entries + let scanUntil = zeroBasedUntil; + while (scanUntil >= readFrom && this.cache.get(scanUntil)) { + scanUntil--; + } + needsDiskRead = readFrom <= scanUntil; } - if (readFrom <= readUntil) { - this.readRange(readFrom, readUntil); + if (needsDiskRead) { + return this.readRange(from, until); } - return this.data.slice(from - 1, until); + // All required entries are already in the cache — return a slice directly + return this.cache.slice(zeroBasedFrom, zeroBasedUntil); } /** diff --git a/src/Index/RingBuffer.js b/src/Index/RingBuffer.js new file mode 100644 index 0000000..447987b --- /dev/null +++ b/src/Index/RingBuffer.js @@ -0,0 +1,149 @@ +/** + * A fixed-capacity ring buffer used as an index entry cache. + * + * Only the most-recent `capacity` entries are kept in memory. The buffer also + * tracks the total number of entries ever added (`length`) so callers can tell + * whether a slot is within the live in-memory window. + * + * API contract + * ------------ + * - `get(index)` — cached item at 0-based `index`, or `null` when the + * index is outside the in-memory window or the slot has + * not been written yet. + * - `set(index, item)` — stores `item` at 0-based `index` if it falls inside + * the current window; silently ignores out-of-window + * writes. + * - `add(item)` — appends `item` at position `length`, advances + * `length`, and returns the new length. + * - `truncate(newLength)` — discards entries from `newLength` onwards (nulls + * their slots) and sets `length = newLength`. Safe to + * call with `newLength >= length` (grow-only update). + * - `reset()` — clears all slots and resets `length` to 0. + */ +class RingBuffer { + + /** + * @param {number} capacity Maximum number of entries held in memory. + */ + constructor(capacity) { + this._capacity = Math.max(1, capacity >>> 0); // jshint ignore:line + this._buffer = new Array(this._capacity); + this._length = 0; + } + + /** + * Total number of items ever appended (not capped at capacity). + * @type {number} + */ + get length() { + return this._length; + } + + /** + * Maximum number of items kept in memory. + * @type {number} + */ + get capacity() { + return this._capacity; + } + + /** + * The smallest 0-based index that is currently inside the in-memory window. + * Indices below this value are not cached and require a disk read. + * @type {number} + */ + get windowStart() { + return Math.max(0, this._length - this._capacity); + } + + /** + * Return the cached item at the given 0-based index, or `null` if the + * index is outside the in-memory window or the slot has not been populated. + * + * @param {number} index 0-based position. + * @returns {*|null} + */ + get(index) { + if (index < this.windowStart) { + return null; + } + const item = this._buffer[index % this._capacity]; + return item !== undefined ? item : null; + } + + /** + * Store `item` at the given 0-based `index`. + * Writes outside the current in-memory window are silently ignored. + * + * @param {number} index 0-based position. + * @param {*} item + */ + set(index, item) { + if (index < this.windowStart) { + return; + } + this._buffer[index % this._capacity] = item; + } + + /** + * Append `item` at position `length` and advance `length`. + * + * @param {*} item + * @returns {number} The new length (1-based position of the appended item). + */ + add(item) { + this._buffer[this._length % this._capacity] = item; + this._length++; + return this._length; + } + + /** + * Discard entries from `newLength` onwards by nulling their cache slots, + * then set `length = newLength`. + * + * When `newLength >= length` no eviction is performed and only `length` is + * updated (useful when the underlying file has grown and the caller just + * needs to advance the length counter without populating new slots). + * + * @param {number} newLength The new total length. + */ + truncate(newLength) { + if (newLength < this._length) { + const cacheStart = this.windowStart; + for (let i = Math.max(cacheStart, newLength); i < this._length; i++) { + this._buffer[i % this._capacity] = null; + } + } + this._length = newLength; + } + + /** + * Return a copy of the cached items for the 0-based range [from, until] (inclusive). + * Both `from` and `until` must be within the current window (>= windowStart). + * + * When the range is contiguous in the internal buffer a single native slice is + * returned. When it wraps the two halves are concatenated. + * + * @param {number} from 0-based start position (inclusive). + * @param {number} until 0-based end position (inclusive). + * @returns {Array<*>} + */ + slice(from, until) { + const slotFrom = from % this._capacity; + const slotUntil = until % this._capacity; + if (slotFrom <= slotUntil) { + return this._buffer.slice(slotFrom, slotUntil + 1); + } + return this._buffer.slice(slotFrom).concat(this._buffer.slice(0, slotUntil + 1)); + } + + /** + * Clear all cached slots and reset `length` to 0. + */ + reset() { + this._buffer.fill(null); + this._length = 0; + } +} + +module.exports = RingBuffer; diff --git a/src/Index/WritableIndex.js b/src/Index/WritableIndex.js index 9cf8a88..fae7ee7 100644 --- a/src/Index/WritableIndex.js +++ b/src/Index/WritableIndex.js @@ -20,6 +20,7 @@ class WritableIndex extends ReadableIndex { * @param {object} [options] An object with additional index options. * @param {EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods. * @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'. + * @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024. * @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096. * @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100. * @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings. @@ -192,22 +193,22 @@ class WritableIndex extends ReadableIndex { throw new Error('Consistency error. Tried to add an index that should come before existing last entry.'); } - if (this.readUntil === this.data.length - 1) { + if (this.readUntil === this.cache.length - 1) { this.readUntil++; } - this.data[this.data.length] = entry; + this.cache.add(entry); if (this.writeBufferCursor === 0) { this.flushTimeout = setTimeout(() => this.flush(), this.flushDelay); } this.writeBufferCursor += entry.toBuffer(this.writeBuffer, this.writeBufferCursor); - this.onFlush(callback, this.length); + this.onFlush(callback, this.cache.length); if (this.writeBufferCursor >= this.writeBuffer.byteLength) { this.flush(); } - return this.length; + return this.cache.length; } /** @@ -230,7 +231,8 @@ class WritableIndex extends ReadableIndex { return; } fs.truncateSync(this.fileName, truncatePosition); - this.data.splice(after); + + this.cache.truncate(after); this.readUntil = Math.min(this.readUntil, after); } } diff --git a/test/Index.spec.js b/test/Index.spec.js index 7652282..8b32fdf 100644 --- a/test/Index.spec.js +++ b/test/Index.spec.js @@ -504,6 +504,71 @@ describe('Index', function() { }); + describe('cacheSize / ring buffer', function() { + + it('limits in-memory entries to cacheSize (old entries still readable via disk)', function() { + const cacheSize = 5; + index = setupIndexWithEntries(20, { cacheSize }); + index.close(); + index.open(); + // All entries must be readable even though only the last `cacheSize` are ever in memory. + for (let i = 1; i <= 20; i++) { + const entry = index.get(i); + expect(entry).not.to.be(false); + expect(entry.number).to.be(i); + } + }); + + it('can still read entries outside the cache window from disk', function() { + const cacheSize = 5; + index = setupIndexWithEntries(15, { cacheSize }); + // Entry 1 is outside the cache window (window = [11, 15]) + const entry = index.get(1); + expect(entry).not.to.be(false); + expect(entry.number).to.be(1); + }); + + it('can read a range that spans both cached and uncached entries', function() { + const cacheSize = 5; + index = setupIndexWithEntries(15, { cacheSize }); + const entries = index.range(8, 13); // crosses the boundary + expect(entries.length).to.be(6); + for (let i = 0; i < entries.length; i++) { + expect(entries[i].number).to.be(8 + i); + } + }); + + it('truncation followed by re-adding entries returns the new values, not stale ones', function() { + const cacheSize = 10; + index = setupIndexWithEntries(15, { cacheSize }); + index.truncate(7); // keep entries 1-7 + // Add new entries at positions 8-10 with different numbers + for (let i = 8; i <= 10; i++) { + index.add(new Index.Entry(i + 100, i)); + } + index.flush(); + // New entries must return the just-added values, not the old truncated ones + for (let i = 8; i <= 10; i++) { + const entry = index.get(i); + expect(entry.number).to.be(i + 100); + } + }); + + it('remains fully functional after many adds that cycle the ring buffer', function() { + const cacheSize = 8; + index = setupIndexWithEntries(32, { cacheSize }); // 4 full cycles + index.close(); + index.open(); + // All entries readable via disk reads + const entries = index.all(); + expect(entries.length).to.be(32); + for (let i = 0; i < entries.length; i++) { + expect(entries[i].number).to.be(i + 1); + } + }); + + }); + describe('ReadOnly', function(){ it('can be created without explicit name', function(){ diff --git a/test/RingBuffer.spec.js b/test/RingBuffer.spec.js new file mode 100644 index 0000000..2e3f2f8 --- /dev/null +++ b/test/RingBuffer.spec.js @@ -0,0 +1,202 @@ +const expect = require('expect.js'); +const RingBuffer = require('../src/Index/RingBuffer'); + +describe('RingBuffer', function() { + + describe('constructor', function() { + + it('initialises length to 0', function() { + const rb = new RingBuffer(4); + expect(rb.length).to.be(0); + }); + + it('exposes the configured capacity', function() { + const rb = new RingBuffer(8); + expect(rb.capacity).to.be(8); + }); + + it('clamps capacity to a minimum of 1', function() { + const rb = new RingBuffer(0); + expect(rb.capacity).to.be(1); + }); + + }); + + describe('windowStart', function() { + + it('is 0 when length <= capacity', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); + expect(rb.windowStart).to.be(0); + }); + + it('advances as length exceeds capacity', function() { + const rb = new RingBuffer(3); + for (let i = 0; i < 5; i++) rb.add(i); + // length=5, capacity=3 → windowStart=2 + expect(rb.windowStart).to.be(2); + }); + + }); + + describe('add', function() { + + it('returns the new length after each add', function() { + const rb = new RingBuffer(4); + expect(rb.add('x')).to.be(1); + expect(rb.add('y')).to.be(2); + }); + + it('increments length on every add', function() { + const rb = new RingBuffer(3); + rb.add('a'); rb.add('b'); rb.add('c'); rb.add('d'); + expect(rb.length).to.be(4); + }); + + it('makes items retrievable via get', function() { + const rb = new RingBuffer(4); + rb.add('first'); + rb.add('second'); + expect(rb.get(0)).to.be('first'); + expect(rb.get(1)).to.be('second'); + }); + + }); + + describe('get', function() { + + it('returns null for an index below windowStart', function() { + const rb = new RingBuffer(2); + rb.add('a'); rb.add('b'); rb.add('c'); // window = [1, 2] + expect(rb.get(0)).to.be(null); + }); + + it('returns null for a slot that has never been set', function() { + const rb = new RingBuffer(4); + expect(rb.get(0)).to.be(null); + }); + + it('returns the correct item within the window', function() { + const rb = new RingBuffer(4); + ['a', 'b', 'c', 'd', 'e'].forEach(v => rb.add(v)); + // window = [1, 4], item at index 4 = 'e' + expect(rb.get(4)).to.be('e'); + }); + + it('returns null after the item has been evicted by further adds', function() { + const rb = new RingBuffer(3); + rb.add('a'); rb.add('b'); rb.add('c'); // window=[0,2] + rb.add('d'); // window=[1,3], index 0 evicted + expect(rb.get(0)).to.be(null); + }); + + }); + + describe('set', function() { + + it('stores an item at an in-window index', function() { + const rb = new RingBuffer(4); + rb.add(null); // length=1 + rb.set(0, 'hello'); + expect(rb.get(0)).to.be('hello'); + }); + + it('ignores writes below windowStart', function() { + const rb = new RingBuffer(2); + rb.add('a'); rb.add('b'); rb.add('c'); // window=[1,2] + rb.set(0, 'overwrite'); + expect(rb.get(0)).to.be(null); + }); + + }); + + describe('truncate', function() { + + it('reduces length and evicts stale in-window slots', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); rb.add('c'); + rb.truncate(1); // keep only index 0 + expect(rb.length).to.be(1); + expect(rb.get(1)).to.be(null); + expect(rb.get(2)).to.be(null); + }); + + it('retains entries below the truncation point', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); rb.add('c'); + rb.truncate(2); + expect(rb.get(0)).to.be('a'); + expect(rb.get(1)).to.be('b'); + }); + + it('can grow length (no eviction)', function() { + const rb = new RingBuffer(4); + rb.add('a'); + rb.truncate(3); // grow + expect(rb.length).to.be(3); + expect(rb.get(0)).to.be('a'); + }); + + it('allows re-adding after truncation without returning stale data', function() { + const rb = new RingBuffer(4); + rb.add('old-1'); rb.add('old-2'); rb.add('old-3'); + rb.truncate(1); // keep only index 0 + rb.add('new-1'); // now at index 1 + expect(rb.get(1)).to.be('new-1'); + }); + + }); + + describe('reset', function() { + + it('clears all slots and sets length to 0', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); + rb.reset(); + expect(rb.length).to.be(0); + expect(rb.get(0)).to.be(null); + expect(rb.get(1)).to.be(null); + }); + + it('allows fresh adds after reset', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); + rb.reset(); + rb.add('x'); + expect(rb.get(0)).to.be('x'); + expect(rb.length).to.be(1); + }); + + }); + + describe('slice', function() { + + it('returns a contiguous sub-range from the buffer', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); rb.add('c'); + expect(rb.slice(0, 2)).to.eql(['a', 'b', 'c']); + }); + + it('returns a sub-range that does not wrap', function() { + const rb = new RingBuffer(8); + ['a', 'b', 'c', 'd', 'e'].forEach(v => rb.add(v)); + expect(rb.slice(1, 3)).to.eql(['b', 'c', 'd']); + }); + + it('handles a range that wraps around the internal buffer', function() { + const rb = new RingBuffer(4); + // After 6 adds: window=[2,5], slots: [4%4=0]=>'e', [5%4=1]=>'f', [2%4=2]=>'c', [3%4=3]=>'d' + ['a', 'b', 'c', 'd', 'e', 'f'].forEach(v => rb.add(v)); + // Ask for [2, 5] (0-based, both inclusive) = 'c','d','e','f' + expect(rb.slice(2, 5)).to.eql(['c', 'd', 'e', 'f']); + }); + + it('returns a single-element slice', function() { + const rb = new RingBuffer(4); + rb.add('x'); rb.add('y'); rb.add('z'); + expect(rb.slice(1, 1)).to.eql(['y']); + }); + + }); + +}); \ No newline at end of file