From 287debeb38fd6052ec03f9306b83bdadad08c5a7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 09:21:06 +0000 Subject: [PATCH 01/19] Initial plan From 4ecb1526adb05bcc40cb3f8d1283207c4ec5cceb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 09:40:12 +0000 Subject: [PATCH 02/19] feat: replace unbounded data array with fixed-size ring buffer in ReadableIndex - Replace `this.data` (sparse Array, grows without bound) with `this.cache` (ring buffer of `cacheSize` slots) and `this._length` (total entry count). - The ring buffer window is always [_length - cacheSize, _length - 1]; entries outside that window are read from disk on demand without consuming memory. - New `cacheSize` option (default 1024) configures how many of the most-recent entries are kept in memory, bounding memory use regardless of index size. - WritableIndex.add() stores new entries in the ring buffer and increments _length; truncate() clears stale ring buffer slots for removed entries. - ReadOnlyIndex.onChange() manages _length directly and clears stale cache slots when the underlying file is truncated by the writer. - readRange() handles out-of-window reads (disk-only, no cache) separately from in-window reads (disk + ring buffer), with explanatory comments showing why in-window slots can never hold stale data. - 5 new tests validate: memory-bounding after cycling, disk fallback for out-of-window entries, cross-boundary range reads, stale-slot safety after truncation + re-add, and correctness after full ring cycles. Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/58a8bfd7-606c-4d78-86e4-713c830c84be --- src/Index/ReadOnlyIndex.js | 11 ++- src/Index/ReadableIndex.js | 149 +++++++++++++++++++++++++++---------- src/Index/WritableIndex.js | 18 +++-- test/Index.spec.js | 65 ++++++++++++++++ 4 files changed, 197 insertions(+), 46 deletions(-) diff --git a/src/Index/ReadOnlyIndex.js b/src/Index/ReadOnlyIndex.js index fa1db0d..0b01019 100644 --- a/src/Index/ReadOnlyIndex.js +++ b/src/Index/ReadOnlyIndex.js @@ -24,9 +24,16 @@ class ReadOnlyIndex extends watchesFile(ReadableIndex) { if (!this.fd) { return; } - const prevLength = this.data.length; + const prevLength = this._length; const newLength = this.readFileLength(); - this.data.length = newLength; + if (newLength < prevLength) { + // Clear ring buffer slots for the removed entries to avoid stale reads + const oldCacheStart = Math.max(0, prevLength - this.cacheSize); + for (let i = Math.max(oldCacheStart, newLength); i < prevLength; i++) { + this.cache[i % this.cacheSize] = null; + } + } + this._length = newLength; if (newLength > prevLength) { this.emit('append', prevLength, newLength); } diff --git a/src/Index/ReadableIndex.js b/src/Index/ReadableIndex.js index c998d4d..e1e6467 100644 --- a/src/Index/ReadableIndex.js +++ b/src/Index/ReadableIndex.js @@ -38,6 +38,7 @@ class ReadableIndex extends events.EventEmitter { * @param {object} [options] An object with additional index options. * @param {typeof EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods. * @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'. + * @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024. * @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096. * @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100. * @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings. @@ -65,8 +66,10 @@ class ReadableIndex extends events.EventEmitter { * @param {object} options */ initialize(options) { - /* @type Array */ - this.data = []; + this._length = 0; + this.cacheSize = options.cacheSize !== undefined ? Math.max(1, options.cacheSize >>> 0) : 1024; // jshint ignore:line + /* @type Array Ring buffer holding at most cacheSize entries */ + this.cache = new Array(this.cacheSize); this.fd = null; this.fileMode = 'r'; this.EntryClass = options.EntryClass; @@ -100,7 +103,7 @@ class ReadableIndex extends events.EventEmitter { * @returns {number} */ get length() { - return this.data.length; + return this._length; } /** @@ -155,7 +158,7 @@ class ReadableIndex extends events.EventEmitter { const length = this.readFileLength(); if (length > 0) { - this.data = new Array(length); + this._length = length; // Read last item to get the index started this.read(length); } @@ -232,8 +235,9 @@ class ReadableIndex extends events.EventEmitter { * @api */ close() { - this.data = []; + this._length = 0; this.readUntil = -1; + this.cache.fill(null); this.readBuffer.fill(0); if (this.fd) { fs.closeSync(this.fd); @@ -250,21 +254,27 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry} The index entry at the given position. */ read(index) { - index = Number(index) - 1; + const i = Number(index) - 1; // 0-based - fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + index * this.EntryClass.size); - if (index === this.readUntil + 1) { + fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + i * this.EntryClass.size); + if (i === this.readUntil + 1) { this.readUntil++; } - this.data[index] = this.EntryClass.fromBuffer(this.readBuffer); - - return this.data[index]; + const entry = this.EntryClass.fromBuffer(this.readBuffer); + // Store in ring buffer only if within the current cache window + if (i >= this._length - this.cacheSize) { + this.cache[i % this.cacheSize] = entry; + } + return entry; } /** * Read a range of entries from disk. This method will not do any range checks. * It will however optimize to prevent reading entries that have already been read sequentially from start. * + * Entries within the ring buffer cache window are stored in the cache; entries outside the window + * (older than cacheSize) are read from disk for the return value but not cached. + * * @private * @param {number} from The 1-based index position from where to read from (inclusive). * @param {number} until The 1-based index position until which to read to (inclusive). @@ -275,25 +285,61 @@ class ReadableIndex extends events.EventEmitter { return [this.read(from)]; } - from--; - until--; - - const readFrom = Math.max(this.readUntil + 1, from); - const amount = (until - readFrom + 1); - - const readBuffer = Buffer.allocUnsafe(amount * this.EntryClass.size); - let readSize = fs.readSync(this.fd, readBuffer, 0, readBuffer.byteLength, this.headerSize + readFrom * this.EntryClass.size); - let index = 0; - while (index < amount && readSize > 0) { - this.data[index + readFrom] = this.EntryClass.fromBuffer(readBuffer, index * this.EntryClass.size); - readSize -= this.EntryClass.size; - index++; + const f = from - 1; // 0-based + const u = until - 1; // 0-based + const cacheStart = Math.max(0, this._length - this.cacheSize); + + // Build the result array up front + const result = new Array(u - f + 1); + + // Part 1: Out-of-window entries [f, min(cacheStart-1, u)] — read from disk, do not cache + const outEnd = Math.min(cacheStart - 1, u); + if (f < cacheStart && outEnd >= f) { + const count = outEnd - f + 1; + const outBuf = Buffer.allocUnsafe(count * this.EntryClass.size); + const bytesRead = fs.readSync(this.fd, outBuf, 0, outBuf.byteLength, this.headerSize + f * this.EntryClass.size); + const entries = Math.floor(bytesRead / this.EntryClass.size); + for (let idx = 0; idx < entries; idx++) { + result[idx] = this.EntryClass.fromBuffer(outBuf, idx * this.EntryClass.size); + } } - if (from <= this.readUntil + 1) { - this.readUntil = Math.max(this.readUntil, until); + + // Part 2: In-window entries [max(cacheStart, f), u] — use cache + disk for uncached ones + // All indices accessed below satisfy i >= inStart >= cacheStart, so each slot i % cacheSize + // is exclusive to index i within the window and cannot hold a stale entry. + const inStart = Math.max(cacheStart, f); + if (inStart <= u) { + // Optimisation: skip entries already loaded sequentially into the cache + const readFrom = Math.max(this.readUntil + 1, inStart); + + // Trim trailing entries already present in the cache. + // readUntil >= readFrom >= cacheStart throughout, so all slots checked are in-window. + let readUntil = u; + while (readUntil >= readFrom && this.cache[readUntil % this.cacheSize]) { + readUntil--; + } + + if (readFrom <= readUntil) { + const count = readUntil - readFrom + 1; + const inBuf = Buffer.allocUnsafe(count * this.EntryClass.size); + const bytesRead = fs.readSync(this.fd, inBuf, 0, inBuf.byteLength, this.headerSize + readFrom * this.EntryClass.size); + const entries = Math.floor(bytesRead / this.EntryClass.size); + for (let idx = 0; idx < entries; idx++) { + const i = readFrom + idx; + this.cache[i % this.cacheSize] = this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size); + } + if (inStart <= this.readUntil + 1) { + this.readUntil = Math.max(this.readUntil, readUntil); + } + } + + // Fill the result from the ring buffer for the in-window portion + for (let i = inStart; i <= u; i++) { + result[i - f] = this.cache[i % this.cacheSize]; + } } - return this.data.slice(from, until + 1); + return result; } /** @@ -318,13 +364,18 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry|boolean} The entry at the given index position or false if out of bounds. */ get(index) { - index = wrapAndCheck(index, this.length); + index = wrapAndCheck(index, this._length); if (index <= 0) { return false; } - if (this.data[index - 1]) { - return this.data[index - 1]; + const i = index - 1; // 0-based + // The ring buffer window is [_length - cacheSize, _length - 1]. + // Within this window every index maps to a unique slot (no two indices share a slot), + // so a non-null slot is guaranteed to belong to index i and cannot be stale. + if (i >= this._length - this.cacheSize) { + const cached = this.cache[i % this.cacheSize]; + if (cached) return cached; } return this.read(index); @@ -354,24 +405,44 @@ class ReadableIndex extends events.EventEmitter { * @returns {Array|boolean} An array of entries for the given range or false on error. */ range(from, until = -1) { - from = wrapAndCheck(from, this.length); - until = wrapAndCheck(until, this.length); + from = wrapAndCheck(from, this._length); + until = wrapAndCheck(until, this._length); if (from <= 0 || until < from) { return false; } - const readFrom = Math.max(this.readUntil + 1, from); - let readUntil = until; - while (readUntil >= readFrom && this.data[readUntil - 1]) { - readUntil--; + const f = from - 1; // 0-based + const u = until - 1; // 0-based + const cacheStart = Math.max(0, this._length - this.cacheSize); + + // Determine if any disk reads are required + const hasOutOfWindow = f < cacheStart; + const inStart = Math.max(cacheStart, f); + // Entries in [inStart, readUntil] are assumed cached (sequential read guarantee). + // All indices in [readFrom, u] satisfy >= inStart >= cacheStart — unique, non-stale slots. + const readFrom = Math.max(this.readUntil + 1, inStart); + let needsDiskRead = hasOutOfWindow; + if (!needsDiskRead && inStart <= u) { + // Scan backwards for uncached in-window tail entries (all >= cacheStart, no stale slots). + let scanUntil = u; + while (scanUntil >= readFrom && this.cache[scanUntil % this.cacheSize]) { + scanUntil--; + } + needsDiskRead = readFrom <= scanUntil; } - if (readFrom <= readUntil) { - this.readRange(readFrom, readUntil); + if (needsDiskRead) { + return this.readRange(from, until); } - return this.data.slice(from - 1, until); + // All required entries are already in the ring buffer — build result directly. + // f >= cacheStart here (hasOutOfWindow is false), so all slots are in-window and valid. + const result = new Array(u - f + 1); + for (let i = f; i <= u; i++) { + result[i - f] = this.cache[i % this.cacheSize]; + } + return result; } /** diff --git a/src/Index/WritableIndex.js b/src/Index/WritableIndex.js index 6ba1edc..df477cb 100644 --- a/src/Index/WritableIndex.js +++ b/src/Index/WritableIndex.js @@ -20,6 +20,7 @@ class WritableIndex extends ReadableIndex { * @param {object} [options] An object with additional index options. * @param {EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods. * @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'. + * @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024. * @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096. * @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100. * @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings. @@ -187,22 +188,23 @@ class WritableIndex extends ReadableIndex { assertEqual(entry.constructor.name, this.EntryClass.name, `Wrong entry object.`); assertEqual(entry.constructor.size, this.EntryClass.size, `Invalid entry size.`); - if (this.readUntil === this.data.length - 1) { + if (this.readUntil === this._length - 1) { this.readUntil++; } - this.data[this.data.length] = entry; + this.cache[this._length % this.cacheSize] = entry; + this._length++; if (this.writeBufferCursor === 0) { this.flushTimeout = setTimeout(() => this.flush(), this.flushDelay); } this.writeBufferCursor += entry.toBuffer(this.writeBuffer, this.writeBufferCursor); - this.onFlush(callback, this.length); + this.onFlush(callback, this._length); if (this.writeBufferCursor >= this.writeBuffer.byteLength) { this.flush(); } - return this.length; + return this._length; } /** @@ -225,7 +227,13 @@ class WritableIndex extends ReadableIndex { return; } fs.truncateSync(this.fileName, truncatePosition); - this.data.splice(after); + + // Clear ring buffer slots for the removed entries to avoid stale reads + const oldCacheStart = Math.max(0, this._length - this.cacheSize); + for (let i = Math.max(oldCacheStart, after); i < this._length; i++) { + this.cache[i % this.cacheSize] = null; + } + this._length = after; this.readUntil = Math.min(this.readUntil, after); } } diff --git a/test/Index.spec.js b/test/Index.spec.js index 7652282..a3de214 100644 --- a/test/Index.spec.js +++ b/test/Index.spec.js @@ -488,6 +488,71 @@ describe('Index', function() { }); + describe('cacheSize / ring buffer', function() { + + it('limits in-memory entries to cacheSize (old entries still readable via disk)', function() { + const cacheSize = 5; + index = setupIndexWithEntries(20, { cacheSize }); + index.close(); + index.open(); + // All entries must be readable even though only the last `cacheSize` are ever in memory. + for (let i = 1; i <= 20; i++) { + const entry = index.get(i); + expect(entry).not.to.be(false); + expect(entry.number).to.be(i); + } + }); + + it('can still read entries outside the cache window from disk', function() { + const cacheSize = 5; + index = setupIndexWithEntries(15, { cacheSize }); + // Entry 1 is outside the cache window (window = [11, 15]) + const entry = index.get(1); + expect(entry).not.to.be(false); + expect(entry.number).to.be(1); + }); + + it('can read a range that spans both cached and uncached entries', function() { + const cacheSize = 5; + index = setupIndexWithEntries(15, { cacheSize }); + const entries = index.range(8, 13); // crosses the boundary + expect(entries.length).to.be(6); + for (let i = 0; i < entries.length; i++) { + expect(entries[i].number).to.be(8 + i); + } + }); + + it('truncation followed by re-adding entries returns the new values, not stale ones', function() { + const cacheSize = 10; + index = setupIndexWithEntries(15, { cacheSize }); + index.truncate(7); // keep entries 1-7 + // Add new entries at positions 8-10 with different numbers + for (let i = 8; i <= 10; i++) { + index.add(new Index.Entry(i + 100, i)); + } + index.flush(); + // New entries must return the just-added values, not the old truncated ones + for (let i = 8; i <= 10; i++) { + const entry = index.get(i); + expect(entry.number).to.be(i + 100); + } + }); + + it('remains fully functional after many adds that cycle the ring buffer', function() { + const cacheSize = 8; + index = setupIndexWithEntries(32, { cacheSize }); // 4 full cycles + index.close(); + index.open(); + // All entries readable via disk reads + const entries = index.all(); + expect(entries.length).to.be(32); + for (let i = 0; i < entries.length; i++) { + expect(entries[i].number).to.be(i + 1); + } + }); + + }); + describe('flush', function(){ it('returns false on a closed index', function(){ From 5d9626d42829046f48d7c13a7d7c9c59d194753e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 11:55:54 +0000 Subject: [PATCH 03/19] Add read-scenario benchmark and fix backwards-scan off-by-one Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/883111c6-3b9d-4839-a829-5e2c1f265c4e --- bench/package.json | 2 +- src/Storage/ReadableStorage.js | 2 +- test/Storage.spec.js | 20 ++++++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/bench/package.json b/bench/package.json index a75fcd0..9a61e5a 100644 --- a/bench/package.json +++ b/bench/package.json @@ -2,7 +2,7 @@ "name": "event-storage-bench", "version": "0.0.1", "scripts": { - "bench": "node bench-index.js && node bench-storage.js && node bench-eventstore.js" + "bench": "node bench-index.js && node bench-storage.js && node bench-eventstore.js && node bench-read-scenarios.js" }, "dependencies": { "beautify-benchmark": "^0.2.4", diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index 7cc3ef7..2ec3e13 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -255,7 +255,7 @@ class ReadableStorage extends events.EventEmitter { if (readFrom > readUntil) { const batchSize = 10; let batchUntil = readFrom; - while (batchUntil > readUntil) { + while (batchUntil >= readUntil) { const batchFrom = Math.max(readUntil, batchUntil - batchSize); yield* reverse(this.iterateRange(batchFrom, batchUntil, index)); batchUntil = batchFrom - 1; diff --git a/test/Storage.spec.js b/test/Storage.spec.js index 4eee296..3f030af 100644 --- a/test/Storage.spec.js +++ b/test/Storage.spec.js @@ -296,6 +296,26 @@ describe('Storage', function() { expect(i).to.be(0); }); + it('reads all items in reverse when range length is a multiple of batch size', function() { + // 12 items: sequence 12, 1 (step 11) → old code yielded 2..12 then exited, + // missing item 1. Regression test for the >= readUntil boundary fix. + storage = createStorage(); + storage.open(); + + for (let i = 1; i <= 12; i++) { + storage.write({ foo: i }); + } + storage.close(); + storage.open(); + + let i = 12; + let documents = storage.readRange(i, 1); + for (let doc of documents) { + expect(doc).to.eql({ foo: i-- }); + } + expect(i).to.be(0); + }); + it('can read a sub range', function() { storage = createStorage(); storage.open(); From 18849cfaa2f5bbdc22d1c08f8d60796086e530d5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 16:48:09 +0000 Subject: [PATCH 04/19] Initial plan From 7e8e2d4d4b486be4a876fb07ca763a147d5b8aec Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 16:55:35 +0000 Subject: [PATCH 05/19] Add configurable preCommit and preRead hooks with per-partition metadata support Co-authored-by: albe <4259532+albe@users.noreply.github.com> --- src/Storage/ReadableStorage.js | 19 +++++- src/Storage/WritableStorage.js | 21 +++++- test/Storage.spec.js | 119 +++++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+), 2 deletions(-) diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index 2ec3e13..1e58db0 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -66,6 +66,8 @@ class ReadableStorage extends events.EventEmitter { this.dataDirectory = path.resolve(config.dataDirectory); + this.preReadHook = null; + this.initializeIndexes(config); this.scanPartitions(config); } @@ -197,6 +199,17 @@ class ReadableStorage extends events.EventEmitter { return this.partitions[partitionIdentifier]; } + /** + * Register a hook that is called before a document is returned from a read operation. + * The hook receives the document and the partition metadata and may throw to abort the read. + * + * @api + * @param {function(object, object): void} hook A function receiving (document, partitionMetadata). + */ + preRead(hook) { + this.preReadHook = hook; + } + /** * @protected * @param {number} partitionId The partition to read from. @@ -208,7 +221,11 @@ class ReadableStorage extends events.EventEmitter { readFrom(partitionId, position, size) { const partition = this.getPartition(partitionId); const data = partition.readFrom(position, size); - return this.serializer.deserialize(data); + const document = this.serializer.deserialize(data); + if (this.preReadHook) { + this.preReadHook(document, partition.metadata); + } + return document; } /** diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 34572ab..4241d4f 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -64,6 +64,7 @@ class WritableStorage extends ReadableStorage { this.unlock(); } this.partitioner = config.partitioner; + this.preCommitHook = null; } /** @@ -178,6 +179,17 @@ class WritableStorage extends ReadableStorage { return entry; } + /** + * Register a hook that is called before a document is written to storage. + * The hook receives the document and the partition metadata and may throw to abort the write. + * + * @api + * @param {function(object, object): void} hook A function receiving (document, partitionMetadata). + */ + preCommit(hook) { + this.preCommitHook = hook; + } + /** * Get a partition either by name or by id. * If a partition with the given name does not exist, a new one will be created. @@ -190,10 +202,14 @@ class WritableStorage extends ReadableStorage { */ getPartition(partitionIdentifier) { if (typeof partitionIdentifier === 'string') { + const partitionShortName = partitionIdentifier; const partitionName = this.storageFile + (partitionIdentifier.length ? '.' + partitionIdentifier : ''); partitionIdentifier = WritablePartition.idFor(partitionName); if (!this.partitions[partitionIdentifier]) { - this.partitions[partitionIdentifier] = this.createPartition(partitionName, this.partitionConfig); + const partitionConfig = typeof this.partitionConfig.metadata === 'function' + ? { ...this.partitionConfig, metadata: this.partitionConfig.metadata(partitionShortName) } + : this.partitionConfig; + this.partitions[partitionIdentifier] = this.createPartition(partitionName, partitionConfig); this.emit('partition-created', partitionIdentifier); } this.partitions[partitionIdentifier].open(); @@ -214,6 +230,9 @@ class WritableStorage extends ReadableStorage { const partitionName = this.partitioner(document, this.index.length + 1); const partition = this.getPartition(partitionName); + if (this.preCommitHook) { + this.preCommitHook(document, partition.metadata); + } const position = partition.write(data, this.length, callback); assert(position !== false, 'Error writing document.'); diff --git a/test/Storage.spec.js b/test/Storage.spec.js index 3f030af..a2df5ca 100644 --- a/test/Storage.spec.js +++ b/test/Storage.spec.js @@ -1040,4 +1040,123 @@ describe('Storage', function() { storage.getPartition(''); }); }); + + describe('preCommit', function() { + + it('calls the hook before writing with the document and partition metadata', function() { + storage = createStorage({ metadata: { allowedRoles: ['admin'] } }); + storage.open(); + + let hookDocument, hookMetadata; + storage.preCommit((document, partitionMetadata) => { + hookDocument = document; + hookMetadata = partitionMetadata; + }); + + storage.write({ foo: 'bar' }); + + expect(hookDocument).to.eql({ foo: 'bar' }); + expect(hookMetadata.allowedRoles).to.eql(['admin']); + }); + + it('aborts the write when the hook throws', function() { + storage = createStorage(); + storage.open(); + + storage.preCommit(() => { + throw new Error('not allowed'); + }); + + expect(() => storage.write({ foo: 'bar' })).to.throwError(/not allowed/); + expect(storage.length).to.be(0); + }); + + it('allows writes when the hook does not throw', function() { + storage = createStorage({ metadata: { allowedRoles: ['admin'] } }); + storage.open(); + + const globalContext = { authorizedRoles: ['admin', 'user'] }; + storage.preCommit((document, partitionMetadata) => { + if (!partitionMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { + throw new Error('Not allowed'); + } + }); + + expect(storage.write({ foo: 'bar' })).to.be(1); + }); + + it('uses per-partition metadata when config.metadata is a function', function() { + storage = createStorage({ + partitioner: (doc) => doc.type, + metadata: (partitionName) => ({ allowedRoles: partitionName === 'admin' ? ['admin'] : ['user'] }) + }); + storage.open(); + + const calls = []; + storage.preCommit((document, partitionMetadata) => { + calls.push({ document, metadata: partitionMetadata }); + }); + + storage.write({ foo: 1, type: 'admin' }); + storage.write({ foo: 2, type: 'user' }); + + expect(calls.length).to.be(2); + expect(calls[0].metadata.allowedRoles).to.eql(['admin']); + expect(calls[1].metadata.allowedRoles).to.eql(['user']); + }); + + }); + + describe('preRead', function() { + + it('calls the hook after reading with the document and partition metadata', function() { + storage = createStorage({ metadata: { allowedRoles: ['admin'] } }); + storage.open(); + storage.write({ foo: 'bar' }); + + let hookDocument, hookMetadata; + storage.preRead((document, partitionMetadata) => { + hookDocument = document; + hookMetadata = partitionMetadata; + }); + + const result = storage.read(1); + + expect(result).to.eql({ foo: 'bar' }); + expect(hookDocument).to.eql({ foo: 'bar' }); + expect(hookMetadata.allowedRoles).to.eql(['admin']); + }); + + it('aborts the read when the hook throws', function() { + storage = createStorage(); + storage.open(); + storage.write({ foo: 'bar' }); + + storage.preRead(() => { + throw new Error('read not allowed'); + }); + + expect(() => storage.read(1)).to.throwError(/read not allowed/); + }); + + it('calls the hook for each document in a range read', function() { + storage = createStorage({ metadata: { allowedRoles: ['user'] } }); + storage.open(); + + for (let i = 1; i <= 3; i++) { + storage.write({ foo: i }); + } + + const hookCalls = []; + storage.preRead((document, partitionMetadata) => { + hookCalls.push({ document, metadata: partitionMetadata }); + }); + + const docs = Array.from(storage.readRange(1, 3)); + expect(docs.length).to.be(3); + expect(hookCalls.length).to.be(3); + expect(hookCalls[0].metadata.allowedRoles).to.eql(['user']); + }); + + }); }); From c3bcceb79f82fe36278578767a27e786c76aaca2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 17:11:34 +0000 Subject: [PATCH 06/19] Fix preRead hook: invoke before partition.readFrom() with (position, partitionMetadata) Co-authored-by: albe <4259532+albe@users.noreply.github.com> --- src/Storage/ReadableStorage.js | 13 ++++++------- test/Storage.spec.js | 14 +++++++------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index 1e58db0..d142303 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -200,11 +200,11 @@ class ReadableStorage extends events.EventEmitter { } /** - * Register a hook that is called before a document is returned from a read operation. - * The hook receives the document and the partition metadata and may throw to abort the read. + * Register a hook that is called before a document is read from a partition. + * The hook receives the position and the partition metadata and may throw to abort the read. * * @api - * @param {function(object, object): void} hook A function receiving (document, partitionMetadata). + * @param {function(number, object): void} hook A function receiving (position, partitionMetadata). */ preRead(hook) { this.preReadHook = hook; @@ -220,12 +220,11 @@ class ReadableStorage extends events.EventEmitter { */ readFrom(partitionId, position, size) { const partition = this.getPartition(partitionId); - const data = partition.readFrom(position, size); - const document = this.serializer.deserialize(data); if (this.preReadHook) { - this.preReadHook(document, partition.metadata); + this.preReadHook(position, partition.metadata); } - return document; + const data = partition.readFrom(position, size); + return this.serializer.deserialize(data); } /** diff --git a/test/Storage.spec.js b/test/Storage.spec.js index a2df5ca..ba55812 100644 --- a/test/Storage.spec.js +++ b/test/Storage.spec.js @@ -1109,21 +1109,21 @@ describe('Storage', function() { describe('preRead', function() { - it('calls the hook after reading with the document and partition metadata', function() { + it('calls the hook before reading with the position and partition metadata', function() { storage = createStorage({ metadata: { allowedRoles: ['admin'] } }); storage.open(); storage.write({ foo: 'bar' }); - let hookDocument, hookMetadata; - storage.preRead((document, partitionMetadata) => { - hookDocument = document; + let hookPosition, hookMetadata; + storage.preRead((position, partitionMetadata) => { + hookPosition = position; hookMetadata = partitionMetadata; }); const result = storage.read(1); expect(result).to.eql({ foo: 'bar' }); - expect(hookDocument).to.eql({ foo: 'bar' }); + expect(typeof hookPosition).to.be('number'); expect(hookMetadata.allowedRoles).to.eql(['admin']); }); @@ -1148,8 +1148,8 @@ describe('Storage', function() { } const hookCalls = []; - storage.preRead((document, partitionMetadata) => { - hookCalls.push({ document, metadata: partitionMetadata }); + storage.preRead((position, partitionMetadata) => { + hookCalls.push({ position, metadata: partitionMetadata }); }); const docs = Array.from(storage.readRange(1, 3)); From 37b639b3b8e46dd7415bfc8bf782c570caa5de88 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 17:34:44 +0000 Subject: [PATCH 07/19] Add README section on partition metadata and access control hooks Co-authored-by: albe <4259532+albe@users.noreply.github.com> --- README.md | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/README.md b/README.md index 0cee1ac..1088ef0 100644 --- a/README.md +++ b/README.md @@ -600,3 +600,70 @@ specify an own unique random secret for this in production. Alternatively you should always explicitly specify your matchers when opening an existing index, since that will check the specified matcher matches the one in the index file. + +### Partition Metadata and Access Control Hooks + +Each partition can carry an arbitrary metadata object that is written once into the partition's file header at +creation time and cannot be changed afterwards. This makes it a stable anchor for access control policies that +need to be defined when a stream is first created. + +Two hooks can be registered on the storage to intercept reads and writes: + +- **`preCommit(hook)`** — called inside `write()` with `(document, partitionMetadata)` *before* the document is + written. Throw from the hook to abort the write. +- **`preRead(hook)`** — called inside `readFrom()` with `(position, partitionMetadata)` *before* the document is + read from disk. Throw from the hook to abort the read. + +Because the metadata is defined per partition, you can store different access control information for each +stream/partition. Pass a function as `config.metadata` and it will be called with the partition name whenever a +new partition is created: + +```javascript +const EventStore = require('event-storage'); + +// Application-owned context — not part of the library +const globalContext = { authorizedRoles: ['user'] }; + +const eventstore = new EventStore('my-event-store', { + storageDirectory: './data', + storageConfig: { + // Called once per partition at creation time; the result is persisted in the file header + metadata: (partitionName) => ({ + allowedRoles: partitionName === 'admin-stream' ? ['admin'] : ['user'] + }) + } +}); + +eventstore.on('ready', () => { + const storage = eventstore.storage; + + // Reject writes to partitions whose allowedRoles don't overlap with the caller's roles + storage.preCommit((document, partitionMetadata) => { + if (!partitionMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { + throw new Error( + 'Not authorized to write to this partition with roles ' + + JSON.stringify(globalContext.authorizedRoles) + ); + } + }); + + // Reject reads from partitions whose allowedRoles don't overlap with the caller's roles + storage.preRead((position, partitionMetadata) => { + if (!partitionMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { + throw new Error( + 'Not authorized to read from this partition with roles ' + + JSON.stringify(globalContext.authorizedRoles) + ); + } + }); + + // This write succeeds — 'user' is in allowedRoles for 'user-stream' + eventstore.commit('user-stream', [{ type: 'UserCreated', id: 1 }], 0); + + // This write throws — 'admin' is NOT in globalContext.authorizedRoles + eventstore.commit('admin-stream', [{ type: 'AdminAction' }], 0); +}); +``` + +The `globalContext` object is entirely application-owned. The library only calls the hook with the position (or +document for `preCommit`) and the stored partition metadata — everything else is up to the application. From 523a963c020a46360ed36244fd89d413e88ac7ca Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 17:45:52 +0000 Subject: [PATCH 08/19] Add performance note for preCommit/preRead hooks in README Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/a3d29880-0ef1-4485-b1f8-3b3405a5feab --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 1088ef0..18f0d92 100644 --- a/README.md +++ b/README.md @@ -614,6 +614,10 @@ Two hooks can be registered on the storage to intercept reads and writes: - **`preRead(hook)`** — called inside `readFrom()` with `(position, partitionMetadata)` *before* the document is read from disk. Throw from the hook to abort the read. +> **Performance note:** Both hooks are invoked synchronously on *every* read and write operation. Keep the hook +> logic as cheap and fast as possible — avoid I/O, async operations, or any non-trivial computation inside a hook, +> as the overhead will be paid on every document accessed. + Because the metadata is defined per partition, you can store different access control information for each stream/partition. Pass a function as `config.metadata` and it will be called with the partition name whenever a new partition is created: From f67a7d327ca687dc6719e91d640f43014fd32805 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 18:08:37 +0000 Subject: [PATCH 09/19] Expose preCommit/preRead hooks and streamMetadata on EventStore Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/3995bf0a-704b-4c73-8c31-acebe87c88ba --- README.md | 74 ++++++++++++++--------- src/EventStore.js | 41 +++++++++++++ test/EventStore.spec.js | 126 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 213 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 18f0d92..a96adf2 100644 --- a/README.md +++ b/README.md @@ -603,24 +603,26 @@ check the specified matcher matches the one in the index file. ### Partition Metadata and Access Control Hooks -Each partition can carry an arbitrary metadata object that is written once into the partition's file header at -creation time and cannot be changed afterwards. This makes it a stable anchor for access control policies that +Each stream (partition) can carry an arbitrary metadata object that is written once into the partition's file header +at creation time and cannot be changed afterwards. This makes it a stable anchor for access control policies that need to be defined when a stream is first created. -Two hooks can be registered on the storage to intercept reads and writes: +Two hooks can be registered on the `EventStore` (or directly on the `Storage` instance) to intercept reads and writes: -- **`preCommit(hook)`** — called inside `write()` with `(document, partitionMetadata)` *before* the document is - written. Throw from the hook to abort the write. -- **`preRead(hook)`** — called inside `readFrom()` with `(position, partitionMetadata)` *before* the document is - read from disk. Throw from the hook to abort the read. +- **`preCommit(hook)`** — called with `(event, partitionMetadata)` *before* the event is written. Throw from the hook + to abort the write. +- **`preRead(hook)`** — called with `(position, partitionMetadata)` *before* the event is read from disk. Throw from + the hook to abort the read. > **Performance note:** Both hooks are invoked synchronously on *every* read and write operation. Keep the hook > logic as cheap and fast as possible — avoid I/O, async operations, or any non-trivial computation inside a hook, -> as the overhead will be paid on every document accessed. +> as the overhead will be paid on every event accessed. -Because the metadata is defined per partition, you can store different access control information for each -stream/partition. Pass a function as `config.metadata` and it will be called with the partition name whenever a -new partition is created: +#### Using the `EventStore` API + +At the `EventStore` level the unit of work is a *stream*. Use `config.streamMetadata` to attach metadata per stream: +the value can be a plain object (same metadata for every stream), or a function `(streamName) => object` to assign +different metadata to each stream. Both `preCommit` and `preRead` are exposed directly on `EventStore`. ```javascript const EventStore = require('event-storage'); @@ -630,32 +632,28 @@ const globalContext = { authorizedRoles: ['user'] }; const eventstore = new EventStore('my-event-store', { storageDirectory: './data', - storageConfig: { - // Called once per partition at creation time; the result is persisted in the file header - metadata: (partitionName) => ({ - allowedRoles: partitionName === 'admin-stream' ? ['admin'] : ['user'] - }) - } + // Called once per stream at creation time; the result is persisted in the file header + streamMetadata: (streamName) => ({ + allowedRoles: streamName === 'admin-stream' ? ['admin'] : ['user'] + }) }); eventstore.on('ready', () => { - const storage = eventstore.storage; - - // Reject writes to partitions whose allowedRoles don't overlap with the caller's roles - storage.preCommit((document, partitionMetadata) => { - if (!partitionMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { + // Reject writes to streams whose allowedRoles don't overlap with the caller's roles + eventstore.preCommit((event, streamMetadata) => { + if (!streamMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { throw new Error( - 'Not authorized to write to this partition with roles ' + + 'Not authorized to write to this stream with roles ' + JSON.stringify(globalContext.authorizedRoles) ); } }); - // Reject reads from partitions whose allowedRoles don't overlap with the caller's roles - storage.preRead((position, partitionMetadata) => { - if (!partitionMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { + // Reject reads from streams whose allowedRoles don't overlap with the caller's roles + eventstore.preRead((position, streamMetadata) => { + if (!streamMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { throw new Error( - 'Not authorized to read from this partition with roles ' + + 'Not authorized to read from this stream with roles ' + JSON.stringify(globalContext.authorizedRoles) ); } @@ -669,5 +667,25 @@ eventstore.on('ready', () => { }); ``` +#### Using the `Storage` API directly + +If you work with `Storage` directly (bypassing `EventStore`), use `config.metadata` — a function +`(partitionName) => object` called whenever a new partition is created — and call `storage.preCommit` / +`storage.preRead` on the storage instance: + +```javascript +const Storage = require('event-storage').Storage; + +const storage = new Storage('events', { + partitioner: (doc) => doc.stream, + metadata: (partitionName) => ({ + allowedRoles: partitionName === 'admin' ? ['admin'] : ['user'] + }) +}); + +storage.preCommit((document, partitionMetadata) => { /* ... */ }); +storage.preRead((position, partitionMetadata) => { /* ... */ }); +``` + The `globalContext` object is entirely application-owned. The library only calls the hook with the position (or -document for `preCommit`) and the stored partition metadata — everything else is up to the application. +event for `preCommit`) and the stored metadata — everything else is up to the application. diff --git a/src/EventStore.js b/src/EventStore.js index 8fe66ac..a4588ae 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -28,6 +28,10 @@ class EventStore extends events.EventEmitter { * @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'. * @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`. * @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode. + * @param {object|function(string): object} [config.streamMetadata] A metadata object or a function `(streamName) => object` + * that is called whenever a new stream partition is created. The returned object is stored once in the partition + * file header and surfaced to `preCommit` / `preRead` hooks. Takes precedence only when + * `config.storageConfig.metadata` is not also set. */ constructor(storeName = 'eventstore', config = {}) { super(); @@ -44,6 +48,17 @@ class EventStore extends events.EventEmitter { readOnly: config.readOnly || false }; const storageConfig = Object.assign(defaults, config.storageConfig); + + // Translate the high-level streamMetadata option into the storage-level metadata function, + // but only when the caller has not already provided a lower-level storageConfig.metadata. + if (config.streamMetadata !== undefined && storageConfig.metadata === undefined) { + if (typeof config.streamMetadata === 'function') { + storageConfig.metadata = config.streamMetadata; + } else { + storageConfig.metadata = (streamName) => config.streamMetadata[streamName] || {}; + } + } + this.initialize(storeName, storageConfig); } @@ -145,6 +160,32 @@ class EventStore extends events.EventEmitter { this.storage.close(); } + /** + * Register a hook that is called before an event is committed to storage. + * The hook receives `(event, partitionMetadata)` and may throw to abort the write. + * The hook is invoked on every write, so its logic should be cheap, fast, and synchronous. + * + * @api + * @param {function(object, object): void} hook A function receiving (event, partitionMetadata). + * @throws {Error} If the storage was opened in read-only mode. + */ + preCommit(hook) { + assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not register a preCommit hook on it.'); + this.storage.preCommit(hook); + } + + /** + * Register a hook that is called before an event is read from storage. + * The hook receives `(position, partitionMetadata)` and may throw to abort the read. + * The hook is invoked on every read, so its logic should be cheap, fast, and synchronous. + * + * @api + * @param {function(number, object): void} hook A function receiving (position, partitionMetadata). + */ + preRead(hook) { + this.storage.preRead(hook); + } + /** * Get the number of events stored. * diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index eea0e08..e3d085b 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -963,4 +963,130 @@ describe('EventStore', function() { }); + describe('preCommit', function() { + + it('calls the hook before writing with the event and partition metadata', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + const calls = []; + eventstore.preCommit((event, metadata) => calls.push({ event, metadata })); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + expect(calls).to.have.length(1); + expect(calls[0].metadata.allowedRoles).to.eql(['admin']); + expect(calls[0].event.payload.type).to.be('FooCreated'); + }); + + it('aborts the write when the hook throws', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + eventstore.preCommit((event, metadata) => { + if (!metadata.allowedRoles.includes('user')) { + throw new Error('Not authorized'); + } + }); + expect(() => eventstore.commit('foo', [{ type: 'FooCreated' }])).to.throwError(/Not authorized/); + expect(eventstore.length).to.be(0); + }); + + it('supports per-stream metadata via a function', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: (streamName) => ({ streamName }) + }); + const calls = []; + eventstore.preCommit((event, metadata) => calls.push(metadata)); + eventstore.commit('foo', [{ type: 'A' }]); + eventstore.commit('bar', [{ type: 'B' }]); + expect(calls[0].streamName).to.be('foo'); + expect(calls[1].streamName).to.be('bar'); + }); + + it('throws when the storage is opened in read-only mode', function(done) { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.close(); + + const readOnly = new EventStore({ + storageDirectory, + storageConfig: { readOnly: true } + }); + readOnly.on('ready', () => { + expect(() => readOnly.preCommit(() => {})).to.throwError(); + readOnly.close(); + eventstore = null; + done(); + }); + }); + + }); + + describe('preRead', function() { + + it('calls the hook before reading with the position and partition metadata', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + const calls = []; + eventstore.preRead((position, metadata) => calls.push({ position, metadata })); + + const stream = eventstore.getEventStream('foo'); + const events = Array.from(stream); + expect(calls).to.have.length(1); + expect(calls[0].metadata.allowedRoles).to.eql(['user']); + expect(typeof calls[0].position).to.be('number'); + }); + + it('aborts the read when the hook throws', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.preRead((position, metadata) => { + if (!metadata.allowedRoles.includes('user')) { + throw new Error('Not authorized to read'); + } + }); + + // EventStream.next() catches iterator errors, so iteration stops with no events + const stream = eventstore.getEventStream('foo'); + const result = Array.from(stream); + expect(result).to.have.length(0); + }); + + it('works on a read-only store', function(done) { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.close(); + + const readOnly = new EventStore({ + storageDirectory, + storageConfig: { readOnly: true } + }); + readOnly.on('ready', () => { + const calls = []; + expect(() => readOnly.preRead((pos, meta) => calls.push(meta))).to.not.throwError(); + const stream = readOnly.getEventStream('foo'); + Array.from(stream); + expect(calls).to.have.length(1); + readOnly.close(); + eventstore = null; + done(); + }); + }); + + }); + }); From 1575103202c9911c71ca61367dd41ad5cf10cb42 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 11:53:21 +0000 Subject: [PATCH 10/19] Switch preCommit/preRead from single-hook fields to EventEmitter events Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/72e4073d-8177-4078-89fe-5f8b88c97876 --- README.md | 43 +++++++----- src/EventStore.js | 89 ++++++++++++++++++++++--- src/Storage/ReadableStorage.js | 14 ++-- src/Storage/WritableStorage.js | 13 ++-- test/EventStore.spec.js | 117 +++++++++++++++++++++++++++++++++ 5 files changed, 235 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index a96adf2..6b29670 100644 --- a/README.md +++ b/README.md @@ -607,22 +607,31 @@ Each stream (partition) can carry an arbitrary metadata object that is written o at creation time and cannot be changed afterwards. This makes it a stable anchor for access control policies that need to be defined when a stream is first created. -Two hooks can be registered on the `EventStore` (or directly on the `Storage` instance) to intercept reads and writes: +Two handlers can be registered on the `EventStore` (or directly on the `Storage` instance) to intercept reads and +writes, using the standard Node.js `EventEmitter` API (`on`, `once`, `off`) or the convenience wrapper methods: -- **`preCommit(hook)`** — called with `(event, partitionMetadata)` *before* the event is written. Throw from the hook - to abort the write. -- **`preRead(hook)`** — called with `(position, partitionMetadata)` *before* the event is read from disk. Throw from - the hook to abort the read. +- **`on('preCommit', handler)`** — handler called with `(event, partitionMetadata)` *before* the event is written. Throw from the handler to abort the write. +- **`on('preRead', handler)`** — handler called with `(position, partitionMetadata)` *before* the event is read from disk. Throw from the handler to abort the read. -> **Performance note:** Both hooks are invoked synchronously on *every* read and write operation. Keep the hook -> logic as cheap and fast as possible — avoid I/O, async operations, or any non-trivial computation inside a hook, +Multiple handlers can be registered for the same event; they all run on every operation in registration order. +Individual handlers can be removed with `off('preCommit', handler)` / `off('preRead', handler)`, and `once()` is +available for one-time handlers. + +> **Performance note:** Both handlers are invoked synchronously on *every* read and write operation. Keep the handler +> logic as cheap and fast as possible — avoid I/O, async operations, or any non-trivial computation inside a handler, > as the overhead will be paid on every event accessed. #### Using the `EventStore` API -At the `EventStore` level the unit of work is a *stream*. Use `config.streamMetadata` to attach metadata per stream: -the value can be a plain object (same metadata for every stream), or a function `(streamName) => object` to assign -different metadata to each stream. Both `preCommit` and `preRead` are exposed directly on `EventStore`. +At the `EventStore` level the unit of work is a *stream*. Use `config.streamMetadata` to attach metadata per stream. +It accepts either: +- A **function** `(streamName) => object` — called once per stream at creation time, so each stream can have its own + metadata (shown in the example below). +- A **plain object** `{ streamName: metadataObject, ... }` — each key is a stream name whose value becomes that + stream's metadata; streams not present in the object receive an empty metadata object `{}`. + +Register handlers using `eventstore.on('preCommit', ...)` / `eventstore.on('preRead', ...)`, +or via the convenience methods `eventstore.preCommit(handler)` / `eventstore.preRead(handler)`. ```javascript const EventStore = require('event-storage'); @@ -640,7 +649,7 @@ const eventstore = new EventStore('my-event-store', { eventstore.on('ready', () => { // Reject writes to streams whose allowedRoles don't overlap with the caller's roles - eventstore.preCommit((event, streamMetadata) => { + eventstore.on('preCommit', (event, streamMetadata) => { if (!streamMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { throw new Error( 'Not authorized to write to this stream with roles ' + @@ -650,7 +659,7 @@ eventstore.on('ready', () => { }); // Reject reads from streams whose allowedRoles don't overlap with the caller's roles - eventstore.preRead((position, streamMetadata) => { + eventstore.on('preRead', (position, streamMetadata) => { if (!streamMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { throw new Error( 'Not authorized to read from this stream with roles ' + @@ -670,8 +679,8 @@ eventstore.on('ready', () => { #### Using the `Storage` API directly If you work with `Storage` directly (bypassing `EventStore`), use `config.metadata` — a function -`(partitionName) => object` called whenever a new partition is created — and call `storage.preCommit` / -`storage.preRead` on the storage instance: +`(partitionName) => object` called whenever a new partition is created — and register handlers using +`storage.on('preCommit', ...)` / `storage.on('preRead', ...)` or the equivalent convenience methods: ```javascript const Storage = require('event-storage').Storage; @@ -683,9 +692,9 @@ const storage = new Storage('events', { }) }); -storage.preCommit((document, partitionMetadata) => { /* ... */ }); -storage.preRead((position, partitionMetadata) => { /* ... */ }); +storage.on('preCommit', (document, partitionMetadata) => { /* ... */ }); +storage.on('preRead', (position, partitionMetadata) => { /* ... */ }); ``` -The `globalContext` object is entirely application-owned. The library only calls the hook with the position (or +The `globalContext` object is entirely application-owned. The library only calls the handler with the position (or event for `preCommit`) and the stored metadata — everything else is up to the application. diff --git a/src/EventStore.js b/src/EventStore.js index a4588ae..2a50595 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -161,29 +161,100 @@ class EventStore extends events.EventEmitter { } /** - * Register a hook that is called before an event is committed to storage. - * The hook receives `(event, partitionMetadata)` and may throw to abort the write. - * The hook is invoked on every write, so its logic should be cheap, fast, and synchronous. + * Override EventEmitter.on() to delegate 'preCommit' and 'preRead' event registrations + * to the underlying storage, so that `eventstore.on('preCommit', handler)` works naturally. + * All other events are handled by the default EventEmitter. + * + * @param {string} event + * @param {function} listener + * @returns {this} + */ + on(event, listener) { + if (event === 'preCommit' || event === 'preRead') { + if (event === 'preCommit') { + assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.'); + } + this.storage.on(event, listener); + return this; + } + return super.on(event, listener); + } + + /** + * @inheritDoc + */ + addListener(event, listener) { + return this.on(event, listener); + } + + /** + * Override EventEmitter.once() to delegate 'preCommit' and 'preRead' to the underlying storage. + * + * @param {string} event + * @param {function} listener + * @returns {this} + */ + once(event, listener) { + if (event === 'preCommit' || event === 'preRead') { + if (event === 'preCommit') { + assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.'); + } + this.storage.once(event, listener); + return this; + } + return super.once(event, listener); + } + + /** + * Override EventEmitter.off() / removeListener() to delegate 'preCommit' and 'preRead' + * to the underlying storage. + * + * @param {string} event + * @param {function} listener + * @returns {this} + */ + off(event, listener) { + if (event === 'preCommit' || event === 'preRead') { + this.storage.off(event, listener); + return this; + } + return super.off(event, listener); + } + + /** + * @inheritDoc + */ + removeListener(event, listener) { + return this.off(event, listener); + } + + /** + * Convenience method to register a handler called before an event is committed to storage. + * Equivalent to `eventstore.on('preCommit', hook)`. + * The handler receives `(event, partitionMetadata)` and may throw to abort the write. + * Multiple handlers can be registered; all run on every write in registration order. + * The handler is invoked on every write, so its logic should be cheap, fast, and synchronous. * * @api * @param {function(object, object): void} hook A function receiving (event, partitionMetadata). * @throws {Error} If the storage was opened in read-only mode. */ preCommit(hook) { - assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not register a preCommit hook on it.'); - this.storage.preCommit(hook); + this.on('preCommit', hook); } /** - * Register a hook that is called before an event is read from storage. - * The hook receives `(position, partitionMetadata)` and may throw to abort the read. - * The hook is invoked on every read, so its logic should be cheap, fast, and synchronous. + * Convenience method to register a handler called before an event is read from storage. + * Equivalent to `eventstore.on('preRead', hook)`. + * The handler receives `(position, partitionMetadata)` and may throw to abort the read. + * Multiple handlers can be registered; all run on every read in registration order. + * The handler is invoked on every read, so its logic should be cheap, fast, and synchronous. * * @api * @param {function(number, object): void} hook A function receiving (position, partitionMetadata). */ preRead(hook) { - this.storage.preRead(hook); + this.on('preRead', hook); } /** diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index d142303..59f1558 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -66,8 +66,6 @@ class ReadableStorage extends events.EventEmitter { this.dataDirectory = path.resolve(config.dataDirectory); - this.preReadHook = null; - this.initializeIndexes(config); this.scanPartitions(config); } @@ -200,14 +198,16 @@ class ReadableStorage extends events.EventEmitter { } /** - * Register a hook that is called before a document is read from a partition. - * The hook receives the position and the partition metadata and may throw to abort the read. + * Register a handler that is called before a document is read from a partition. + * The handler receives the position and the partition metadata and may throw to abort the read. + * Multiple handlers can be registered; all run on every read in registration order. + * Equivalent to `storage.on('preRead', hook)`. * * @api * @param {function(number, object): void} hook A function receiving (position, partitionMetadata). */ preRead(hook) { - this.preReadHook = hook; + this.on('preRead', hook); } /** @@ -220,9 +220,7 @@ class ReadableStorage extends events.EventEmitter { */ readFrom(partitionId, position, size) { const partition = this.getPartition(partitionId); - if (this.preReadHook) { - this.preReadHook(position, partition.metadata); - } + this.emit('preRead', position, partition.metadata); const data = partition.readFrom(position, size); return this.serializer.deserialize(data); } diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 4241d4f..e14ffce 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -64,7 +64,6 @@ class WritableStorage extends ReadableStorage { this.unlock(); } this.partitioner = config.partitioner; - this.preCommitHook = null; } /** @@ -180,14 +179,16 @@ class WritableStorage extends ReadableStorage { } /** - * Register a hook that is called before a document is written to storage. - * The hook receives the document and the partition metadata and may throw to abort the write. + * Register a handler that is called before a document is written to storage. + * The handler receives the document and the partition metadata and may throw to abort the write. + * Multiple handlers can be registered; all run on every write in registration order. + * Equivalent to `storage.on('preCommit', hook)`. * * @api * @param {function(object, object): void} hook A function receiving (document, partitionMetadata). */ preCommit(hook) { - this.preCommitHook = hook; + this.on('preCommit', hook); } /** @@ -230,9 +231,7 @@ class WritableStorage extends ReadableStorage { const partitionName = this.partitioner(document, this.index.length + 1); const partition = this.getPartition(partitionName); - if (this.preCommitHook) { - this.preCommitHook(document, partition.metadata); - } + this.emit('preCommit', document, partition.metadata); const position = partition.write(data, this.length, callback); assert(position !== false, 'Error writing document.'); diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index e3d085b..50da805 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -963,6 +963,7 @@ describe('EventStore', function() { }); + describe('preCommit', function() { it('calls the hook before writing with the event and partition metadata', function() { @@ -1025,6 +1026,77 @@ describe('EventStore', function() { }); }); + it('supports eventstore.on("preCommit", handler) style', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + const calls = []; + eventstore.on('preCommit', (event, metadata) => calls.push({ event, metadata })); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + expect(calls).to.have.length(1); + expect(calls[0].metadata.allowedRoles).to.eql(['admin']); + }); + + it('supports multiple handlers registered via on()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + const order = []; + eventstore.on('preCommit', () => order.push('first')); + eventstore.on('preCommit', () => order.push('second')); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + expect(order).to.eql(['first', 'second']); + }); + + it('supports removal of a handler via off()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('preCommit', handler); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + expect(callCount).to.be(1); + eventstore.off('preCommit', handler); + eventstore.commit('foo', [{ type: 'FooUpdated' }]); + expect(callCount).to.be(1); + }); + + it('supports once() for a one-time handler', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + let callCount = 0; + eventstore.once('preCommit', () => callCount++); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.commit('foo', [{ type: 'FooUpdated' }]); + expect(callCount).to.be(1); + }); + + it('throws when registering preCommit via on() on a read-only store', function(done) { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.close(); + + const readOnly = new EventStore({ + storageDirectory, + storageConfig: { readOnly: true } + }); + readOnly.on('ready', () => { + expect(() => readOnly.on('preCommit', () => {})).to.throwError(); + readOnly.close(); + eventstore = null; + done(); + }); + }); + }); describe('preRead', function() { @@ -1087,6 +1159,51 @@ describe('EventStore', function() { }); }); + it('supports eventstore.on("preRead", handler) style', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + const calls = []; + eventstore.on('preRead', (position, metadata) => calls.push({ position, metadata })); + + const stream = eventstore.getEventStream('foo'); + Array.from(stream); + expect(calls).to.have.length(1); + expect(calls[0].metadata.allowedRoles).to.eql(['user']); + }); + + it('supports multiple read handlers via on()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + const order = []; + eventstore.on('preRead', () => order.push('first')); + eventstore.on('preRead', () => order.push('second')); + + Array.from(eventstore.getEventStream('foo')); + expect(order).to.eql(['first', 'second']); + }); + + it('supports removal of a read handler via off()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }, { type: 'FooUpdated' }]); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('preRead', handler); + Array.from(eventstore.getEventStream('foo')); + expect(callCount).to.be(2); + eventstore.off('preRead', handler); + Array.from(eventstore.getEventStream('foo').reset()); + expect(callCount).to.be(2); + }); + }); }); From 908557736c5ef9c02687d7433747979638104e74 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 14:20:50 +0000 Subject: [PATCH 11/19] Add tests to cover new EventStore on/once/off/addListener/removeListener and streamMetadata fallback paths Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/d1ba14e3-fb46-42f3-88fc-05ecf508f745 --- test/EventStore.spec.js | 81 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index 50da805..3c0415f 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -1006,6 +1006,18 @@ describe('EventStore', function() { expect(calls[1].streamName).to.be('bar'); }); + it('uses an empty metadata object for streams not in the streamMetadata map', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'known-stream': { allowedRoles: ['admin'] } } + }); + const calls = []; + eventstore.preCommit((event, metadata) => calls.push(metadata)); + // 'unknown-stream' is not in the object — should receive {} (no allowedRoles) + eventstore.commit('unknown-stream', [{ type: 'A' }]); + expect(calls[0].allowedRoles).to.be(undefined); + }); + it('throws when the storage is opened in read-only mode', function(done) { eventstore = new EventStore({ storageDirectory, @@ -1188,6 +1200,18 @@ describe('EventStore', function() { expect(order).to.eql(['first', 'second']); }); + it('supports once() for a one-time preRead handler', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }, { type: 'FooUpdated' }]); + let callCount = 0; + eventstore.once('preRead', () => callCount++); + Array.from(eventstore.getEventStream('foo')); + expect(callCount).to.be(1); + }); + it('supports removal of a read handler via off()', function() { eventstore = new EventStore({ storageDirectory, @@ -1204,6 +1228,63 @@ describe('EventStore', function() { expect(callCount).to.be(2); }); + it('supports removal of a read handler via removeListener()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }, { type: 'FooUpdated' }]); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('preRead', handler); + Array.from(eventstore.getEventStream('foo')); + expect(callCount).to.be(2); + eventstore.removeListener('preRead', handler); + Array.from(eventstore.getEventStream('foo').reset()); + expect(callCount).to.be(2); + }); + + it('delegates non-hook once() to EventEmitter', function(done) { + eventstore = new EventStore({ storageDirectory }); + let callCount = 0; + eventstore.once('ready', () => { + callCount++; + expect(callCount).to.be(1); + done(); + }); + }); + + it('delegates non-hook off() to EventEmitter', function() { + eventstore = new EventStore({ storageDirectory }); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('ready', handler); + eventstore.off('ready', handler); + // No assertion needed — just must not throw + }); + + it('delegates addListener() to on()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + const calls = []; + eventstore.addListener('preRead', (position, metadata) => calls.push(metadata)); + Array.from(eventstore.getEventStream('foo')); + expect(calls).to.have.length(1); + expect(calls[0].allowedRoles).to.eql(['user']); + }); + + it('delegates removeListener() to off() for non-hook events', function() { + eventstore = new EventStore({ storageDirectory }); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('ready', handler); + eventstore.removeListener('ready', handler); + // No assertion needed — just must not throw + }); + }); }); From afe42f54632747ef3412043012739d96d0aaddd0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 14:48:34 +0000 Subject: [PATCH 12/19] Initial plan From d4ba2fcb80ef6576a0ddb740e8ebc2c7668d7dfb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 15:00:28 +0000 Subject: [PATCH 13/19] Add bench-read-scenarios.js benchmark file --- bench/bench-read-scenarios.js | 69 +++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 bench/bench-read-scenarios.js diff --git a/bench/bench-read-scenarios.js b/bench/bench-read-scenarios.js new file mode 100644 index 0000000..9bf10b2 --- /dev/null +++ b/bench/bench-read-scenarios.js @@ -0,0 +1,69 @@ +const Benchmark = require('benchmark'); +const benchmarks = require('beautify-benchmark'); +const fs = require('fs-extra'); + +const Suite = new Benchmark.Suite('read-scenarios'); +Suite.on('cycle', (event) => benchmarks.add(event.target)); +Suite.on('complete', () => { benchmarks.log(); process.exit(0); }); +Suite.on('error', (e) => console.log(e.target.error)); + +const Stable = require('event-storage'); +const Latest = require('../index'); + +const EVENTS = 10000; +const STREAM_1 = 'stream-1'; +const STREAM_2 = 'stream-2'; +// Keep each event document close to 512 bytes +const EVENT_DOC = { type: 'SomeEvent', payload: 'x'.repeat(460) }; + +function countAll(iter) { + let n = 0; + for (const _ of iter) n++; // jshint ignore:line + return n; +} + +function populateStore(EventStore, directory) { + return new Promise((resolve, reject) => { + fs.emptyDirSync(directory); + const store = new EventStore('bench', { storageDirectory: directory }); + store.once('ready', () => { + for (let i = 0; i < EVENTS; i++) { + store.commit(i % 2 === 0 ? STREAM_1 : STREAM_2, Object.assign({ seq: i }, EVENT_DOC)); + } + store.close(); + resolve(); + }); + store.once('error', reject); + }); +} + +function openReadOnly(EventStore, directory) { + return new Promise((resolve, reject) => { + const store = new EventStore('bench', { storageDirectory: directory, readOnly: true }); + store.once('ready', () => resolve(store)); + store.once('error', reject); + }); +} + +populateStore(Stable, 'data/stable') + .then(() => populateStore(Latest, 'data/latest')) + .then(() => Promise.all([ + openReadOnly(Stable, 'data/stable'), + openReadOnly(Latest, 'data/latest'), + ])) + .then(([stableStore, latestStore]) => { + const third = Math.ceil(EVENTS / 3); + const twoThirds = Math.floor(2 * EVENTS / 3); + + Suite + .add('1 - forward full scan [stable]', () => countAll(stableStore.getAllEvents())) + .add('1 - forward full scan [latest]', () => countAll(latestStore.getAllEvents())) + .add('2 - backwards full scan [stable]', () => countAll(stableStore.getAllEvents(-1, 1))) + .add('2 - backwards full scan [latest]', () => countAll(latestStore.getAllEvents(-1, 1))) + .add('3 - join stream [stable]', () => countAll(stableStore.fromStreams('join', [STREAM_1, STREAM_2]))) + .add('3 - join stream [latest]', () => countAll(latestStore.fromStreams('join', [STREAM_1, STREAM_2]))) + .add('4 - range scan [stable]', () => countAll(stableStore.getAllEvents(third, twoThirds))) + .add('4 - range scan [latest]', () => countAll(latestStore.getAllEvents(third, twoThirds))) + .run(); + }) + .catch((e) => { console.error(e); process.exit(1); }); From 9cb6502f15a72b13f5e25bd6a1b99952e21c2bd1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 15:54:48 +0000 Subject: [PATCH 14/19] Initial plan From e727ea1ce0c313147f45afadd4f744a72ae7680a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 16:09:53 +0000 Subject: [PATCH 15/19] Fix 3x scan performance regression: guard WatchesFile.open() watcher recreation and emit calls Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/30186b59-6016-4170-a527-bc2a9e7c36bb --- src/Storage/ReadableStorage.js | 4 +++- src/Storage/WritableStorage.js | 4 +++- src/WatchesFile.js | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index 59f1558..f2d5b56 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -220,7 +220,9 @@ class ReadableStorage extends events.EventEmitter { */ readFrom(partitionId, position, size) { const partition = this.getPartition(partitionId); - this.emit('preRead', position, partition.metadata); + if (this.listenerCount('preRead') > 0) { + this.emit('preRead', position, partition.metadata); + } const data = partition.readFrom(position, size); return this.serializer.deserialize(data); } diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index e14ffce..596e52c 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -231,7 +231,9 @@ class WritableStorage extends ReadableStorage { const partitionName = this.partitioner(document, this.index.length + 1); const partition = this.getPartition(partitionName); - this.emit('preCommit', document, partition.metadata); + if (this.listenerCount('preCommit') > 0) { + this.emit('preCommit', document, partition.metadata); + } const position = partition.write(data, this.length, callback); assert(position !== false, 'Error writing document.'); diff --git a/src/WatchesFile.js b/src/WatchesFile.js index ca46b71..a9f06fe 100644 --- a/src/WatchesFile.js +++ b/src/WatchesFile.js @@ -35,7 +35,9 @@ const WatchesFile = Base => class extends Base { */ open() { if (super.open()) { - this.watchFile(); + if (!this.watcher) { + this.watchFile(); + } return true; } return false; From 5a7ab4b4f8e988439fe5e906523e31b3f05ad933 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 09:21:06 +0000 Subject: [PATCH 16/19] Initial plan From 71ea7ee6068190c9a33a7a9f6d6f37c608e2969d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 16:48:09 +0000 Subject: [PATCH 17/19] Initial plan From e5783c7286c783dd47608791a70547db02230f3d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 14:48:34 +0000 Subject: [PATCH 18/19] Initial plan From a16905c8f2c14f94f497de9c61972b4cfcafed20 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 15:54:48 +0000 Subject: [PATCH 19/19] Initial plan