Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,101 @@ specify an own unique random secret for this in production.

Alternatively you should always explicitly specify your matchers when opening an existing index, since that will
check the specified matcher matches the one in the index file.

### Partition Metadata and Access Control Hooks

Each stream (partition) can carry an arbitrary metadata object that is written once into the partition's file header
at creation time and cannot be changed afterwards. This makes it a stable anchor for access control policies that
need to be defined when a stream is first created.

Two handlers can be registered on the `EventStore` (or directly on the `Storage` instance) to intercept reads and
writes, using the standard Node.js `EventEmitter` API (`on`, `once`, `off`) or the convenience wrapper methods:

- **`on('preCommit', handler)`** — handler called with `(event, partitionMetadata)` *before* the event is written. Throw from the handler to abort the write.
- **`on('preRead', handler)`** — handler called with `(position, partitionMetadata)` *before* the event is read from disk. Throw from the handler to abort the read.

Multiple handlers can be registered for the same event; they all run on every operation in registration order.
Individual handlers can be removed with `off('preCommit', handler)` / `off('preRead', handler)`, and `once()` is
available for one-time handlers.

> **Performance note:** Both handlers are invoked synchronously on *every* read and write operation. Keep the handler
> logic as cheap and fast as possible — avoid I/O, async operations, or any non-trivial computation inside a handler,
> as the overhead will be paid on every event accessed.

#### Using the `EventStore` API

At the `EventStore` level the unit of work is a *stream*. Use `config.streamMetadata` to attach metadata per stream.
It accepts either:
- A **function** `(streamName) => object` — called once per stream at creation time, so each stream can have its own
metadata (shown in the example below).
- A **plain object** `{ streamName: metadataObject, ... }` — each key is a stream name whose value becomes that
stream's metadata; streams not present in the object receive an empty metadata object `{}`.

Register handlers using `eventstore.on('preCommit', ...)` / `eventstore.on('preRead', ...)`,
or via the convenience methods `eventstore.preCommit(handler)` / `eventstore.preRead(handler)`.

```javascript
const EventStore = require('event-storage');

// Application-owned context — not part of the library
const globalContext = { authorizedRoles: ['user'] };

const eventstore = new EventStore('my-event-store', {
storageDirectory: './data',
// Called once per stream at creation time; the result is persisted in the file header
streamMetadata: (streamName) => ({
allowedRoles: streamName === 'admin-stream' ? ['admin'] : ['user']
})
});

eventstore.on('ready', () => {
// Reject writes to streams whose allowedRoles don't overlap with the caller's roles
eventstore.on('preCommit', (event, streamMetadata) => {
if (!streamMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) {
throw new Error(
'Not authorized to write to this stream with roles ' +
JSON.stringify(globalContext.authorizedRoles)
);
}
});

// Reject reads from streams whose allowedRoles don't overlap with the caller's roles
eventstore.on('preRead', (position, streamMetadata) => {
if (!streamMetadata.allowedRoles.some(role => globalContext.authorizedRoles.includes(role))) {
throw new Error(
'Not authorized to read from this stream with roles ' +
JSON.stringify(globalContext.authorizedRoles)
);
}
});

// This write succeeds — 'user' is in allowedRoles for 'user-stream'
eventstore.commit('user-stream', [{ type: 'UserCreated', id: 1 }], 0);

// This write throws — 'admin' is NOT in globalContext.authorizedRoles
eventstore.commit('admin-stream', [{ type: 'AdminAction' }], 0);
});
```

#### Using the `Storage` API directly

If you work with `Storage` directly (bypassing `EventStore`), use `config.metadata` — a function
`(partitionName) => object` called whenever a new partition is created — and register handlers using
`storage.on('preCommit', ...)` / `storage.on('preRead', ...)` or the equivalent convenience methods:

```javascript
const Storage = require('event-storage').Storage;

const storage = new Storage('events', {
partitioner: (doc) => doc.stream,
metadata: (partitionName) => ({
allowedRoles: partitionName === 'admin' ? ['admin'] : ['user']
})
});

storage.on('preCommit', (document, partitionMetadata) => { /* ... */ });
storage.on('preRead', (position, partitionMetadata) => { /* ... */ });
```

The `globalContext` object is entirely application-owned. The library only calls the handler with the position (or
event for `preCommit`) and the stored metadata — everything else is up to the application.
69 changes: 69 additions & 0 deletions bench/bench-read-scenarios.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
const Benchmark = require('benchmark');
const benchmarks = require('beautify-benchmark');
const fs = require('fs-extra');

const Suite = new Benchmark.Suite('read-scenarios');
Suite.on('cycle', (event) => benchmarks.add(event.target));
Suite.on('complete', () => { benchmarks.log(); process.exit(0); });
Suite.on('error', (e) => console.log(e.target.error));

const Stable = require('event-storage');
const Latest = require('../index');

const EVENTS = 10000;
const STREAM_1 = 'stream-1';
const STREAM_2 = 'stream-2';
// Keep each event document close to 512 bytes
const EVENT_DOC = { type: 'SomeEvent', payload: 'x'.repeat(460) };

function countAll(iter) {
let n = 0;
for (const _ of iter) n++; // jshint ignore:line
return n;
}

function populateStore(EventStore, directory) {
return new Promise((resolve, reject) => {
fs.emptyDirSync(directory);
const store = new EventStore('bench', { storageDirectory: directory });
store.once('ready', () => {
for (let i = 0; i < EVENTS; i++) {
store.commit(i % 2 === 0 ? STREAM_1 : STREAM_2, Object.assign({ seq: i }, EVENT_DOC));
}
store.close();
resolve();
});
store.once('error', reject);
});
}

function openReadOnly(EventStore, directory) {
return new Promise((resolve, reject) => {
const store = new EventStore('bench', { storageDirectory: directory, readOnly: true });
store.once('ready', () => resolve(store));
store.once('error', reject);
});
}

populateStore(Stable, 'data/stable')
.then(() => populateStore(Latest, 'data/latest'))
.then(() => Promise.all([
openReadOnly(Stable, 'data/stable'),
openReadOnly(Latest, 'data/latest'),
]))
.then(([stableStore, latestStore]) => {
const third = Math.ceil(EVENTS / 3);
const twoThirds = Math.floor(2 * EVENTS / 3);

Suite
.add('1 - forward full scan [stable]', () => countAll(stableStore.getAllEvents()))
.add('1 - forward full scan [latest]', () => countAll(latestStore.getAllEvents()))
.add('2 - backwards full scan [stable]', () => countAll(stableStore.getAllEvents(-1, 1)))
.add('2 - backwards full scan [latest]', () => countAll(latestStore.getAllEvents(-1, 1)))
.add('3 - join stream [stable]', () => countAll(stableStore.fromStreams('join', [STREAM_1, STREAM_2])))
.add('3 - join stream [latest]', () => countAll(latestStore.fromStreams('join', [STREAM_1, STREAM_2])))
.add('4 - range scan [stable]', () => countAll(stableStore.getAllEvents(third, twoThirds)))
.add('4 - range scan [latest]', () => countAll(latestStore.getAllEvents(third, twoThirds)))
.run();
})
.catch((e) => { console.error(e); process.exit(1); });
2 changes: 1 addition & 1 deletion bench/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "event-storage-bench",
"version": "0.0.1",
"scripts": {
"bench": "node bench-index.js && node bench-storage.js && node bench-eventstore.js"
"bench": "node bench-index.js && node bench-storage.js && node bench-eventstore.js && node bench-read-scenarios.js"
},
"dependencies": {
"beautify-benchmark": "^0.2.4",
Expand Down
112 changes: 112 additions & 0 deletions src/EventStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class EventStore extends events.EventEmitter {
* @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'.
* @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`.
* @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode.
* @param {object|function(string): object} [config.streamMetadata] A metadata object or a function `(streamName) => object`
* that is called whenever a new stream partition is created. The returned object is stored once in the partition
* file header and surfaced to `preCommit` / `preRead` hooks. Takes precedence only when
* `config.storageConfig.metadata` is not also set.
*/
constructor(storeName = 'eventstore', config = {}) {
super();
Expand All @@ -44,6 +48,17 @@ class EventStore extends events.EventEmitter {
readOnly: config.readOnly || false
};
const storageConfig = Object.assign(defaults, config.storageConfig);

// Translate the high-level streamMetadata option into the storage-level metadata function,
// but only when the caller has not already provided a lower-level storageConfig.metadata.
if (config.streamMetadata !== undefined && storageConfig.metadata === undefined) {
if (typeof config.streamMetadata === 'function') {
storageConfig.metadata = config.streamMetadata;
} else {
storageConfig.metadata = (streamName) => config.streamMetadata[streamName] || {};
}
}

this.initialize(storeName, storageConfig);
}

Expand Down Expand Up @@ -145,6 +160,103 @@ class EventStore extends events.EventEmitter {
this.storage.close();
}

/**
* Override EventEmitter.on() to delegate 'preCommit' and 'preRead' event registrations
* to the underlying storage, so that `eventstore.on('preCommit', handler)` works naturally.
* All other events are handled by the default EventEmitter.
*
* @param {string} event
* @param {function} listener
* @returns {this}
*/
on(event, listener) {
if (event === 'preCommit' || event === 'preRead') {
if (event === 'preCommit') {
assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
}
this.storage.on(event, listener);
return this;
}
return super.on(event, listener);
}

/**
* @inheritDoc
*/
addListener(event, listener) {
return this.on(event, listener);
}

/**
* Override EventEmitter.once() to delegate 'preCommit' and 'preRead' to the underlying storage.
*
* @param {string} event
* @param {function} listener
* @returns {this}
*/
once(event, listener) {
if (event === 'preCommit' || event === 'preRead') {
if (event === 'preCommit') {
assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
}
this.storage.once(event, listener);
return this;
}
return super.once(event, listener);
}

/**
* Override EventEmitter.off() / removeListener() to delegate 'preCommit' and 'preRead'
* to the underlying storage.
*
* @param {string} event
* @param {function} listener
* @returns {this}
*/
off(event, listener) {
if (event === 'preCommit' || event === 'preRead') {
this.storage.off(event, listener);
return this;
}
return super.off(event, listener);
}

/**
* @inheritDoc
*/
removeListener(event, listener) {
return this.off(event, listener);
}

/**
* Convenience method to register a handler called before an event is committed to storage.
* Equivalent to `eventstore.on('preCommit', hook)`.
* The handler receives `(event, partitionMetadata)` and may throw to abort the write.
* Multiple handlers can be registered; all run on every write in registration order.
* The handler is invoked on every write, so its logic should be cheap, fast, and synchronous.
*
* @api
* @param {function(object, object): void} hook A function receiving (event, partitionMetadata).
* @throws {Error} If the storage was opened in read-only mode.
*/
preCommit(hook) {
this.on('preCommit', hook);
}

/**
* Convenience method to register a handler called before an event is read from storage.
* Equivalent to `eventstore.on('preRead', hook)`.
* The handler receives `(position, partitionMetadata)` and may throw to abort the read.
* Multiple handlers can be registered; all run on every read in registration order.
* The handler is invoked on every read, so its logic should be cheap, fast, and synchronous.
*
* @api
* @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
*/
preRead(hook) {
this.on('preRead', hook);
}

/**
* Get the number of events stored.
*
Expand Down
11 changes: 9 additions & 2 deletions src/Index/ReadOnlyIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@ class ReadOnlyIndex extends watchesFile(ReadableIndex) {
if (!this.fd) {
return;
}
const prevLength = this.data.length;
const prevLength = this._length;
const newLength = this.readFileLength();
this.data.length = newLength;
if (newLength < prevLength) {
// Clear ring buffer slots for the removed entries to avoid stale reads
const oldCacheStart = Math.max(0, prevLength - this.cacheSize);
for (let i = Math.max(oldCacheStart, newLength); i < prevLength; i++) {
this.cache[i % this.cacheSize] = null;
}
}
this._length = newLength;
if (newLength > prevLength) {
this.emit('append', prevLength, newLength);
}
Expand Down
Loading