Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Index/ReadOnlyIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ class ReadOnlyIndex extends watchesFile(ReadableIndex) {
if (!this.fd) {
return;
}
const prevLength = this.data.length;
const prevLength = this.cache.length;
const newLength = this.readFileLength();
this.data.length = newLength;
this.cache.truncate(newLength);
if (newLength > prevLength) {
this.emit('append', prevLength, newLength);
}
Expand Down
130 changes: 90 additions & 40 deletions src/Index/ReadableIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const path = require('path');
const events = require('events');
const Entry = require('../IndexEntry');
const { assert, wrapAndCheck, binarySearch } = require('../util');
const RingBuffer = require('./RingBuffer');

// node-event-store-index V01
const HEADER_MAGIC = "nesidx01";
Expand Down Expand Up @@ -38,6 +39,7 @@ class ReadableIndex extends events.EventEmitter {
* @param {object} [options] An object with additional index options.
* @param {typeof EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods.
* @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'.
* @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024.
* @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096.
* @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100.
* @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings.
Expand Down Expand Up @@ -65,8 +67,8 @@ class ReadableIndex extends events.EventEmitter {
* @param {object} options
*/
initialize(options) {
/* @type Array<Entry> */
this.data = [];
const cacheSize = options.cacheSize !== undefined ? options.cacheSize : 1024;
this.cache = new RingBuffer(cacheSize);
this.fd = null;
this.fileMode = 'r';
this.EntryClass = options.EntryClass;
Expand Down Expand Up @@ -100,7 +102,7 @@ class ReadableIndex extends events.EventEmitter {
* @returns {number}
*/
get length() {
return this.data.length;
return this.cache.length;
}

/**
Expand Down Expand Up @@ -155,7 +157,7 @@ class ReadableIndex extends events.EventEmitter {

const length = this.readFileLength();
if (length > 0) {
this.data = new Array(length);
this.cache.truncate(length);
// Read last item to get the index started
this.read(length);
}
Expand Down Expand Up @@ -232,7 +234,7 @@ class ReadableIndex extends events.EventEmitter {
* @api
*/
close() {
this.data = [];
this.cache.reset();
this.readUntil = -1;
this.readBuffer.fill(0);
if (this.fd) {
Expand All @@ -250,21 +252,24 @@ class ReadableIndex extends events.EventEmitter {
* @returns {Entry} The index entry at the given position.
*/
read(index) {
index = Number(index) - 1;
const zeroBasedIndex = Number(index) - 1;

fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + index * this.EntryClass.size);
if (index === this.readUntil + 1) {
fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + zeroBasedIndex * this.EntryClass.size);
if (zeroBasedIndex === this.readUntil + 1) {
this.readUntil++;
}
this.data[index] = this.EntryClass.fromBuffer(this.readBuffer);

return this.data[index];
const entry = this.EntryClass.fromBuffer(this.readBuffer);
this.cache.set(zeroBasedIndex, entry);
return entry;
}

/**
* Read a range of entries from disk. This method will not do any range checks.
* It will however optimize to prevent reading entries that have already been read sequentially from start.
*
* Entries within the cache window are stored in the cache; entries outside the window
* (older than cacheSize) are read from disk for the return value but not cached.
*
* @private
* @param {number} from The 1-based index position from where to read from (inclusive).
* @param {number} until The 1-based index position until which to read to (inclusive).
Expand All @@ -275,25 +280,58 @@ class ReadableIndex extends events.EventEmitter {
return [this.read(from)];
}

from--;
until--;

const readFrom = Math.max(this.readUntil + 1, from);
const amount = (until - readFrom + 1);

const readBuffer = Buffer.allocUnsafe(amount * this.EntryClass.size);
let readSize = fs.readSync(this.fd, readBuffer, 0, readBuffer.byteLength, this.headerSize + readFrom * this.EntryClass.size);
let index = 0;
while (index < amount && readSize > 0) {
this.data[index + readFrom] = this.EntryClass.fromBuffer(readBuffer, index * this.EntryClass.size);
readSize -= this.EntryClass.size;
index++;
const zeroBasedFrom = from - 1;
const zeroBasedUntil = until - 1;
const cacheStart = this.cache.windowStart;

// Build the result array up front
const result = new Array(zeroBasedUntil - zeroBasedFrom + 1);

// Part 1: Out-of-window entries [zeroBasedFrom, min(cacheStart-1, zeroBasedUntil)] — read from disk, do not cache
const outEnd = Math.min(cacheStart - 1, zeroBasedUntil);
if (zeroBasedFrom < cacheStart && outEnd >= zeroBasedFrom) {
const count = outEnd - zeroBasedFrom + 1;
const outBuf = Buffer.allocUnsafe(count * this.EntryClass.size);
const bytesRead = fs.readSync(this.fd, outBuf, 0, outBuf.byteLength, this.headerSize + zeroBasedFrom * this.EntryClass.size);
const entries = Math.floor(bytesRead / this.EntryClass.size);
for (let idx = 0; idx < entries; idx++) {
result[idx] = this.EntryClass.fromBuffer(outBuf, idx * this.EntryClass.size);
}
}
if (from <= this.readUntil + 1) {
this.readUntil = Math.max(this.readUntil, until);

// Part 2: In-window entries [max(cacheStart, zeroBasedFrom), zeroBasedUntil] — use cache + disk for uncached ones
const inStart = Math.max(cacheStart, zeroBasedFrom);
if (inStart <= zeroBasedUntil) {
// Optimisation: skip entries already loaded sequentially into the cache
const readFrom = Math.max(this.readUntil + 1, inStart);

// Trim trailing entries already present in the cache
let readUntilPos = zeroBasedUntil;
while (readUntilPos >= readFrom && this.cache.get(readUntilPos)) {
readUntilPos--;
}

if (readFrom <= readUntilPos) {
const count = readUntilPos - readFrom + 1;
const inBuf = Buffer.allocUnsafe(count * this.EntryClass.size);
const bytesRead = fs.readSync(this.fd, inBuf, 0, inBuf.byteLength, this.headerSize + readFrom * this.EntryClass.size);
const entries = Math.floor(bytesRead / this.EntryClass.size);
for (let idx = 0; idx < entries; idx++) {
const index = readFrom + idx;
this.cache.set(index, this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size));
}
if (inStart <= this.readUntil + 1) {
this.readUntil = Math.max(this.readUntil, readUntilPos);
}
}

// Fill the result from the cache for the in-window portion
for (let index = inStart; index <= zeroBasedUntil; index++) {
result[index - zeroBasedFrom] = this.cache.get(index);
}
}

return this.data.slice(from, until + 1);
return result;
}

/**
Expand All @@ -318,14 +356,13 @@ class ReadableIndex extends events.EventEmitter {
* @returns {Entry|boolean} The entry at the given index position or false if out of bounds.
*/
get(index) {
index = wrapAndCheck(index, this.length);
index = wrapAndCheck(index, this.cache.length);
if (index <= 0) {
return false;
}

if (this.data[index - 1]) {
return this.data[index - 1];
}
const cached = this.cache.get(index - 1);
if (cached) return cached;

return this.read(index);
}
Expand Down Expand Up @@ -354,24 +391,37 @@ class ReadableIndex extends events.EventEmitter {
* @returns {Array<Entry>|boolean} An array of entries for the given range or false on error.
*/
range(from, until = -1) {
from = wrapAndCheck(from, this.length);
until = wrapAndCheck(until, this.length);
from = wrapAndCheck(from, this.cache.length);
until = wrapAndCheck(until, this.cache.length);

if (from <= 0 || until < from) {
return false;
}

const readFrom = Math.max(this.readUntil + 1, from);
let readUntil = until;
while (readUntil >= readFrom && this.data[readUntil - 1]) {
readUntil--;
const zeroBasedFrom = from - 1;
const zeroBasedUntil = until - 1;
const cacheStart = this.cache.windowStart;

// Determine if any disk reads are required
const hasOutOfWindow = zeroBasedFrom < cacheStart;
const inStart = Math.max(cacheStart, zeroBasedFrom);
const readFrom = Math.max(this.readUntil + 1, inStart);
let needsDiskRead = hasOutOfWindow;
if (!needsDiskRead && inStart <= zeroBasedUntil) {
// Scan backwards for uncached in-window tail entries
let scanUntil = zeroBasedUntil;
while (scanUntil >= readFrom && this.cache.get(scanUntil)) {
scanUntil--;
}
needsDiskRead = readFrom <= scanUntil;
}

if (readFrom <= readUntil) {
this.readRange(readFrom, readUntil);
if (needsDiskRead) {
return this.readRange(from, until);
}

return this.data.slice(from - 1, until);
// All required entries are already in the cache — return a slice directly
return this.cache.slice(zeroBasedFrom, zeroBasedUntil);
}

/**
Expand Down
149 changes: 149 additions & 0 deletions src/Index/RingBuffer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/**
* A fixed-capacity ring buffer used as an index entry cache.
*
* Only the most-recent `capacity` entries are kept in memory. The buffer also
* tracks the total number of entries ever added (`length`) so callers can tell
* whether a slot is within the live in-memory window.
*
* API contract
* ------------
* - `get(index)` — cached item at 0-based `index`, or `null` when the
* index is outside the in-memory window or the slot has
* not been written yet.
* - `set(index, item)` — stores `item` at 0-based `index` if it falls inside
* the current window; silently ignores out-of-window
* writes.
* - `add(item)` — appends `item` at position `length`, advances
* `length`, and returns the new length.
* - `truncate(newLength)` — discards entries from `newLength` onwards (nulls
* their slots) and sets `length = newLength`. Safe to
* call with `newLength >= length` (grow-only update).
* - `reset()` — clears all slots and resets `length` to 0.
*/
class RingBuffer {

/**
* @param {number} capacity Maximum number of entries held in memory.
*/
constructor(capacity) {
this._capacity = Math.max(1, capacity >>> 0); // jshint ignore:line
this._buffer = new Array(this._capacity);
this._length = 0;
}

/**
* Total number of items ever appended (not capped at capacity).
* @type {number}
*/
get length() {
return this._length;
}

/**
* Maximum number of items kept in memory.
* @type {number}
*/
get capacity() {
return this._capacity;
}

/**
* The smallest 0-based index that is currently inside the in-memory window.
* Indices below this value are not cached and require a disk read.
* @type {number}
*/
get windowStart() {
return Math.max(0, this._length - this._capacity);
}

/**
* Return the cached item at the given 0-based index, or `null` if the
* index is outside the in-memory window or the slot has not been populated.
*
* @param {number} index 0-based position.
* @returns {*|null}
*/
get(index) {
if (index < this.windowStart) {
return null;
}
const item = this._buffer[index % this._capacity];
return item !== undefined ? item : null;
}

/**
* Store `item` at the given 0-based `index`.
* Writes outside the current in-memory window are silently ignored.
*
* @param {number} index 0-based position.
* @param {*} item
*/
set(index, item) {
if (index < this.windowStart) {
return;
}
this._buffer[index % this._capacity] = item;
}

/**
* Append `item` at position `length` and advance `length`.
*
* @param {*} item
* @returns {number} The new length (1-based position of the appended item).
*/
add(item) {
this._buffer[this._length % this._capacity] = item;
this._length++;
return this._length;
}

/**
* Discard entries from `newLength` onwards by nulling their cache slots,
* then set `length = newLength`.
*
* When `newLength >= length` no eviction is performed and only `length` is
* updated (useful when the underlying file has grown and the caller just
* needs to advance the length counter without populating new slots).
*
* @param {number} newLength The new total length.
*/
truncate(newLength) {
if (newLength < this._length) {
const cacheStart = this.windowStart;
for (let i = Math.max(cacheStart, newLength); i < this._length; i++) {
this._buffer[i % this._capacity] = null;
}
}
this._length = newLength;
}

/**
* Return a copy of the cached items for the 0-based range [from, until] (inclusive).
* Both `from` and `until` must be within the current window (>= windowStart).
*
* When the range is contiguous in the internal buffer a single native slice is
* returned. When it wraps the two halves are concatenated.
*
* @param {number} from 0-based start position (inclusive).
* @param {number} until 0-based end position (inclusive).
* @returns {Array<*>}
*/
slice(from, until) {
const slotFrom = from % this._capacity;
const slotUntil = until % this._capacity;
if (slotFrom <= slotUntil) {
return this._buffer.slice(slotFrom, slotUntil + 1);
}
return this._buffer.slice(slotFrom).concat(this._buffer.slice(0, slotUntil + 1));
}

/**
* Clear all cached slots and reset `length` to 0.
*/
reset() {
this._buffer.fill(null);
this._length = 0;
}
}

module.exports = RingBuffer;
Loading