diff --git a/README.md b/README.md index 0cee1ac..6b29670 100644 --- a/README.md +++ b/README.md @@ -600,3 +600,101 @@ specify an own unique random secret for this in production. Alternatively you should always explicitly specify your matchers when opening an existing index, since that will check the specified matcher matches the one in the index file. + +### Partition Metadata and Access Control Hooks + +Each stream (partition) can carry an arbitrary metadata object that is written once into the partition's file header +at creation time and cannot be changed afterwards. This makes it a stable anchor for access control policies that +need to be defined when a stream is first created. + +Two handlers can be registered on the `EventStore` (or directly on the `Storage` instance) to intercept reads and +writes, using the standard Node.js `EventEmitter` API (`on`, `once`, `off`) or the convenience wrapper methods: + +- **`on('preCommit', handler)`** — handler called with `(event, partitionMetadata)` *before* the event is written. Throw from the handler to abort the write. +- **`on('preRead', handler)`** — handler called with `(position, partitionMetadata)` *before* the event is read from disk. Throw from the handler to abort the read. + +Multiple handlers can be registered for the same event; they all run on every operation in registration order. +Individual handlers can be removed with `off('preCommit', handler)` / `off('preRead', handler)`, and `once()` is +available for one-time handlers. + +> **Performance note:** Both handlers are invoked synchronously on *every* read and write operation. Keep the handler +> logic as cheap and fast as possible — avoid I/O, async operations, or any non-trivial computation inside a handler, +> as the overhead will be paid on every event accessed. + +#### Using the `EventStore` API + +At the `EventStore` level the unit of work is a *stream*. Use `config.streamMetadata` to attach metadata per stream. +It accepts either: +- A **function** `(streamName) => object` — called once per stream at creation time, so each stream can have its own + metadata (shown in the example below). +- A **plain object** `{ streamName: metadataObject, ... }` — each key is a stream name whose value becomes that + stream's metadata; streams not present in the object receive an empty metadata object `{}`. + +Register handlers using `eventstore.on('preCommit', ...)` / `eventstore.on('preRead', ...)`, +or via the convenience methods `eventstore.preCommit(handler)` / `eventstore.preRead(handler)`. + +```javascript +const EventStore = require('event-storage'); + +// Application-owned context — not part of the library +const globalContext = { authorizedRoles: ['user'] }; + +const eventstore = new EventStore('my-event-store', { + storageDirectory: './data', + // Called once per stream at creation time; the result is persisted in the file header + streamMetadata: (streamName) => ({ + allowedRoles: streamName === 'admin-stream' ? ['admin'] : ['user'] + }) +}); + +eventstore.on('ready', () => { + // Reject writes to streams whose allowedRoles don't overlap with the caller's roles + eventstore.on('preCommit', (event, streamMetadata) => { + if (!streamMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { + throw new Error( + 'Not authorized to write to this stream with roles ' + + JSON.stringify(globalContext.authorizedRoles) + ); + } + }); + + // Reject reads from streams whose allowedRoles don't overlap with the caller's roles + eventstore.on('preRead', (position, streamMetadata) => { + if (!streamMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { + throw new Error( + 'Not authorized to read from this stream with roles ' + + JSON.stringify(globalContext.authorizedRoles) + ); + } + }); + + // This write succeeds — 'user' is in allowedRoles for 'user-stream' + eventstore.commit('user-stream', [{ type: 'UserCreated', id: 1 }], 0); + + // This write throws — 'admin' is NOT in globalContext.authorizedRoles + eventstore.commit('admin-stream', [{ type: 'AdminAction' }], 0); +}); +``` + +#### Using the `Storage` API directly + +If you work with `Storage` directly (bypassing `EventStore`), use `config.metadata` — a function +`(partitionName) => object` called whenever a new partition is created — and register handlers using +`storage.on('preCommit', ...)` / `storage.on('preRead', ...)` or the equivalent convenience methods: + +```javascript +const Storage = require('event-storage').Storage; + +const storage = new Storage('events', { + partitioner: (doc) => doc.stream, + metadata: (partitionName) => ({ + allowedRoles: partitionName === 'admin' ? ['admin'] : ['user'] + }) +}); + +storage.on('preCommit', (document, partitionMetadata) => { /* ... */ }); +storage.on('preRead', (position, partitionMetadata) => { /* ... */ }); +``` + +The `globalContext` object is entirely application-owned. The library only calls the handler with the position (or +event for `preCommit`) and the stored metadata — everything else is up to the application. diff --git a/bench/bench-read-scenarios.js b/bench/bench-read-scenarios.js new file mode 100644 index 0000000..9bf10b2 --- /dev/null +++ b/bench/bench-read-scenarios.js @@ -0,0 +1,69 @@ +const Benchmark = require('benchmark'); +const benchmarks = require('beautify-benchmark'); +const fs = require('fs-extra'); + +const Suite = new Benchmark.Suite('read-scenarios'); +Suite.on('cycle', (event) => benchmarks.add(event.target)); +Suite.on('complete', () => { benchmarks.log(); process.exit(0); }); +Suite.on('error', (e) => console.log(e.target.error)); + +const Stable = require('event-storage'); +const Latest = require('../index'); + +const EVENTS = 10000; +const STREAM_1 = 'stream-1'; +const STREAM_2 = 'stream-2'; +// Keep each event document close to 512 bytes +const EVENT_DOC = { type: 'SomeEvent', payload: 'x'.repeat(460) }; + +function countAll(iter) { + let n = 0; + for (const _ of iter) n++; // jshint ignore:line + return n; +} + +function populateStore(EventStore, directory) { + return new Promise((resolve, reject) => { + fs.emptyDirSync(directory); + const store = new EventStore('bench', { storageDirectory: directory }); + store.once('ready', () => { + for (let i = 0; i < EVENTS; i++) { + store.commit(i % 2 === 0 ? STREAM_1 : STREAM_2, Object.assign({ seq: i }, EVENT_DOC)); + } + store.close(); + resolve(); + }); + store.once('error', reject); + }); +} + +function openReadOnly(EventStore, directory) { + return new Promise((resolve, reject) => { + const store = new EventStore('bench', { storageDirectory: directory, readOnly: true }); + store.once('ready', () => resolve(store)); + store.once('error', reject); + }); +} + +populateStore(Stable, 'data/stable') + .then(() => populateStore(Latest, 'data/latest')) + .then(() => Promise.all([ + openReadOnly(Stable, 'data/stable'), + openReadOnly(Latest, 'data/latest'), + ])) + .then(([stableStore, latestStore]) => { + const third = Math.ceil(EVENTS / 3); + const twoThirds = Math.floor(2 * EVENTS / 3); + + Suite + .add('1 - forward full scan [stable]', () => countAll(stableStore.getAllEvents())) + .add('1 - forward full scan [latest]', () => countAll(latestStore.getAllEvents())) + .add('2 - backwards full scan [stable]', () => countAll(stableStore.getAllEvents(-1, 1))) + .add('2 - backwards full scan [latest]', () => countAll(latestStore.getAllEvents(-1, 1))) + .add('3 - join stream [stable]', () => countAll(stableStore.fromStreams('join', [STREAM_1, STREAM_2]))) + .add('3 - join stream [latest]', () => countAll(latestStore.fromStreams('join', [STREAM_1, STREAM_2]))) + .add('4 - range scan [stable]', () => countAll(stableStore.getAllEvents(third, twoThirds))) + .add('4 - range scan [latest]', () => countAll(latestStore.getAllEvents(third, twoThirds))) + .run(); + }) + .catch((e) => { console.error(e); process.exit(1); }); diff --git a/bench/package.json b/bench/package.json index a75fcd0..9a61e5a 100644 --- a/bench/package.json +++ b/bench/package.json @@ -2,7 +2,7 @@ "name": "event-storage-bench", "version": "0.0.1", "scripts": { - "bench": "node bench-index.js && node bench-storage.js && node bench-eventstore.js" + "bench": "node bench-index.js && node bench-storage.js && node bench-eventstore.js && node bench-read-scenarios.js" }, "dependencies": { "beautify-benchmark": "^0.2.4", diff --git a/src/EventStore.js b/src/EventStore.js index 8fe66ac..2a50595 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -28,6 +28,10 @@ class EventStore extends events.EventEmitter { * @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'. * @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`. * @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode. + * @param {object|function(string): object} [config.streamMetadata] A metadata object or a function `(streamName) => object` + * that is called whenever a new stream partition is created. The returned object is stored once in the partition + * file header and surfaced to `preCommit` / `preRead` hooks. Takes precedence only when + * `config.storageConfig.metadata` is not also set. */ constructor(storeName = 'eventstore', config = {}) { super(); @@ -44,6 +48,17 @@ class EventStore extends events.EventEmitter { readOnly: config.readOnly || false }; const storageConfig = Object.assign(defaults, config.storageConfig); + + // Translate the high-level streamMetadata option into the storage-level metadata function, + // but only when the caller has not already provided a lower-level storageConfig.metadata. + if (config.streamMetadata !== undefined && storageConfig.metadata === undefined) { + if (typeof config.streamMetadata === 'function') { + storageConfig.metadata = config.streamMetadata; + } else { + storageConfig.metadata = (streamName) => config.streamMetadata[streamName] || {}; + } + } + this.initialize(storeName, storageConfig); } @@ -145,6 +160,103 @@ class EventStore extends events.EventEmitter { this.storage.close(); } + /** + * Override EventEmitter.on() to delegate 'preCommit' and 'preRead' event registrations + * to the underlying storage, so that `eventstore.on('preCommit', handler)` works naturally. + * All other events are handled by the default EventEmitter. + * + * @param {string} event + * @param {function} listener + * @returns {this} + */ + on(event, listener) { + if (event === 'preCommit' || event === 'preRead') { + if (event === 'preCommit') { + assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.'); + } + this.storage.on(event, listener); + return this; + } + return super.on(event, listener); + } + + /** + * @inheritDoc + */ + addListener(event, listener) { + return this.on(event, listener); + } + + /** + * Override EventEmitter.once() to delegate 'preCommit' and 'preRead' to the underlying storage. + * + * @param {string} event + * @param {function} listener + * @returns {this} + */ + once(event, listener) { + if (event === 'preCommit' || event === 'preRead') { + if (event === 'preCommit') { + assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.'); + } + this.storage.once(event, listener); + return this; + } + return super.once(event, listener); + } + + /** + * Override EventEmitter.off() / removeListener() to delegate 'preCommit' and 'preRead' + * to the underlying storage. + * + * @param {string} event + * @param {function} listener + * @returns {this} + */ + off(event, listener) { + if (event === 'preCommit' || event === 'preRead') { + this.storage.off(event, listener); + return this; + } + return super.off(event, listener); + } + + /** + * @inheritDoc + */ + removeListener(event, listener) { + return this.off(event, listener); + } + + /** + * Convenience method to register a handler called before an event is committed to storage. + * Equivalent to `eventstore.on('preCommit', hook)`. + * The handler receives `(event, partitionMetadata)` and may throw to abort the write. + * Multiple handlers can be registered; all run on every write in registration order. + * The handler is invoked on every write, so its logic should be cheap, fast, and synchronous. + * + * @api + * @param {function(object, object): void} hook A function receiving (event, partitionMetadata). + * @throws {Error} If the storage was opened in read-only mode. + */ + preCommit(hook) { + this.on('preCommit', hook); + } + + /** + * Convenience method to register a handler called before an event is read from storage. + * Equivalent to `eventstore.on('preRead', hook)`. + * The handler receives `(position, partitionMetadata)` and may throw to abort the read. + * Multiple handlers can be registered; all run on every read in registration order. + * The handler is invoked on every read, so its logic should be cheap, fast, and synchronous. + * + * @api + * @param {function(number, object): void} hook A function receiving (position, partitionMetadata). + */ + preRead(hook) { + this.on('preRead', hook); + } + /** * Get the number of events stored. * diff --git a/src/Index/ReadOnlyIndex.js b/src/Index/ReadOnlyIndex.js index fa1db0d..0b01019 100644 --- a/src/Index/ReadOnlyIndex.js +++ b/src/Index/ReadOnlyIndex.js @@ -24,9 +24,16 @@ class ReadOnlyIndex extends watchesFile(ReadableIndex) { if (!this.fd) { return; } - const prevLength = this.data.length; + const prevLength = this._length; const newLength = this.readFileLength(); - this.data.length = newLength; + if (newLength < prevLength) { + // Clear ring buffer slots for the removed entries to avoid stale reads + const oldCacheStart = Math.max(0, prevLength - this.cacheSize); + for (let i = Math.max(oldCacheStart, newLength); i < prevLength; i++) { + this.cache[i % this.cacheSize] = null; + } + } + this._length = newLength; if (newLength > prevLength) { this.emit('append', prevLength, newLength); } diff --git a/src/Index/ReadableIndex.js b/src/Index/ReadableIndex.js index c998d4d..e1e6467 100644 --- a/src/Index/ReadableIndex.js +++ b/src/Index/ReadableIndex.js @@ -38,6 +38,7 @@ class ReadableIndex extends events.EventEmitter { * @param {object} [options] An object with additional index options. * @param {typeof EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods. * @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'. + * @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024. * @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096. * @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100. * @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings. @@ -65,8 +66,10 @@ class ReadableIndex extends events.EventEmitter { * @param {object} options */ initialize(options) { - /* @type Array */ - this.data = []; + this._length = 0; + this.cacheSize = options.cacheSize !== undefined ? Math.max(1, options.cacheSize >>> 0) : 1024; // jshint ignore:line + /* @type Array Ring buffer holding at most cacheSize entries */ + this.cache = new Array(this.cacheSize); this.fd = null; this.fileMode = 'r'; this.EntryClass = options.EntryClass; @@ -100,7 +103,7 @@ class ReadableIndex extends events.EventEmitter { * @returns {number} */ get length() { - return this.data.length; + return this._length; } /** @@ -155,7 +158,7 @@ class ReadableIndex extends events.EventEmitter { const length = this.readFileLength(); if (length > 0) { - this.data = new Array(length); + this._length = length; // Read last item to get the index started this.read(length); } @@ -232,8 +235,9 @@ class ReadableIndex extends events.EventEmitter { * @api */ close() { - this.data = []; + this._length = 0; this.readUntil = -1; + this.cache.fill(null); this.readBuffer.fill(0); if (this.fd) { fs.closeSync(this.fd); @@ -250,21 +254,27 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry} The index entry at the given position. */ read(index) { - index = Number(index) - 1; + const i = Number(index) - 1; // 0-based - fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + index * this.EntryClass.size); - if (index === this.readUntil + 1) { + fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + i * this.EntryClass.size); + if (i === this.readUntil + 1) { this.readUntil++; } - this.data[index] = this.EntryClass.fromBuffer(this.readBuffer); - - return this.data[index]; + const entry = this.EntryClass.fromBuffer(this.readBuffer); + // Store in ring buffer only if within the current cache window + if (i >= this._length - this.cacheSize) { + this.cache[i % this.cacheSize] = entry; + } + return entry; } /** * Read a range of entries from disk. This method will not do any range checks. * It will however optimize to prevent reading entries that have already been read sequentially from start. * + * Entries within the ring buffer cache window are stored in the cache; entries outside the window + * (older than cacheSize) are read from disk for the return value but not cached. + * * @private * @param {number} from The 1-based index position from where to read from (inclusive). * @param {number} until The 1-based index position until which to read to (inclusive). @@ -275,25 +285,61 @@ class ReadableIndex extends events.EventEmitter { return [this.read(from)]; } - from--; - until--; - - const readFrom = Math.max(this.readUntil + 1, from); - const amount = (until - readFrom + 1); - - const readBuffer = Buffer.allocUnsafe(amount * this.EntryClass.size); - let readSize = fs.readSync(this.fd, readBuffer, 0, readBuffer.byteLength, this.headerSize + readFrom * this.EntryClass.size); - let index = 0; - while (index < amount && readSize > 0) { - this.data[index + readFrom] = this.EntryClass.fromBuffer(readBuffer, index * this.EntryClass.size); - readSize -= this.EntryClass.size; - index++; + const f = from - 1; // 0-based + const u = until - 1; // 0-based + const cacheStart = Math.max(0, this._length - this.cacheSize); + + // Build the result array up front + const result = new Array(u - f + 1); + + // Part 1: Out-of-window entries [f, min(cacheStart-1, u)] — read from disk, do not cache + const outEnd = Math.min(cacheStart - 1, u); + if (f < cacheStart && outEnd >= f) { + const count = outEnd - f + 1; + const outBuf = Buffer.allocUnsafe(count * this.EntryClass.size); + const bytesRead = fs.readSync(this.fd, outBuf, 0, outBuf.byteLength, this.headerSize + f * this.EntryClass.size); + const entries = Math.floor(bytesRead / this.EntryClass.size); + for (let idx = 0; idx < entries; idx++) { + result[idx] = this.EntryClass.fromBuffer(outBuf, idx * this.EntryClass.size); + } } - if (from <= this.readUntil + 1) { - this.readUntil = Math.max(this.readUntil, until); + + // Part 2: In-window entries [max(cacheStart, f), u] — use cache + disk for uncached ones + // All indices accessed below satisfy i >= inStart >= cacheStart, so each slot i % cacheSize + // is exclusive to index i within the window and cannot hold a stale entry. + const inStart = Math.max(cacheStart, f); + if (inStart <= u) { + // Optimisation: skip entries already loaded sequentially into the cache + const readFrom = Math.max(this.readUntil + 1, inStart); + + // Trim trailing entries already present in the cache. + // readUntil >= readFrom >= cacheStart throughout, so all slots checked are in-window. + let readUntil = u; + while (readUntil >= readFrom && this.cache[readUntil % this.cacheSize]) { + readUntil--; + } + + if (readFrom <= readUntil) { + const count = readUntil - readFrom + 1; + const inBuf = Buffer.allocUnsafe(count * this.EntryClass.size); + const bytesRead = fs.readSync(this.fd, inBuf, 0, inBuf.byteLength, this.headerSize + readFrom * this.EntryClass.size); + const entries = Math.floor(bytesRead / this.EntryClass.size); + for (let idx = 0; idx < entries; idx++) { + const i = readFrom + idx; + this.cache[i % this.cacheSize] = this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size); + } + if (inStart <= this.readUntil + 1) { + this.readUntil = Math.max(this.readUntil, readUntil); + } + } + + // Fill the result from the ring buffer for the in-window portion + for (let i = inStart; i <= u; i++) { + result[i - f] = this.cache[i % this.cacheSize]; + } } - return this.data.slice(from, until + 1); + return result; } /** @@ -318,13 +364,18 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry|boolean} The entry at the given index position or false if out of bounds. */ get(index) { - index = wrapAndCheck(index, this.length); + index = wrapAndCheck(index, this._length); if (index <= 0) { return false; } - if (this.data[index - 1]) { - return this.data[index - 1]; + const i = index - 1; // 0-based + // The ring buffer window is [_length - cacheSize, _length - 1]. + // Within this window every index maps to a unique slot (no two indices share a slot), + // so a non-null slot is guaranteed to belong to index i and cannot be stale. + if (i >= this._length - this.cacheSize) { + const cached = this.cache[i % this.cacheSize]; + if (cached) return cached; } return this.read(index); @@ -354,24 +405,44 @@ class ReadableIndex extends events.EventEmitter { * @returns {Array|boolean} An array of entries for the given range or false on error. */ range(from, until = -1) { - from = wrapAndCheck(from, this.length); - until = wrapAndCheck(until, this.length); + from = wrapAndCheck(from, this._length); + until = wrapAndCheck(until, this._length); if (from <= 0 || until < from) { return false; } - const readFrom = Math.max(this.readUntil + 1, from); - let readUntil = until; - while (readUntil >= readFrom && this.data[readUntil - 1]) { - readUntil--; + const f = from - 1; // 0-based + const u = until - 1; // 0-based + const cacheStart = Math.max(0, this._length - this.cacheSize); + + // Determine if any disk reads are required + const hasOutOfWindow = f < cacheStart; + const inStart = Math.max(cacheStart, f); + // Entries in [inStart, readUntil] are assumed cached (sequential read guarantee). + // All indices in [readFrom, u] satisfy >= inStart >= cacheStart — unique, non-stale slots. + const readFrom = Math.max(this.readUntil + 1, inStart); + let needsDiskRead = hasOutOfWindow; + if (!needsDiskRead && inStart <= u) { + // Scan backwards for uncached in-window tail entries (all >= cacheStart, no stale slots). + let scanUntil = u; + while (scanUntil >= readFrom && this.cache[scanUntil % this.cacheSize]) { + scanUntil--; + } + needsDiskRead = readFrom <= scanUntil; } - if (readFrom <= readUntil) { - this.readRange(readFrom, readUntil); + if (needsDiskRead) { + return this.readRange(from, until); } - return this.data.slice(from - 1, until); + // All required entries are already in the ring buffer — build result directly. + // f >= cacheStart here (hasOutOfWindow is false), so all slots are in-window and valid. + const result = new Array(u - f + 1); + for (let i = f; i <= u; i++) { + result[i - f] = this.cache[i % this.cacheSize]; + } + return result; } /** diff --git a/src/Index/WritableIndex.js b/src/Index/WritableIndex.js index 6ba1edc..df477cb 100644 --- a/src/Index/WritableIndex.js +++ b/src/Index/WritableIndex.js @@ -20,6 +20,7 @@ class WritableIndex extends ReadableIndex { * @param {object} [options] An object with additional index options. * @param {EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods. * @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'. + * @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024. * @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096. * @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100. * @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings. @@ -187,22 +188,23 @@ class WritableIndex extends ReadableIndex { assertEqual(entry.constructor.name, this.EntryClass.name, `Wrong entry object.`); assertEqual(entry.constructor.size, this.EntryClass.size, `Invalid entry size.`); - if (this.readUntil === this.data.length - 1) { + if (this.readUntil === this._length - 1) { this.readUntil++; } - this.data[this.data.length] = entry; + this.cache[this._length % this.cacheSize] = entry; + this._length++; if (this.writeBufferCursor === 0) { this.flushTimeout = setTimeout(() => this.flush(), this.flushDelay); } this.writeBufferCursor += entry.toBuffer(this.writeBuffer, this.writeBufferCursor); - this.onFlush(callback, this.length); + this.onFlush(callback, this._length); if (this.writeBufferCursor >= this.writeBuffer.byteLength) { this.flush(); } - return this.length; + return this._length; } /** @@ -225,7 +227,13 @@ class WritableIndex extends ReadableIndex { return; } fs.truncateSync(this.fileName, truncatePosition); - this.data.splice(after); + + // Clear ring buffer slots for the removed entries to avoid stale reads + const oldCacheStart = Math.max(0, this._length - this.cacheSize); + for (let i = Math.max(oldCacheStart, after); i < this._length; i++) { + this.cache[i % this.cacheSize] = null; + } + this._length = after; this.readUntil = Math.min(this.readUntil, after); } } diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index 7cc3ef7..f2d5b56 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -197,6 +197,19 @@ class ReadableStorage extends events.EventEmitter { return this.partitions[partitionIdentifier]; } + /** + * Register a handler that is called before a document is read from a partition. + * The handler receives the position and the partition metadata and may throw to abort the read. + * Multiple handlers can be registered; all run on every read in registration order. + * Equivalent to `storage.on('preRead', hook)`. + * + * @api + * @param {function(number, object): void} hook A function receiving (position, partitionMetadata). + */ + preRead(hook) { + this.on('preRead', hook); + } + /** * @protected * @param {number} partitionId The partition to read from. @@ -207,6 +220,9 @@ class ReadableStorage extends events.EventEmitter { */ readFrom(partitionId, position, size) { const partition = this.getPartition(partitionId); + if (this.listenerCount('preRead') > 0) { + this.emit('preRead', position, partition.metadata); + } const data = partition.readFrom(position, size); return this.serializer.deserialize(data); } @@ -255,7 +271,7 @@ class ReadableStorage extends events.EventEmitter { if (readFrom > readUntil) { const batchSize = 10; let batchUntil = readFrom; - while (batchUntil > readUntil) { + while (batchUntil >= readUntil) { const batchFrom = Math.max(readUntil, batchUntil - batchSize); yield* reverse(this.iterateRange(batchFrom, batchUntil, index)); batchUntil = batchFrom - 1; diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 34572ab..596e52c 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -178,6 +178,19 @@ class WritableStorage extends ReadableStorage { return entry; } + /** + * Register a handler that is called before a document is written to storage. + * The handler receives the document and the partition metadata and may throw to abort the write. + * Multiple handlers can be registered; all run on every write in registration order. + * Equivalent to `storage.on('preCommit', hook)`. + * + * @api + * @param {function(object, object): void} hook A function receiving (document, partitionMetadata). + */ + preCommit(hook) { + this.on('preCommit', hook); + } + /** * Get a partition either by name or by id. * If a partition with the given name does not exist, a new one will be created. @@ -190,10 +203,14 @@ class WritableStorage extends ReadableStorage { */ getPartition(partitionIdentifier) { if (typeof partitionIdentifier === 'string') { + const partitionShortName = partitionIdentifier; const partitionName = this.storageFile + (partitionIdentifier.length ? '.' + partitionIdentifier : ''); partitionIdentifier = WritablePartition.idFor(partitionName); if (!this.partitions[partitionIdentifier]) { - this.partitions[partitionIdentifier] = this.createPartition(partitionName, this.partitionConfig); + const partitionConfig = typeof this.partitionConfig.metadata === 'function' + ? { ...this.partitionConfig, metadata: this.partitionConfig.metadata(partitionShortName) } + : this.partitionConfig; + this.partitions[partitionIdentifier] = this.createPartition(partitionName, partitionConfig); this.emit('partition-created', partitionIdentifier); } this.partitions[partitionIdentifier].open(); @@ -214,6 +231,9 @@ class WritableStorage extends ReadableStorage { const partitionName = this.partitioner(document, this.index.length + 1); const partition = this.getPartition(partitionName); + if (this.listenerCount('preCommit') > 0) { + this.emit('preCommit', document, partition.metadata); + } const position = partition.write(data, this.length, callback); assert(position !== false, 'Error writing document.'); diff --git a/src/WatchesFile.js b/src/WatchesFile.js index ca46b71..a9f06fe 100644 --- a/src/WatchesFile.js +++ b/src/WatchesFile.js @@ -35,7 +35,9 @@ const WatchesFile = Base => class extends Base { */ open() { if (super.open()) { - this.watchFile(); + if (!this.watcher) { + this.watchFile(); + } return true; } return false; diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index eea0e08..3c0415f 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -963,4 +963,328 @@ describe('EventStore', function() { }); + + describe('preCommit', function() { + + it('calls the hook before writing with the event and partition metadata', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + const calls = []; + eventstore.preCommit((event, metadata) => calls.push({ event, metadata })); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + expect(calls).to.have.length(1); + expect(calls[0].metadata.allowedRoles).to.eql(['admin']); + expect(calls[0].event.payload.type).to.be('FooCreated'); + }); + + it('aborts the write when the hook throws', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + eventstore.preCommit((event, metadata) => { + if (!metadata.allowedRoles.includes('user')) { + throw new Error('Not authorized'); + } + }); + expect(() => eventstore.commit('foo', [{ type: 'FooCreated' }])).to.throwError(/Not authorized/); + expect(eventstore.length).to.be(0); + }); + + it('supports per-stream metadata via a function', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: (streamName) => ({ streamName }) + }); + const calls = []; + eventstore.preCommit((event, metadata) => calls.push(metadata)); + eventstore.commit('foo', [{ type: 'A' }]); + eventstore.commit('bar', [{ type: 'B' }]); + expect(calls[0].streamName).to.be('foo'); + expect(calls[1].streamName).to.be('bar'); + }); + + it('uses an empty metadata object for streams not in the streamMetadata map', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'known-stream': { allowedRoles: ['admin'] } } + }); + const calls = []; + eventstore.preCommit((event, metadata) => calls.push(metadata)); + // 'unknown-stream' is not in the object — should receive {} (no allowedRoles) + eventstore.commit('unknown-stream', [{ type: 'A' }]); + expect(calls[0].allowedRoles).to.be(undefined); + }); + + it('throws when the storage is opened in read-only mode', function(done) { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.close(); + + const readOnly = new EventStore({ + storageDirectory, + storageConfig: { readOnly: true } + }); + readOnly.on('ready', () => { + expect(() => readOnly.preCommit(() => {})).to.throwError(); + readOnly.close(); + eventstore = null; + done(); + }); + }); + + it('supports eventstore.on("preCommit", handler) style', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + const calls = []; + eventstore.on('preCommit', (event, metadata) => calls.push({ event, metadata })); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + expect(calls).to.have.length(1); + expect(calls[0].metadata.allowedRoles).to.eql(['admin']); + }); + + it('supports multiple handlers registered via on()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + const order = []; + eventstore.on('preCommit', () => order.push('first')); + eventstore.on('preCommit', () => order.push('second')); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + expect(order).to.eql(['first', 'second']); + }); + + it('supports removal of a handler via off()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('preCommit', handler); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + expect(callCount).to.be(1); + eventstore.off('preCommit', handler); + eventstore.commit('foo', [{ type: 'FooUpdated' }]); + expect(callCount).to.be(1); + }); + + it('supports once() for a one-time handler', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + let callCount = 0; + eventstore.once('preCommit', () => callCount++); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.commit('foo', [{ type: 'FooUpdated' }]); + expect(callCount).to.be(1); + }); + + it('throws when registering preCommit via on() on a read-only store', function(done) { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.close(); + + const readOnly = new EventStore({ + storageDirectory, + storageConfig: { readOnly: true } + }); + readOnly.on('ready', () => { + expect(() => readOnly.on('preCommit', () => {})).to.throwError(); + readOnly.close(); + eventstore = null; + done(); + }); + }); + + }); + + describe('preRead', function() { + + it('calls the hook before reading with the position and partition metadata', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + const calls = []; + eventstore.preRead((position, metadata) => calls.push({ position, metadata })); + + const stream = eventstore.getEventStream('foo'); + const events = Array.from(stream); + expect(calls).to.have.length(1); + expect(calls[0].metadata.allowedRoles).to.eql(['user']); + expect(typeof calls[0].position).to.be('number'); + }); + + it('aborts the read when the hook throws', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['admin'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.preRead((position, metadata) => { + if (!metadata.allowedRoles.includes('user')) { + throw new Error('Not authorized to read'); + } + }); + + // EventStream.next() catches iterator errors, so iteration stops with no events + const stream = eventstore.getEventStream('foo'); + const result = Array.from(stream); + expect(result).to.have.length(0); + }); + + it('works on a read-only store', function(done) { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + eventstore.close(); + + const readOnly = new EventStore({ + storageDirectory, + storageConfig: { readOnly: true } + }); + readOnly.on('ready', () => { + const calls = []; + expect(() => readOnly.preRead((pos, meta) => calls.push(meta))).to.not.throwError(); + const stream = readOnly.getEventStream('foo'); + Array.from(stream); + expect(calls).to.have.length(1); + readOnly.close(); + eventstore = null; + done(); + }); + }); + + it('supports eventstore.on("preRead", handler) style', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + const calls = []; + eventstore.on('preRead', (position, metadata) => calls.push({ position, metadata })); + + const stream = eventstore.getEventStream('foo'); + Array.from(stream); + expect(calls).to.have.length(1); + expect(calls[0].metadata.allowedRoles).to.eql(['user']); + }); + + it('supports multiple read handlers via on()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + const order = []; + eventstore.on('preRead', () => order.push('first')); + eventstore.on('preRead', () => order.push('second')); + + Array.from(eventstore.getEventStream('foo')); + expect(order).to.eql(['first', 'second']); + }); + + it('supports once() for a one-time preRead handler', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }, { type: 'FooUpdated' }]); + let callCount = 0; + eventstore.once('preRead', () => callCount++); + Array.from(eventstore.getEventStream('foo')); + expect(callCount).to.be(1); + }); + + it('supports removal of a read handler via off()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }, { type: 'FooUpdated' }]); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('preRead', handler); + Array.from(eventstore.getEventStream('foo')); + expect(callCount).to.be(2); + eventstore.off('preRead', handler); + Array.from(eventstore.getEventStream('foo').reset()); + expect(callCount).to.be(2); + }); + + it('supports removal of a read handler via removeListener()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }, { type: 'FooUpdated' }]); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('preRead', handler); + Array.from(eventstore.getEventStream('foo')); + expect(callCount).to.be(2); + eventstore.removeListener('preRead', handler); + Array.from(eventstore.getEventStream('foo').reset()); + expect(callCount).to.be(2); + }); + + it('delegates non-hook once() to EventEmitter', function(done) { + eventstore = new EventStore({ storageDirectory }); + let callCount = 0; + eventstore.once('ready', () => { + callCount++; + expect(callCount).to.be(1); + done(); + }); + }); + + it('delegates non-hook off() to EventEmitter', function() { + eventstore = new EventStore({ storageDirectory }); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('ready', handler); + eventstore.off('ready', handler); + // No assertion needed — just must not throw + }); + + it('delegates addListener() to on()', function() { + eventstore = new EventStore({ + storageDirectory, + streamMetadata: { 'foo': { allowedRoles: ['user'] } } + }); + eventstore.commit('foo', [{ type: 'FooCreated' }]); + const calls = []; + eventstore.addListener('preRead', (position, metadata) => calls.push(metadata)); + Array.from(eventstore.getEventStream('foo')); + expect(calls).to.have.length(1); + expect(calls[0].allowedRoles).to.eql(['user']); + }); + + it('delegates removeListener() to off() for non-hook events', function() { + eventstore = new EventStore({ storageDirectory }); + let callCount = 0; + const handler = () => callCount++; + eventstore.on('ready', handler); + eventstore.removeListener('ready', handler); + // No assertion needed — just must not throw + }); + + }); + }); diff --git a/test/Index.spec.js b/test/Index.spec.js index 7652282..a3de214 100644 --- a/test/Index.spec.js +++ b/test/Index.spec.js @@ -488,6 +488,71 @@ describe('Index', function() { }); + describe('cacheSize / ring buffer', function() { + + it('limits in-memory entries to cacheSize (old entries still readable via disk)', function() { + const cacheSize = 5; + index = setupIndexWithEntries(20, { cacheSize }); + index.close(); + index.open(); + // All entries must be readable even though only the last `cacheSize` are ever in memory. + for (let i = 1; i <= 20; i++) { + const entry = index.get(i); + expect(entry).not.to.be(false); + expect(entry.number).to.be(i); + } + }); + + it('can still read entries outside the cache window from disk', function() { + const cacheSize = 5; + index = setupIndexWithEntries(15, { cacheSize }); + // Entry 1 is outside the cache window (window = [11, 15]) + const entry = index.get(1); + expect(entry).not.to.be(false); + expect(entry.number).to.be(1); + }); + + it('can read a range that spans both cached and uncached entries', function() { + const cacheSize = 5; + index = setupIndexWithEntries(15, { cacheSize }); + const entries = index.range(8, 13); // crosses the boundary + expect(entries.length).to.be(6); + for (let i = 0; i < entries.length; i++) { + expect(entries[i].number).to.be(8 + i); + } + }); + + it('truncation followed by re-adding entries returns the new values, not stale ones', function() { + const cacheSize = 10; + index = setupIndexWithEntries(15, { cacheSize }); + index.truncate(7); // keep entries 1-7 + // Add new entries at positions 8-10 with different numbers + for (let i = 8; i <= 10; i++) { + index.add(new Index.Entry(i + 100, i)); + } + index.flush(); + // New entries must return the just-added values, not the old truncated ones + for (let i = 8; i <= 10; i++) { + const entry = index.get(i); + expect(entry.number).to.be(i + 100); + } + }); + + it('remains fully functional after many adds that cycle the ring buffer', function() { + const cacheSize = 8; + index = setupIndexWithEntries(32, { cacheSize }); // 4 full cycles + index.close(); + index.open(); + // All entries readable via disk reads + const entries = index.all(); + expect(entries.length).to.be(32); + for (let i = 0; i < entries.length; i++) { + expect(entries[i].number).to.be(i + 1); + } + }); + + }); + describe('flush', function(){ it('returns false on a closed index', function(){ diff --git a/test/Storage.spec.js b/test/Storage.spec.js index 4eee296..ba55812 100644 --- a/test/Storage.spec.js +++ b/test/Storage.spec.js @@ -296,6 +296,26 @@ describe('Storage', function() { expect(i).to.be(0); }); + it('reads all items in reverse when range length is a multiple of batch size', function() { + // 12 items: sequence 12, 1 (step 11) → old code yielded 2..12 then exited, + // missing item 1. Regression test for the >= readUntil boundary fix. + storage = createStorage(); + storage.open(); + + for (let i = 1; i <= 12; i++) { + storage.write({ foo: i }); + } + storage.close(); + storage.open(); + + let i = 12; + let documents = storage.readRange(i, 1); + for (let doc of documents) { + expect(doc).to.eql({ foo: i-- }); + } + expect(i).to.be(0); + }); + it('can read a sub range', function() { storage = createStorage(); storage.open(); @@ -1020,4 +1040,123 @@ describe('Storage', function() { storage.getPartition(''); }); }); + + describe('preCommit', function() { + + it('calls the hook before writing with the document and partition metadata', function() { + storage = createStorage({ metadata: { allowedRoles: ['admin'] } }); + storage.open(); + + let hookDocument, hookMetadata; + storage.preCommit((document, partitionMetadata) => { + hookDocument = document; + hookMetadata = partitionMetadata; + }); + + storage.write({ foo: 'bar' }); + + expect(hookDocument).to.eql({ foo: 'bar' }); + expect(hookMetadata.allowedRoles).to.eql(['admin']); + }); + + it('aborts the write when the hook throws', function() { + storage = createStorage(); + storage.open(); + + storage.preCommit(() => { + throw new Error('not allowed'); + }); + + expect(() => storage.write({ foo: 'bar' })).to.throwError(/not allowed/); + expect(storage.length).to.be(0); + }); + + it('allows writes when the hook does not throw', function() { + storage = createStorage({ metadata: { allowedRoles: ['admin'] } }); + storage.open(); + + const globalContext = { authorizedRoles: ['admin', 'user'] }; + storage.preCommit((document, partitionMetadata) => { + if (!partitionMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) { + throw new Error('Not allowed'); + } + }); + + expect(storage.write({ foo: 'bar' })).to.be(1); + }); + + it('uses per-partition metadata when config.metadata is a function', function() { + storage = createStorage({ + partitioner: (doc) => doc.type, + metadata: (partitionName) => ({ allowedRoles: partitionName === 'admin' ? ['admin'] : ['user'] }) + }); + storage.open(); + + const calls = []; + storage.preCommit((document, partitionMetadata) => { + calls.push({ document, metadata: partitionMetadata }); + }); + + storage.write({ foo: 1, type: 'admin' }); + storage.write({ foo: 2, type: 'user' }); + + expect(calls.length).to.be(2); + expect(calls[0].metadata.allowedRoles).to.eql(['admin']); + expect(calls[1].metadata.allowedRoles).to.eql(['user']); + }); + + }); + + describe('preRead', function() { + + it('calls the hook before reading with the position and partition metadata', function() { + storage = createStorage({ metadata: { allowedRoles: ['admin'] } }); + storage.open(); + storage.write({ foo: 'bar' }); + + let hookPosition, hookMetadata; + storage.preRead((position, partitionMetadata) => { + hookPosition = position; + hookMetadata = partitionMetadata; + }); + + const result = storage.read(1); + + expect(result).to.eql({ foo: 'bar' }); + expect(typeof hookPosition).to.be('number'); + expect(hookMetadata.allowedRoles).to.eql(['admin']); + }); + + it('aborts the read when the hook throws', function() { + storage = createStorage(); + storage.open(); + storage.write({ foo: 'bar' }); + + storage.preRead(() => { + throw new Error('read not allowed'); + }); + + expect(() => storage.read(1)).to.throwError(/read not allowed/); + }); + + it('calls the hook for each document in a range read', function() { + storage = createStorage({ metadata: { allowedRoles: ['user'] } }); + storage.open(); + + for (let i = 1; i <= 3; i++) { + storage.write({ foo: i }); + } + + const hookCalls = []; + storage.preRead((position, partitionMetadata) => { + hookCalls.push({ position, metadata: partitionMetadata }); + }); + + const docs = Array.from(storage.readRange(1, 3)); + expect(docs.length).to.be(3); + expect(hookCalls.length).to.be(3); + expect(hookCalls[0].metadata.allowedRoles).to.eql(['user']); + }); + + }); });