Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Index/ReadOnlyIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ class ReadOnlyIndex extends watchesFile(ReadableIndex) {
if (!this.fd) {
return;
}
const prevLength = this.data.length;
const prevLength = this._cache.length;
const newLength = this.readFileLength();
this.data.length = newLength;
this._cache.truncate(newLength);
if (newLength > prevLength) {
this.emit('append', prevLength, newLength);
}
Expand Down
135 changes: 95 additions & 40 deletions src/Index/ReadableIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const path = require('path');
const events = require('events');
const Entry = require('../IndexEntry');
const { assert, wrapAndCheck, binarySearch } = require('../util');
const RingBuffer = require('./RingBuffer');

// node-event-store-index V01
const HEADER_MAGIC = "nesidx01";
Expand Down Expand Up @@ -38,6 +39,7 @@ class ReadableIndex extends events.EventEmitter {
* @param {object} [options] An object with additional index options.
* @param {typeof EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods.
* @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'.
* @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024.
* @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096.
* @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100.
* @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings.
Expand Down Expand Up @@ -65,8 +67,8 @@ class ReadableIndex extends events.EventEmitter {
* @param {object} options
*/
initialize(options) {
/* @type Array<Entry> */
this.data = [];
const cacheSize = options.cacheSize !== undefined ? options.cacheSize : 1024;
this._cache = new RingBuffer(cacheSize);
this.fd = null;
this.fileMode = 'r';
this.EntryClass = options.EntryClass;
Expand Down Expand Up @@ -100,7 +102,7 @@ class ReadableIndex extends events.EventEmitter {
* @returns {number}
*/
get length() {
return this.data.length;
return this._cache.length;
}

/**
Expand Down Expand Up @@ -155,7 +157,7 @@ class ReadableIndex extends events.EventEmitter {

const length = this.readFileLength();
if (length > 0) {
this.data = new Array(length);
this._cache.truncate(length);
// Read last item to get the index started
this.read(length);
}
Expand Down Expand Up @@ -232,7 +234,7 @@ class ReadableIndex extends events.EventEmitter {
* @api
*/
close() {
this.data = [];
this._cache.reset();
this.readUntil = -1;
this.readBuffer.fill(0);
if (this.fd) {
Expand All @@ -250,21 +252,24 @@ class ReadableIndex extends events.EventEmitter {
* @returns {Entry} The index entry at the given position.
*/
read(index) {
index = Number(index) - 1;
const i = Number(index) - 1; // 0-based

fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + index * this.EntryClass.size);
if (index === this.readUntil + 1) {
fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + i * this.EntryClass.size);
if (i === this.readUntil + 1) {
this.readUntil++;
}
this.data[index] = this.EntryClass.fromBuffer(this.readBuffer);

return this.data[index];
const entry = this.EntryClass.fromBuffer(this.readBuffer);
this._cache.set(i, entry);
return entry;
}

/**
* Read a range of entries from disk. This method will not do any range checks.
* It will however optimize to prevent reading entries that have already been read sequentially from start.
*
* Entries within the cache window are stored in the cache; entries outside the window
* (older than cacheSize) are read from disk for the return value but not cached.
*
* @private
* @param {number} from The 1-based index position from where to read from (inclusive).
* @param {number} until The 1-based index position until which to read to (inclusive).
Expand All @@ -275,25 +280,58 @@ class ReadableIndex extends events.EventEmitter {
return [this.read(from)];
}

from--;
until--;

const readFrom = Math.max(this.readUntil + 1, from);
const amount = (until - readFrom + 1);

const readBuffer = Buffer.allocUnsafe(amount * this.EntryClass.size);
let readSize = fs.readSync(this.fd, readBuffer, 0, readBuffer.byteLength, this.headerSize + readFrom * this.EntryClass.size);
let index = 0;
while (index < amount && readSize > 0) {
this.data[index + readFrom] = this.EntryClass.fromBuffer(readBuffer, index * this.EntryClass.size);
readSize -= this.EntryClass.size;
index++;
const f = from - 1; // 0-based
const u = until - 1; // 0-based
const cacheStart = this._cache.windowStart;

// Build the result array up front
const result = new Array(u - f + 1);

// Part 1: Out-of-window entries [f, min(cacheStart-1, u)] — read from disk, do not cache
const outEnd = Math.min(cacheStart - 1, u);
if (f < cacheStart && outEnd >= f) {
const count = outEnd - f + 1;
const outBuf = Buffer.allocUnsafe(count * this.EntryClass.size);
const bytesRead = fs.readSync(this.fd, outBuf, 0, outBuf.byteLength, this.headerSize + f * this.EntryClass.size);
const entries = Math.floor(bytesRead / this.EntryClass.size);
for (let idx = 0; idx < entries; idx++) {
result[idx] = this.EntryClass.fromBuffer(outBuf, idx * this.EntryClass.size);
}
}
if (from <= this.readUntil + 1) {
this.readUntil = Math.max(this.readUntil, until);

// Part 2: In-window entries [max(cacheStart, f), u] — use cache + disk for uncached ones
const inStart = Math.max(cacheStart, f);
if (inStart <= u) {
// Optimisation: skip entries already loaded sequentially into the cache
const readFrom = Math.max(this.readUntil + 1, inStart);

// Trim trailing entries already present in the cache
let readUntil = u;
while (readUntil >= readFrom && this._cache.get(readUntil)) {
readUntil--;
}

if (readFrom <= readUntil) {
const count = readUntil - readFrom + 1;
const inBuf = Buffer.allocUnsafe(count * this.EntryClass.size);
const bytesRead = fs.readSync(this.fd, inBuf, 0, inBuf.byteLength, this.headerSize + readFrom * this.EntryClass.size);
const entries = Math.floor(bytesRead / this.EntryClass.size);
for (let idx = 0; idx < entries; idx++) {
const i = readFrom + idx;
this._cache.set(i, this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size));
}
if (inStart <= this.readUntil + 1) {
this.readUntil = Math.max(this.readUntil, readUntil);
}
}

// Fill the result from the cache for the in-window portion
for (let i = inStart; i <= u; i++) {
result[i - f] = this._cache.get(i);
}
}

return this.data.slice(from, until + 1);
return result;
}

/**
Expand All @@ -318,14 +356,14 @@ class ReadableIndex extends events.EventEmitter {
* @returns {Entry|boolean} The entry at the given index position or false if out of bounds.
*/
get(index) {
index = wrapAndCheck(index, this.length);
index = wrapAndCheck(index, this._cache.length);
if (index <= 0) {
return false;
}

if (this.data[index - 1]) {
return this.data[index - 1];
}
const i = index - 1; // 0-based
const cached = this._cache.get(i);
if (cached) return cached;

return this.read(index);
}
Expand Down Expand Up @@ -354,24 +392,41 @@ class ReadableIndex extends events.EventEmitter {
* @returns {Array<Entry>|boolean} An array of entries for the given range or false on error.
*/
range(from, until = -1) {
from = wrapAndCheck(from, this.length);
until = wrapAndCheck(until, this.length);
from = wrapAndCheck(from, this._cache.length);
until = wrapAndCheck(until, this._cache.length);

if (from <= 0 || until < from) {
return false;
}

const readFrom = Math.max(this.readUntil + 1, from);
let readUntil = until;
while (readUntil >= readFrom && this.data[readUntil - 1]) {
readUntil--;
const f = from - 1; // 0-based
const u = until - 1; // 0-based
const cacheStart = this._cache.windowStart;

// Determine if any disk reads are required
const hasOutOfWindow = f < cacheStart;
const inStart = Math.max(cacheStart, f);
const readFrom = Math.max(this.readUntil + 1, inStart);
let needsDiskRead = hasOutOfWindow;
if (!needsDiskRead && inStart <= u) {
// Scan backwards for uncached in-window tail entries
let scanUntil = u;
while (scanUntil >= readFrom && this._cache.get(scanUntil)) {
scanUntil--;
}
needsDiskRead = readFrom <= scanUntil;
}

if (readFrom <= readUntil) {
this.readRange(readFrom, readUntil);
if (needsDiskRead) {
return this.readRange(from, until);
}

return this.data.slice(from - 1, until);
// All required entries are already in the cache — build result directly
const result = new Array(u - f + 1);
for (let i = f; i <= u; i++) {
result[i - f] = this._cache.get(i);
}
return result;
}

/**
Expand Down
129 changes: 129 additions & 0 deletions src/Index/RingBuffer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* A fixed-capacity ring buffer used as an index entry cache.
*
* Only the most-recent `capacity` entries are kept in memory. The buffer also
* tracks the total number of entries ever added (`length`) so callers can tell
* whether a slot is within the live in-memory window.
*
* API contract
* ------------
* - `get(index)` — cached item at 0-based `index`, or `null` when the
* index is outside the in-memory window or the slot has
* not been written yet.
* - `set(index, item)` — stores `item` at 0-based `index` if it falls inside
* the current window; silently ignores out-of-window
* writes.
* - `add(item)` — appends `item` at position `length`, advances
* `length`, and returns the new length.
* - `truncate(newLength)` — discards entries from `newLength` onwards (nulls
* their slots) and sets `length = newLength`. Safe to
* call with `newLength >= length` (grow-only update).
* - `reset()` — clears all slots and resets `length` to 0.
*/
class RingBuffer {

/**
* @param {number} capacity Maximum number of entries held in memory.
*/
constructor(capacity) {
this._capacity = Math.max(1, capacity >>> 0); // jshint ignore:line
this._buffer = new Array(this._capacity);
this._length = 0;
}

/**
* Total number of items ever appended (not capped at capacity).
* @type {number}
*/
get length() {
return this._length;
}

/**
* Maximum number of items kept in memory.
* @type {number}
*/
get capacity() {
return this._capacity;
}

/**
* The smallest 0-based index that is currently inside the in-memory window.
* Indices below this value are not cached and require a disk read.
* @type {number}
*/
get windowStart() {
return Math.max(0, this._length - this._capacity);
}

/**
* Return the cached item at the given 0-based index, or `null` if the
* index is outside the in-memory window or the slot has not been populated.
*
* @param {number} index 0-based position.
* @returns {*|null}
*/
get(index) {
if (index < this.windowStart) {
return null;
}
const item = this._buffer[index % this._capacity];
return item !== undefined ? item : null;
}

/**
* Store `item` at the given 0-based `index`.
* Writes outside the current in-memory window are silently ignored.
*
* @param {number} index 0-based position.
* @param {*} item
*/
set(index, item) {
if (index < this.windowStart) {
return;
}
this._buffer[index % this._capacity] = item;
}

/**
* Append `item` at position `length` and advance `length`.
*
* @param {*} item
* @returns {number} The new length (1-based position of the appended item).
*/
add(item) {
this._buffer[this._length % this._capacity] = item;
this._length++;
return this._length;
}

/**
* Discard entries from `newLength` onwards by nulling their cache slots,
* then set `length = newLength`.
*
* When `newLength >= length` no eviction is performed and only `length` is
* updated (useful when the underlying file has grown and the caller just
* needs to advance the length counter without populating new slots).
*
* @param {number} newLength The new total length.
*/
truncate(newLength) {
if (newLength < this._length) {
const cacheStart = this.windowStart;
for (let i = Math.max(cacheStart, newLength); i < this._length; i++) {
this._buffer[i % this._capacity] = null;
}
}
this._length = newLength;
}

/**
* Clear all cached slots and reset `length` to 0.
*/
reset() {
this._buffer.fill(null);
this._length = 0;
}
}

module.exports = RingBuffer;
Loading