diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 67c15a8458a..753d9d77645 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -964,6 +964,7 @@ jobs:
- run: npm run test:replication-google-drive
- run: npm run test:replication-microsoft-onedrive
+ - run: npm run test:replication-electric-sql
# supabase
- run: npm run supabase:start
diff --git a/docs-src/docs/replication-electric-sql.md b/docs-src/docs/replication-electric-sql.md
new file mode 100644
index 00000000000..84dd2ed8515
--- /dev/null
+++ b/docs-src/docs/replication-electric-sql.md
@@ -0,0 +1,203 @@
+---
+title: Electric-SQL Replication Plugin for RxDB
+slug: replication-electric-sql.html
+description: Sync data from PostgreSQL to RxDB using Electric-SQL. Stream shapes from your database to client-side collections with real-time updates.
+---
+
+import {Steps} from '@site/src/components/steps';
+
+# Electric-SQL Replication Plugin for RxDB
+
+
+
+
+
+The **Electric-SQL Replication Plugin** for RxDB synchronizes data from a PostgreSQL database to RxDB collections using [Electric-SQL](https://electric-sql.com/) (also known as Electric). Electric provides a read-path sync layer that streams "shapes" (subsets of your PostgreSQL data) to client applications in real-time via HTTP.
+
+Under the hood, this plugin is powered by the RxDB [Sync Engine](./replication.md). It uses Electric's HTTP API for incremental pulls with offset-based checkpointing and Electric's live mode for real-time change detection.
+
+## Key Features
+
+- **PostgreSQL to client sync** using Electric's shape-based streaming
+- **Incremental pull** with offset-based checkpoint tracking
+- **Real-time updates** via Electric's live long-polling mode
+- **Offline-first** with RxDB's built-in retry and conflict handling
+- **No external client dependency** required: communicates with Electric over plain HTTP
+- **Flexible push** via user-provided handlers for writing back to PostgreSQL
+
+## How It Works
+
+Electric-SQL provides a read-path sync layer between PostgreSQL and clients. Data flows as:
+
+1. **PostgreSQL** stores the source of truth
+2. **Electric** watches for changes and provides an HTTP shape API
+3. **RxDB** pulls shapes from Electric and stores them locally
+
+For writes, Electric does not include a built-in write path. You must provide your own backend API (REST, GraphQL, etc.) that writes to PostgreSQL. Once the write is committed to PostgreSQL, Electric syncs the change to all connected clients.
+
+## Setting Up RxDB with Electric-SQL
+
+
+
+### Prerequisites
+
+You need a running Electric-SQL service connected to your PostgreSQL database. See the [Electric-SQL documentation](https://electric-sql.com/docs) for setup instructions.
+
+### Install RxDB
+
+```bash
+npm install rxdb
+```
+
+### Create an RxDB Database and Collection
+
+Create an RxDB database and add a collection whose schema matches your PostgreSQL table structure. The primary key must match the table's primary key column.
+
+```ts
+import { createRxDatabase } from 'rxdb/plugins/core';
+import { getRxStorageMemory } from 'rxdb/plugins/storage-memory';
+
+const db = await createRxDatabase({
+ name: 'mydb',
+ storage: getRxStorageMemory()
+});
+
+await db.addCollections({
+ items: {
+ schema: {
+ version: 0,
+ primaryKey: 'id',
+ type: 'object',
+ properties: {
+ id: { type: 'string', maxLength: 100 },
+ name: { type: 'string' },
+ value: { type: 'number' }
+ },
+ required: ['id', 'name']
+ }
+ }
+});
+```
+
+### Start Replication
+
+Connect your RxDB collection to Electric-SQL. The `url` should point to your Electric shape endpoint, and `params.table` specifies the PostgreSQL table to sync.
+
+```ts
+import { replicateElectricSQL } from 'rxdb/plugins/replication-electric-sql';
+
+const replicationState = replicateElectricSQL({
+ collection: db.items,
+ replicationIdentifier: 'items-electric',
+ url: 'http://localhost:3000/v1/shape',
+ params: {
+ table: 'items'
+ },
+ live: true,
+ pull: {
+ batchSize: 100
+ }
+});
+
+replicationState.error$.subscribe(err => console.error('[replication]', err));
+await replicationState.awaitInitialReplication();
+```
+
+### Add a Push Handler (Optional)
+
+Since Electric-SQL only provides a read path, you need to supply your own push handler to write changes back to your PostgreSQL database through your backend API.
+
+```ts
+import { replicateElectricSQL } from 'rxdb/plugins/replication-electric-sql';
+
+const replicationState = replicateElectricSQL({
+ collection: db.items,
+ replicationIdentifier: 'items-electric',
+ url: 'http://localhost:3000/v1/shape',
+ params: {
+ table: 'items'
+ },
+ live: true,
+ pull: {
+ batchSize: 100
+ },
+ push: {
+ async handler(rows) {
+ const conflicts = [];
+ for (const row of rows) {
+ const response = await fetch('https://your-backend.com/api/items', {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify(row.newDocumentState)
+ });
+ if (!response.ok) {
+ // handle conflict: fetch current server state and add to conflicts
+ }
+ }
+ return conflicts;
+ },
+ batchSize: 10
+ }
+});
+```
+
+
+
+
+## Configuration Options
+
+### Shape Parameters
+
+Use the `params` object to configure the Electric shape. You can filter rows with a `where` clause or select specific columns:
+
+```ts
+const replicationState = replicateElectricSQL({
+ collection: db.items,
+ replicationIdentifier: 'items-electric',
+ url: 'http://localhost:3000/v1/shape',
+ params: {
+ table: 'items',
+ where: 'status=\'active\'',
+ columns: 'id,name,value',
+ replica: 'full'
+ },
+ pull: {}
+});
+```
+
+### Custom Headers
+
+Pass custom HTTP headers for authentication or other purposes:
+
+```ts
+const replicationState = replicateElectricSQL({
+ // ...
+ headers: {
+ 'Authorization': 'Bearer your-token'
+ },
+ pull: {}
+});
+```
+
+### Custom Fetch Function
+
+Provide a custom fetch implementation, for example when using a custom HTTP client or when running in an environment without a global `fetch`:
+
+```ts
+const replicationState = replicateElectricSQL({
+ // ...
+ fetch: myCustomFetch,
+ pull: {}
+});
+```
+
+## Handling Deletes
+
+Electric-SQL reports physical deletes from PostgreSQL. The plugin maps Electric's `delete` operations to RxDB's soft-delete mechanism by setting `_deleted: true` on the document. If your PostgreSQL table uses soft deletes (a boolean column), those are synced as regular field updates.
+
+## Follow Up
+
+- **Replication API Reference:** Learn the core concepts and lifecycle hooks: [Replication](./replication.md)
+- **Electric-SQL Documentation:** [electric-sql.com/docs](https://electric-sql.com/docs)
+- **Offline-First Guide:** Caching, retries, and conflict strategies: [Local-First](./articles/local-first-future.md)
+- **Community:** Questions or feedback? Join our Discord: [Chat](./chat)
diff --git a/docs-src/sidebars.js b/docs-src/sidebars.js
index 5b9ad80d47f..474fa09e810 100644
--- a/docs-src/sidebars.js
+++ b/docs-src/sidebars.js
@@ -314,6 +314,11 @@ const sidebars = {
id: 'replication-nats',
label: 'NATS Replication'
},
+ {
+ type: 'doc',
+ id: 'replication-electric-sql',
+ label: 'Electric-SQL Replication'
+ },
{
type: 'doc',
id: 'replication-appwrite',
diff --git a/docs-src/static/files/icons/electric-sql.svg b/docs-src/static/files/icons/electric-sql.svg
new file mode 100644
index 00000000000..cb93063928f
--- /dev/null
+++ b/docs-src/static/files/icons/electric-sql.svg
@@ -0,0 +1,4 @@
+
diff --git a/package.json b/package.json
index 370cf71a48d..3405804d527 100644
--- a/package.json
+++ b/package.json
@@ -330,6 +330,12 @@
"import": "./dist/esm/plugins/replication-couchdb/index.js",
"default": "./dist/esm/plugins/replication-couchdb/index.js"
},
+ "./plugins/replication-electric-sql": {
+ "types": "./dist/types/plugins/replication-electric-sql/index.d.ts",
+ "require": "./dist/cjs/plugins/replication-electric-sql/index.js",
+ "import": "./dist/esm/plugins/replication-electric-sql/index.js",
+ "default": "./dist/esm/plugins/replication-electric-sql/index.js"
+ },
"./plugins/replication-nats": {
"types": "./dist/types/plugins/replication-nats/index.d.ts",
"require": "./dist/cjs/plugins/replication-nats/index.js",
@@ -468,6 +474,7 @@
"test:replication-supabase": "npm run transpile && cross-env DEFAULT_STORAGE=dexie mocha --config ./config/.mocharc.cjs ./test_tmp/replication-supabase.test.js",
"test:replication-google-drive": "npm run transpile && cross-env DEFAULT_STORAGE=dexie tsx ./node_modules/mocha/bin/mocha --config ./config/.mocharc.cjs ./test/replication-google-drive.test.ts",
"test:replication-microsoft-onedrive": "npm run transpile && cross-env DEFAULT_STORAGE=dexie tsx ./node_modules/mocha/bin/mocha --config ./config/.mocharc.cjs ./test/replication-microsoft-onedrive.test.ts",
+ "test:replication-electric-sql": "npm run transpile && cross-env DEFAULT_STORAGE=memory mocha --config ./config/.mocharc.cjs ./test_tmp/replication-electric-sql.test.js",
"test:core": "npm run transpile && mocha ./test_tmp/unit/core.node.js",
"test:full": "npm run transpile && mocha ./test_tmp/unit/full.node.js",
"test:typings": "npm run build:plugins && tsc --allowImportingTsExtensions --noEmit --skipLibCheck --lib \"ES2022,DOM\" ./test/typings.test.ts",
diff --git a/plugins/replication-electric-sql/index.cjs b/plugins/replication-electric-sql/index.cjs
new file mode 100644
index 00000000000..a672d7c2291
--- /dev/null
+++ b/plugins/replication-electric-sql/index.cjs
@@ -0,0 +1,2 @@
+const pkg = require('../../dist/cjs/plugins/replication-electric-sql/index.js');
+module.exports = pkg;
diff --git a/plugins/replication-electric-sql/index.d.cts b/plugins/replication-electric-sql/index.d.cts
new file mode 100644
index 00000000000..1e21727714b
--- /dev/null
+++ b/plugins/replication-electric-sql/index.d.cts
@@ -0,0 +1 @@
+export * from '../../dist/types/plugins/replication-electric-sql/index';
diff --git a/plugins/replication-electric-sql/index.d.mts b/plugins/replication-electric-sql/index.d.mts
new file mode 100644
index 00000000000..1e21727714b
--- /dev/null
+++ b/plugins/replication-electric-sql/index.d.mts
@@ -0,0 +1 @@
+export * from '../../dist/types/plugins/replication-electric-sql/index';
diff --git a/plugins/replication-electric-sql/index.mjs b/plugins/replication-electric-sql/index.mjs
new file mode 100644
index 00000000000..56ff2eb5280
--- /dev/null
+++ b/plugins/replication-electric-sql/index.mjs
@@ -0,0 +1 @@
+export * from '../../dist/esm/plugins/replication-electric-sql/index.js';
diff --git a/plugins/replication-electric-sql/index.ts b/plugins/replication-electric-sql/index.ts
new file mode 100644
index 00000000000..1e21727714b
--- /dev/null
+++ b/plugins/replication-electric-sql/index.ts
@@ -0,0 +1 @@
+export * from '../../dist/types/plugins/replication-electric-sql/index';
diff --git a/plugins/replication-electric-sql/package.json b/plugins/replication-electric-sql/package.json
new file mode 100644
index 00000000000..68c2b8e1856
--- /dev/null
+++ b/plugins/replication-electric-sql/package.json
@@ -0,0 +1,18 @@
+{
+ "name": "rxdb-plugins-replication-electric-sql",
+ "description": "This package.json file is generated by the \"npm run build:plugins\" script, do not edit it manually!",
+ "sideEffects": false,
+ "types": "../../dist/types/plugins/replication-electric-sql/index.d.ts",
+ "exports": {
+ ".": {
+ "default": {
+ "types": "./index.d.ts",
+ "import": "./index.mjs",
+ "default": "./index.cjs"
+ }
+ },
+ "./package.json": "./package.json"
+ },
+ "main": "./index.cjs",
+ "module": "./index.mjs"
+}
\ No newline at end of file
diff --git a/src/plugins/replication-electric-sql/electric-sql-helper.ts b/src/plugins/replication-electric-sql/electric-sql-helper.ts
new file mode 100644
index 00000000000..8677f47dfa0
--- /dev/null
+++ b/src/plugins/replication-electric-sql/electric-sql-helper.ts
@@ -0,0 +1,73 @@
+import type { WithDeleted } from '../../types/index.d.ts';
+import type { ElectricSQLShapeParams } from './electric-sql-types.ts';
+
+export type ElectricSQLMessage = {
+ offset?: string;
+ key?: string;
+ value?: Record<string, any>;
+ headers: {
+ operation?: 'insert' | 'update' | 'delete';
+ control?: 'up-to-date' | 'must-refetch';
+ };
+};
+
+/**
+ * Builds the URL for an Electric shape request.
+ */
+export function buildElectricUrl(
+ baseUrl: string,
+ params: ElectricSQLShapeParams,
+ offset: string,
+ handle?: string,
+ live?: boolean
+): string {
+ const urlObj = new URL(baseUrl);
+
+ for (const [key, value] of Object.entries(params)) {
+ if (value !== undefined) {
+ urlObj.searchParams.set(key, value);
+ }
+ }
+
+ urlObj.searchParams.set('offset', offset);
+
+ if (handle) {
+ urlObj.searchParams.set('handle', handle);
+ }
+
+ if (live) {
+ urlObj.searchParams.set('live', 'true');
+ }
+
+ return urlObj.toString();
+}
+
+/**
+ * Converts an Electric-SQL change message to an RxDB document.
+ * Returns null for control messages.
+ */
+export function electricMessageToRxDBDocData<RxDocType>(
+ message: ElectricSQLMessage,
+ primaryPath: string
+): WithDeleted<RxDocType> | null {
+ if (!message.headers.operation || !message.value) {
+ return null;
+ }
+
+ const doc: any = { ...message.value };
+
+ if (message.headers.operation === 'delete') {
+ doc._deleted = true;
+ } else {
+ doc._deleted = false;
+ }
+
+ return doc as WithDeleted<RxDocType>;
+}
+
+/**
+ * Checks if a list of Electric-SQL messages contains a 'must-refetch' control message.
+ */
+export function hasMustRefetch(messages: ElectricSQLMessage[]): boolean {
+ return messages.some(m => m.headers?.control === 'must-refetch');
+}
diff --git a/src/plugins/replication-electric-sql/electric-sql-types.ts b/src/plugins/replication-electric-sql/electric-sql-types.ts
new file mode 100644
index 00000000000..5327fda6478
--- /dev/null
+++ b/src/plugins/replication-electric-sql/electric-sql-types.ts
@@ -0,0 +1,54 @@
+import type {
+ ReplicationOptions,
+ ReplicationPullOptions,
+ ReplicationPushOptions
+} from '../../types/index.d.ts';
+
+export type ElectricSQLCheckpointType = {
+ offset: string;
+ handle: string;
+};
+
+export type ElectricSQLShapeParams = {
+ table: string;
+ where?: string;
+ columns?: string;
+ replica?: 'default' | 'full';
+ [key: string]: string | undefined;
+};
+
+export type ElectricSQLSyncPullOptions<RxDocType> =
+ Omit<ReplicationPullOptions<RxDocType, ElectricSQLCheckpointType>, 'handler' | 'stream$'>;
+
+export type ElectricSQLSyncPushOptions<RxDocType> = ReplicationPushOptions<RxDocType>;
+
+export type SyncOptionsElectricSQL<RxDocType> = Omit<
+ ReplicationOptions<RxDocType, ElectricSQLCheckpointType>,
+ 'pull' | 'push'
+> & {
+ /**
+ * URL to the Electric shape endpoint.
+ * Example: 'http://localhost:3000/v1/shape'
+ */
+ url: string;
+ /**
+ * Parameters for the Electric shape request.
+ * Must include 'table' at minimum.
+ */
+ params: ElectricSQLShapeParams;
+ /**
+ * Custom HTTP headers sent with each request to the Electric endpoint.
+ */
+ headers?: Record<string, string>;
+ /**
+ * Custom fetch function. Defaults to the global fetch.
+ */
+ fetch?: typeof fetch;
+ pull?: ElectricSQLSyncPullOptions<RxDocType>;
+ /**
+ * Push handler for writing data back to PostgreSQL.
+ * Electric-SQL only provides a read path (syncing data from PostgreSQL),
+ * so you must provide your own push handler that writes to your backend API.
+ */
+ push?: ElectricSQLSyncPushOptions<RxDocType>;
+};
diff --git a/src/plugins/replication-electric-sql/index.ts b/src/plugins/replication-electric-sql/index.ts
new file mode 100644
index 00000000000..da773b61912
--- /dev/null
+++ b/src/plugins/replication-electric-sql/index.ts
@@ -0,0 +1,294 @@
+import {
+ ensureNotFalsy,
+ errorToPlainJson,
+ flatClone,
+ promiseWait
+} from '../../plugins/utils/index.ts';
+
+import { RxDBLeaderElectionPlugin } from '../leader-election/index.ts';
+import type {
+ RxCollection,
+ ReplicationPullOptions,
+ ReplicationPushOptions,
+ RxReplicationPullStreamItem,
+ WithDeleted
+} from '../../types/index.d.ts';
+import {
+ RxReplicationState,
+ startReplicationOnLeaderShip
+} from '../replication/index.ts';
+import {
+ addRxPlugin,
+ newRxError
+} from '../../index.ts';
+
+import type {
+ ElectricSQLCheckpointType,
+ SyncOptionsElectricSQL
+} from './electric-sql-types.ts';
+import { Subject } from 'rxjs';
+import {
+ buildElectricUrl,
+ electricMessageToRxDBDocData,
+ hasMustRefetch,
+ type ElectricSQLMessage
+} from './electric-sql-helper.ts';
+
+export * from './electric-sql-helper.ts';
+export * from './electric-sql-types.ts';
+
+export class RxElectricSQLReplicationState<RxDocType> extends RxReplicationState<RxDocType, ElectricSQLCheckpointType> {
+ constructor(
+ public readonly replicationIdentifier: string,
+ public readonly collection: RxCollection,
+ public readonly pull?: ReplicationPullOptions<RxDocType, ElectricSQLCheckpointType>,
+ public readonly push?: ReplicationPushOptions<RxDocType>,
+ public readonly live: boolean = true,
+ public retryTime: number = 1000 * 5,
+ public autoStart: boolean = true
+ ) {
+ super(
+ replicationIdentifier,
+ collection,
+ '_deleted',
+ pull,
+ push,
+ live,
+ retryTime,
+ autoStart
+ );
+ }
+}
+
+export function replicateElectricSQL<RxDocType>(
+ options: SyncOptionsElectricSQL<RxDocType>
+): RxElectricSQLReplicationState<RxDocType> {
+ options = flatClone(options);
+ const collection = options.collection;
+ const primaryPath = collection.schema.primaryPath;
+ addRxPlugin(RxDBLeaderElectionPlugin);
+
+ options.live = typeof options.live === 'undefined' ? true : options.live;
+ options.waitForLeadership = typeof options.waitForLeadership === 'undefined' ? true : options.waitForLeadership;
+
+ const useFetch = options.fetch || fetch;
+ const pullStream$: Subject<RxReplicationPullStreamItem<RxDocType, ElectricSQLCheckpointType>> = new Subject();
+
+ /**
+ * Shared state so that the live-polling loop knows the offset
+ * the pull handler has reached.
+ */
+ let liveOffset = '';
+ let liveHandle = '';
+
+ let replicationPrimitivesPull: ReplicationPullOptions<RxDocType, ElectricSQLCheckpointType> | undefined;
+
+ if (options.pull) {
+ replicationPrimitivesPull = {
+ async handler(
+ lastPulledCheckpoint: ElectricSQLCheckpointType | undefined,
+ batchSize: number
+ ) {
+ let offset = lastPulledCheckpoint?.offset ?? '-1';
+ let handle = lastPulledCheckpoint?.handle ?? '';
+
+ const url = buildElectricUrl(
+ options.url,
+ options.params,
+ offset,
+ handle || undefined
+ );
+
+ const response = await useFetch(url, {
+ headers: options.headers || {}
+ });
+
+ if (!response.ok) {
+ throw newRxError('RC_PULL', {
+ args: { url, status: response.status }
+ });
+ }
+
+ let messages: ElectricSQLMessage[] = await response.json();
+
+ let electricOffset = response.headers.get('electric-offset') || offset;
+ let electricHandle = response.headers.get('electric-handle') || handle;
+
+ /**
+ * When Electric sends a must-refetch control message,
+ * the shape has changed and we must start over from
+ * offset -1. We reset and immediately re-fetch inside
+ * the handler so the replication framework never sees
+ * a confusing empty-but-not-done response.
+ */
+ if (hasMustRefetch(messages)) {
+ offset = '-1';
+ handle = '';
+ liveOffset = '';
+ liveHandle = '';
+
+ const retryUrl = buildElectricUrl(
+ options.url,
+ options.params,
+ offset,
+ undefined
+ );
+ const retryResponse = await useFetch(retryUrl, {
+ headers: options.headers || {}
+ });
+ if (!retryResponse.ok) {
+ throw newRxError('RC_PULL', {
+ args: { url: retryUrl, status: retryResponse.status }
+ });
+ }
+ messages = await retryResponse.json();
+ electricOffset = retryResponse.headers.get('electric-offset') || offset;
+ electricHandle = retryResponse.headers.get('electric-handle') || handle;
+ }
+
+ liveOffset = electricOffset;
+ liveHandle = electricHandle;
+
+ const documents: WithDeleted<RxDocType>[] = [];
+ for (const message of messages) {
+ const doc = electricMessageToRxDBDocData(message, primaryPath);
+ if (doc) {
+ documents.push(doc);
+ }
+ }
+
+ const newCheckpoint: ElectricSQLCheckpointType = {
+ offset: electricOffset,
+ handle: electricHandle
+ };
+
+ return {
+ documents,
+ checkpoint: newCheckpoint
+ };
+ },
+ batchSize: ensureNotFalsy(options.pull).batchSize,
+ modifier: ensureNotFalsy(options.pull).modifier,
+ stream$: pullStream$.asObservable(),
+ initialCheckpoint: options.pull.initialCheckpoint
+ };
+ }
+
+ const replicationPrimitivesPush: ReplicationPushOptions<RxDocType> | undefined = options.push ? {
+ handler: options.push.handler,
+ batchSize: options.push.batchSize,
+ modifier: options.push.modifier,
+ initialCheckpoint: options.push.initialCheckpoint
+ } : undefined;
+
+
+ const replicationState = new RxElectricSQLReplicationState(
+ options.replicationIdentifier,
+ collection,
+ replicationPrimitivesPull,
+ replicationPrimitivesPush,
+ options.live,
+ options.retryTime,
+ options.autoStart
+ );
+
+ /**
+ * Use Electric's live mode for real-time change detection.
+ * After the initial sync completes (liveHandle is set), we long-poll
+ * the Electric endpoint with live=true. When changes are detected,
+ * we deliver documents through the pullStream$.
+ */
+ if (options.live && options.pull) {
+ const startBefore = replicationState.start.bind(replicationState);
+ const cancelBefore = replicationState.cancel.bind(replicationState);
+
+ replicationState.start = () => {
+ let isCanceled = false;
+ let abortController: AbortController | null = null;
+
+ const poll = async () => {
+ while (!isCanceled && !liveHandle) {
+ await promiseWait(100);
+ }
+
+ while (!isCanceled) {
+ try {
+ abortController = new AbortController();
+ const url = buildElectricUrl(
+ options.url,
+ options.params,
+ liveOffset,
+ liveHandle,
+ true
+ );
+
+ const response = await useFetch(url, {
+ headers: options.headers || {},
+ signal: abortController.signal
+ });
+
+ if (!response.ok) {
+ throw newRxError('RC_STREAM', {
+ error: errorToPlainJson(new Error('Live polling failed: ' + response.status))
+ });
+ }
+
+ const messages: ElectricSQLMessage[] = await response.json();
+
+ const newOffset = response.headers.get('electric-offset');
+ if (newOffset) liveOffset = newOffset;
+ const newHandle = response.headers.get('electric-handle');
+ if (newHandle) liveHandle = newHandle;
+
+ if (hasMustRefetch(messages)) {
+ pullStream$.next('RESYNC');
+ liveOffset = '';
+ liveHandle = '';
+ continue;
+ }
+
+ const documents: WithDeleted<RxDocType>[] = [];
+ for (const message of messages) {
+ const doc = electricMessageToRxDBDocData(message, primaryPath);
+ if (doc) {
+ documents.push(doc);
+ }
+ }
+
+ if (documents.length > 0) {
+ pullStream$.next({
+ documents,
+ checkpoint: {
+ offset: liveOffset,
+ handle: liveHandle
+ }
+ });
+ }
+ } catch (err: any) {
+ if (isCanceled) break;
+ replicationState.subjects.error.next(
+ newRxError('RC_STREAM', {
+ error: errorToPlainJson(err)
+ })
+ );
+ await promiseWait(replicationState.retryTime);
+ }
+ }
+ };
+
+ poll();
+
+ replicationState.cancel = () => {
+ isCanceled = true;
+ if (abortController) abortController.abort();
+ return cancelBefore();
+ };
+
+ return startBefore();
+ };
+ }
+
+ startReplicationOnLeaderShip(options.waitForLeadership, replicationState);
+
+ return replicationState;
+}
diff --git a/test/replication-electric-sql.test.ts b/test/replication-electric-sql.test.ts
new file mode 100644
index 00000000000..929a6618399
--- /dev/null
+++ b/test/replication-electric-sql.test.ts
@@ -0,0 +1,655 @@
+import assert from 'assert';
+
+import {
+ randomToken,
+ addRxPlugin
+} from '../plugins/core/index.mjs';
+
+import {
+ schemaObjects,
+ humansCollection,
+ ensureReplicationHasNoErrors,
+ HumanWithTimestampDocumentType
+} from '../plugins/test-utils/index.mjs';
+
+import { RxDBDevModePlugin } from '../plugins/dev-mode/index.mjs';
+import config from './unit/config.ts';
+import { wait, waitUntil } from 'async-test-util';
+
+import {
+ replicateElectricSQL,
+ RxElectricSQLReplicationState,
+ buildElectricUrl,
+ electricMessageToRxDBDocData,
+ hasMustRefetch,
+ type ElectricSQLMessage
+} from '../plugins/replication-electric-sql/index.mjs';
+
+/**
+ * The tests for the Electric-SQL replication plugin
+ * use a mock fetch function because Electric-SQL
+ * communicates over plain HTTP, so no real backend is needed.
+ */
+describe('replication-electric-sql.test.ts', function () {
+ this.timeout(1000 * 20);
+ addRxPlugin(RxDBDevModePlugin);
+ config.storage.init?.();
+
+ type TestDocType = HumanWithTimestampDocumentType;
+
+ /**
+ * Use a low batchSize in all tests
+ * to make it easier to test boundaries.
+ */
+ const batchSize = 5;
+
+ function createMockResponse(
+ messages: ElectricSQLMessage[],
+ offset: string = '0_0',
+ handle: string = 'test-handle',
+ status: number = 200
+ ): Response {
+ const headers = new Headers();
+ headers.set('electric-offset', offset);
+ headers.set('electric-handle', handle);
+ return new Response(JSON.stringify(messages), {
+ status,
+ headers
+ });
+ }
+
+ function makeInsertMessage(doc: TestDocType): ElectricSQLMessage {
+ return {
+ offset: '0_0',
+ key: doc.id,
+ value: doc as any,
+ headers: {
+ operation: 'insert'
+ }
+ };
+ }
+
+ function makeDeleteMessage(doc: TestDocType): ElectricSQLMessage {
+ return {
+ offset: '0_0',
+ key: doc.id,
+ value: doc as any,
+ headers: {
+ operation: 'delete'
+ }
+ };
+ }
+
+ function makeUpToDateMessage(): ElectricSQLMessage {
+ return {
+ headers: {
+ control: 'up-to-date'
+ }
+ };
+ }
+
+ function makeMustRefetchMessage(): ElectricSQLMessage {
+ return {
+ headers: {
+ control: 'must-refetch'
+ }
+ };
+ }
+
+ describe('helper functions', () => {
+ describe('buildElectricUrl()', () => {
+ it('should build a basic URL with table and offset', () => {
+ const url = buildElectricUrl(
+ 'http://localhost:3000/v1/shape',
+ { table: 'items' },
+ '-1'
+ );
+ assert.ok(url.includes('table=items'));
+ assert.ok(url.includes('offset=-1'));
+ assert.ok(!url.includes('handle='));
+ assert.ok(!url.includes('live='));
+ });
+ it('should include handle when provided', () => {
+ const url = buildElectricUrl(
+ 'http://localhost:3000/v1/shape',
+ { table: 'items' },
+ '0_0',
+ 'my-handle'
+ );
+ assert.ok(url.includes('handle=my-handle'));
+ });
+ it('should include live=true when live mode is enabled', () => {
+ const url = buildElectricUrl(
+ 'http://localhost:3000/v1/shape',
+ { table: 'items' },
+ '0_0',
+ 'my-handle',
+ true
+ );
+ assert.ok(url.includes('live=true'));
+ });
+ it('should include where and columns params', () => {
+ const url = buildElectricUrl(
+ 'http://localhost:3000/v1/shape',
+ { table: 'items', where: 'status=active', columns: 'id,name' },
+ '-1'
+ );
+ assert.ok(url.includes('where='));
+ assert.ok(url.includes('columns='));
+ });
+ });
+ describe('electricMessageToRxDBDocData()', () => {
+ it('should convert an insert message to a doc with _deleted=false', () => {
+ const message: ElectricSQLMessage = {
+ offset: '0_0',
+ key: 'doc1',
+ value: { id: 'doc1', name: 'Alice', age: 30, updatedAt: 1000 },
+ headers: { operation: 'insert' }
+ };
+ const doc = electricMessageToRxDBDocData(message, 'id');
+ assert.ok(doc);
+ assert.strictEqual(doc._deleted, false);
+ assert.strictEqual((doc as any).id, 'doc1');
+ assert.strictEqual((doc as any).name, 'Alice');
+ });
+ it('should convert an update message to a doc with _deleted=false', () => {
+ const message: ElectricSQLMessage = {
+ offset: '0_1',
+ key: 'doc1',
+ value: { id: 'doc1', name: 'Alice Updated', age: 31, updatedAt: 2000 },
+ headers: { operation: 'update' }
+ };
+ const doc = electricMessageToRxDBDocData(message, 'id');
+ assert.ok(doc);
+ assert.strictEqual(doc._deleted, false);
+ assert.strictEqual((doc as any).name, 'Alice Updated');
+ });
+ it('should convert a delete message to a doc with _deleted=true', () => {
+ const message: ElectricSQLMessage = {
+ offset: '0_2',
+ key: 'doc1',
+ value: { id: 'doc1', name: 'Alice', age: 30, updatedAt: 3000 },
+ headers: { operation: 'delete' }
+ };
+ const doc = electricMessageToRxDBDocData(message, 'id');
+ assert.ok(doc);
+ assert.strictEqual(doc._deleted, true);
+ });
+ it('should return null for control messages', () => {
+ const message: ElectricSQLMessage = {
+ headers: { control: 'up-to-date' }
+ };
+ const doc = electricMessageToRxDBDocData(message, 'id');
+ assert.strictEqual(doc, null);
+ });
+ });
+ describe('hasMustRefetch()', () => {
+ it('should return true when must-refetch is present', () => {
+ const messages: ElectricSQLMessage[] = [
+ makeMustRefetchMessage()
+ ];
+ assert.strictEqual(hasMustRefetch(messages), true);
+ });
+ it('should return false when no must-refetch is present', () => {
+ const messages: ElectricSQLMessage[] = [
+ makeUpToDateMessage()
+ ];
+ assert.strictEqual(hasMustRefetch(messages), false);
+ });
+ it('should return false for empty array', () => {
+ assert.strictEqual(hasMustRefetch([]), false);
+ });
+ });
+ });
+
+ describe('replication', () => {
+ describe('pull', () => {
+ it('should pull documents from the mock server', async () => {
+ const collection = await humansCollection.createHumanWithTimestamp(0, undefined, false);
+
+ const testDocs = [
+ schemaObjects.humanWithTimestampData(),
+ schemaObjects.humanWithTimestampData()
+ ];
+
+ let fetchCallCount = 0;
+ function mockFetch(): Promise<Response> {
+ fetchCallCount++;
+ if (fetchCallCount === 1) {
+ return Promise.resolve(createMockResponse(
+ [
+ ...testDocs.map(d => makeInsertMessage(d)),
+ makeUpToDateMessage()
+ ],
+ '0_2',
+ 'handle-1'
+ ));
+ }
+ return Promise.resolve(createMockResponse(
+ [makeUpToDateMessage()],
+ '0_2',
+ 'handle-1'
+ ));
+ }
+
+ const replicationState = replicateElectricSQL({
+ replicationIdentifier: 'test-pull-' + randomToken(10),
+ collection,
+ url: 'http://localhost:3000/v1/shape',
+ params: { table: 'humans' },
+ live: false,
+ pull: { batchSize },
+ fetch: mockFetch as typeof fetch,
+ waitForLeadership: false
+ });
+ ensureReplicationHasNoErrors(replicationState);
+ await replicationState.awaitInitialReplication();
+
+ const docsInDb = await collection.find().exec();
+ assert.strictEqual(docsInDb.length, 2);
+
+ for (const testDoc of testDocs) {
+ const found = docsInDb.find(d => d.primary === testDoc.id);
+ assert.ok(found, 'Document ' + testDoc.id + ' should be in the collection');
+ assert.strictEqual(found.name, testDoc.name);
+ assert.strictEqual(found.age, testDoc.age);
+ }
+
+ await replicationState.cancel();
+ await collection.database.close();
+ });
+ it('should track checkpoint across pulls', async () => {
+ const collection = await humansCollection.createHumanWithTimestamp(0, undefined, false);
+
+ const doc1 = schemaObjects.humanWithTimestampData();
+ const doc2 = schemaObjects.humanWithTimestampData();
+
+ let fetchCallCount = 0;
+ const capturedUrls: string[] = [];
+ function mockFetch(url: string | URL | Request): Promise<Response> {
+ capturedUrls.push(typeof url === 'string' ? url : url instanceof URL ? url.toString() : url.url);
+ fetchCallCount++;
+
+ if (fetchCallCount === 1) {
+ return Promise.resolve(createMockResponse(
+ [makeInsertMessage(doc1)],
+ '0_1',
+ 'handle-1'
+ ));
+ }
+ if (fetchCallCount === 2) {
+ return Promise.resolve(createMockResponse(
+ [makeInsertMessage(doc2)],
+ '0_2',
+ 'handle-1'
+ ));
+ }
+ return Promise.resolve(createMockResponse(
+ [makeUpToDateMessage()],
+ '0_2',
+ 'handle-1'
+ ));
+ }
+
+ const replicationState = replicateElectricSQL({
+ replicationIdentifier: 'test-checkpoint-' + randomToken(10),
+ collection,
+ url: 'http://localhost:3000/v1/shape',
+ params: { table: 'humans' },
+ live: false,
+ pull: { batchSize: 1 },
+ fetch: mockFetch as typeof fetch,
+ waitForLeadership: false
+ });
+ ensureReplicationHasNoErrors(replicationState);
+ await replicationState.awaitInitialReplication();
+
+ const docsInDb = await collection.find().exec();
+ assert.strictEqual(docsInDb.length, 2);
+
+ // second request should contain the offset from the first
+ assert.ok(capturedUrls[1].includes('offset=0_1'), 'Second URL should use checkpoint offset');
+ assert.ok(capturedUrls[1].includes('handle=handle-1'), 'Second URL should include handle');
+
+ await replicationState.cancel();
+ await collection.database.close();
+ });
+ it('should handle delete operations', async () => {
+ const collection = await humansCollection.createHumanWithTimestamp(0, undefined, false);
+
+ const doc1 = schemaObjects.humanWithTimestampData();
+ const doc2 = schemaObjects.humanWithTimestampData();
+
+ let fetchCallCount = 0;
+ function mockFetch(): Promise<Response> {
+ fetchCallCount++;
+ if (fetchCallCount === 1) {
+ /**
+ * Return two inserts and one delete in a single batch.
+ * This matches how Electric-SQL delivers the initial shape:
+ * the full history is compacted, so a deleted row arrives
+ * as a single delete message.
+ */
+ return Promise.resolve(createMockResponse(
+ [
+ makeInsertMessage(doc1),
+ makeDeleteMessage(doc2),
+ makeUpToDateMessage()
+ ],
+ '0_2',
+ 'handle-1'
+ ));
+ }
+ return Promise.resolve(createMockResponse(
+ [makeUpToDateMessage()],
+ '0_2',
+ 'handle-1'
+ ));
+ }
+
+ const replicationState = replicateElectricSQL({
+ replicationIdentifier: 'test-delete-' + randomToken(10),
+ collection,
+ url: 'http://localhost:3000/v1/shape',
+ params: { table: 'humans' },
+ live: false,
+ pull: { batchSize },
+ fetch: mockFetch as typeof fetch,
+ waitForLeadership: false
+ });
+ ensureReplicationHasNoErrors(replicationState);
+ await replicationState.awaitInitialReplication();
+
+ const docsInDb = await collection.find().exec();
+ /**
+ * doc1 should be present (inserted), doc2 should not
+ * be visible (it was deleted).
+ */
+ assert.strictEqual(docsInDb.length, 1, 'Only the non-deleted doc should be in results');
+ assert.strictEqual(docsInDb[0].primary, doc1.id);
+
+ await replicationState.cancel();
+ await collection.database.close();
+ });
+ it('should handle must-refetch by re-fetching from scratch', async () => {
+     const collection = await humansCollection.createHumanWithTimestamp(0, undefined, false);
+     const doc1 = schemaObjects.humanWithTimestampData();
+
+     let fetchCallCount = 0;
+     const capturedUrls: string[] = [];
+     /**
+      * First response is a must-refetch control message, which tells
+      * the client to drop its checkpoint and re-sync the whole shape.
+      * The second response delivers the actual data under a new handle.
+      */
+     function mockFetch(url: string | URL | Request): Promise<Response> {
+         fetchCallCount++;
+         capturedUrls.push(typeof url === 'string' ? url : url instanceof URL ? url.toString() : url.url);
+         if (fetchCallCount === 1) {
+             return Promise.resolve(createMockResponse(
+                 [makeMustRefetchMessage()],
+                 '0_0',
+                 'handle-1'
+             ));
+         }
+         if (fetchCallCount === 2) {
+             return Promise.resolve(createMockResponse(
+                 [makeInsertMessage(doc1), makeUpToDateMessage()],
+                 '0_1',
+                 'handle-2'
+             ));
+         }
+         return Promise.resolve(createMockResponse(
+             [makeUpToDateMessage()],
+             '0_1',
+             'handle-2'
+         ));
+     }
+
+     const replicationState = replicateElectricSQL({
+         replicationIdentifier: 'test-refetch-' + randomToken(10),
+         collection,
+         url: 'http://localhost:3000/v1/shape',
+         params: { table: 'humans' },
+         live: false,
+         pull: { batchSize },
+         fetch: mockFetch as typeof fetch,
+         waitForLeadership: false
+     });
+     ensureReplicationHasNoErrors(replicationState);
+     await replicationState.awaitInitialReplication();
+
+     const docsInDb = await collection.find().exec();
+     assert.strictEqual(docsInDb.length, 1);
+     assert.strictEqual(docsInDb[0].primary, doc1.id);
+
+     // The second URL (retry after must-refetch) should start from offset -1
+     assert.ok(
+         capturedUrls[1].includes('offset=-1'),
+         'Retry URL should start from offset -1'
+     );
+
+     await replicationState.cancel();
+     await collection.database.close();
+ });
+ it('should pass custom headers to fetch', async () => {
+     const collection = await humansCollection.createHumanWithTimestamp(0, undefined, false);
+
+     let capturedHeaders: HeadersInit | undefined;
+     function mockFetch(_url: string | URL | Request, init?: RequestInit): Promise<Response> {
+         capturedHeaders = init?.headers;
+         return Promise.resolve(createMockResponse(
+             [makeUpToDateMessage()],
+             '0_0',
+             'handle-1'
+         ));
+     }
+
+     const replicationState = replicateElectricSQL({
+         replicationIdentifier: 'test-headers-' + randomToken(10),
+         collection,
+         url: 'http://localhost:3000/v1/shape',
+         params: { table: 'humans' },
+         headers: { 'Authorization': 'Bearer test-token' },
+         live: false,
+         pull: { batchSize },
+         fetch: mockFetch as typeof fetch,
+         waitForLeadership: false
+     });
+     ensureReplicationHasNoErrors(replicationState);
+     await replicationState.awaitInitialReplication();
+
+     assert.ok(capturedHeaders, 'fetch should have received headers');
+     /**
+      * Normalize via the Headers constructor instead of casting to any:
+      * this works for all three HeadersInit shapes (plain object,
+      * entry array, Headers instance).
+      */
+     const sentHeaders = new Headers(capturedHeaders);
+     assert.strictEqual(sentHeaders.get('Authorization'), 'Bearer test-token');
+
+     await replicationState.cancel();
+     await collection.database.close();
+ });
+ });
+
+ describe('push', () => {
+     it('should call the push handler with document changes', async () => {
+         const collection = await humansCollection.createHumanWithTimestamp(0, undefined, false);
+
+         const pushRows: any[] = [];
+         // Pull side immediately reports up-to-date; only push is under test.
+         function mockFetch(): Promise<Response> {
+             return Promise.resolve(createMockResponse(
+                 [makeUpToDateMessage()],
+                 '0_0',
+                 'handle-1'
+             ));
+         }
+
+         const replicationState = replicateElectricSQL({
+             replicationIdentifier: 'test-push-' + randomToken(10),
+             collection,
+             url: 'http://localhost:3000/v1/shape',
+             params: { table: 'humans' },
+             live: false,
+             pull: { batchSize },
+             push: {
+                 batchSize,
+                 // Record the rows and report no conflicts (empty array).
+                 handler(rows) {
+                     pushRows.push(...rows);
+                     return Promise.resolve([]);
+                 }
+             },
+             fetch: mockFetch as typeof fetch,
+             waitForLeadership: false
+         });
+         ensureReplicationHasNoErrors(replicationState);
+
+         // insert a document
+         const testDoc = schemaObjects.humanWithTimestampData();
+         await collection.insert(testDoc);
+         await replicationState.awaitInSync();
+
+         assert.ok(pushRows.length > 0, 'Push handler should have been called');
+         const pushedDoc = pushRows[0].newDocumentState;
+         assert.strictEqual(pushedDoc.id, testDoc.id);
+         assert.strictEqual(pushedDoc.name, testDoc.name);
+
+         await replicationState.cancel();
+         await collection.database.close();
+     });
+ });
+
+ describe('live replication', () => {
+     it('should receive live updates through polling', async () => {
+         const collection = await humansCollection.createHumanWithTimestamp(0, undefined, false);
+
+         const doc1 = schemaObjects.humanWithTimestampData();
+         const doc2 = schemaObjects.humanWithTimestampData();
+
+         let fetchCallCount = 0;
+         /**
+          * Initial (non-live) requests deliver doc1; once the replication
+          * switches to live mode (live=true query param), the next
+          * long-poll responses deliver doc2. Later live polls wait a bit
+          * before answering up-to-date so the test does not busy-loop.
+          */
+         async function mockFetch(url: string | URL | Request): Promise<Response> {
+             fetchCallCount++;
+             const urlStr = typeof url === 'string' ? url : url instanceof URL ? url.toString() : url.url;
+
+             if (!urlStr.includes('live=true')) {
+                 if (fetchCallCount === 1) {
+                     return createMockResponse(
+                         [makeInsertMessage(doc1), makeUpToDateMessage()],
+                         '0_1',
+                         'handle-1'
+                     );
+                 }
+                 return createMockResponse(
+                     [makeUpToDateMessage()],
+                     '0_1',
+                     'handle-1'
+                 );
+             } else {
+                 if (fetchCallCount <= 4) {
+                     return createMockResponse(
+                         [makeInsertMessage(doc2), makeUpToDateMessage()],
+                         '0_2',
+                         'handle-1'
+                     );
+                 }
+                 // throttle further live polls like a real long-poll would
+                 await wait(200);
+                 return createMockResponse(
+                     [makeUpToDateMessage()],
+                     '0_2',
+                     'handle-1'
+                 );
+             }
+         }
+
+         const replicationState = replicateElectricSQL({
+             replicationIdentifier: 'test-live-' + randomToken(10),
+             collection,
+             url: 'http://localhost:3000/v1/shape',
+             params: { table: 'humans' },
+             live: true,
+             pull: { batchSize },
+             fetch: mockFetch as typeof fetch,
+             waitForLeadership: false,
+             autoStart: true
+         });
+         ensureReplicationHasNoErrors(replicationState);
+         await replicationState.awaitInitialReplication();
+
+         let docsInDb = await collection.find().exec();
+         assert.strictEqual(docsInDb.length, 1);
+         assert.strictEqual(docsInDb[0].primary, doc1.id);
+
+         await waitUntil(async () => {
+             const docs = await collection.find().exec();
+             return docs.length >= 2;
+         }, 5000);
+
+         docsInDb = await collection.find().exec();
+         assert.strictEqual(docsInDb.length, 2);
+         const doc2InDb = docsInDb.find(d => d.primary === doc2.id);
+         assert.ok(doc2InDb, 'doc2 should arrive via live polling');
+
+         await replicationState.cancel();
+         await collection.database.close();
+     });
+ });
+
+ describe('error handling', () => {
+     it('should emit error on HTTP error during pull', async () => {
+         const collection = await humansCollection.createHumanWithTimestamp(0, undefined, false);
+         const errors: any[] = [];
+
+         let fetchCallCount = 0;
+         // Fail the first two requests with HTTP 500, then succeed so
+         // the retry logic can finish the initial replication.
+         function mockFetch(): Promise<Response> {
+             fetchCallCount++;
+             if (fetchCallCount <= 2) {
+                 return Promise.resolve(new Response('Server Error', { status: 500 }));
+             }
+             return Promise.resolve(createMockResponse(
+                 [makeUpToDateMessage()],
+                 '0_0',
+                 'handle-1'
+             ));
+         }
+
+         const replicationState = replicateElectricSQL({
+             replicationIdentifier: 'test-error-' + randomToken(10),
+             collection,
+             url: 'http://localhost:3000/v1/shape',
+             params: { table: 'humans' },
+             live: false,
+             pull: { batchSize },
+             fetch: mockFetch as typeof fetch,
+             waitForLeadership: false,
+             // short retry time so the test recovers quickly from the 500s
+             retryTime: 100
+         });
+         replicationState.error$.subscribe(err => errors.push(err));
+         await replicationState.awaitInitialReplication();
+
+         assert.ok(errors.length > 0, 'Should have received at least one error');
+
+         await replicationState.cancel();
+         await collection.database.close();
+     });
+ });
+
+ describe('RxElectricSQLReplicationState', () => {
+     it('should return an instance of RxElectricSQLReplicationState', async () => {
+         const collection = await humansCollection.createHumanWithTimestamp(0, undefined, false);
+
+         function mockFetch(): Promise<Response> {
+             return Promise.resolve(createMockResponse(
+                 [makeUpToDateMessage()],
+                 '0_0',
+                 'handle-1'
+             ));
+         }
+
+         const replicationState = replicateElectricSQL({
+             replicationIdentifier: 'test-instance-' + randomToken(10),
+             collection,
+             url: 'http://localhost:3000/v1/shape',
+             params: { table: 'humans' },
+             live: false,
+             pull: { batchSize },
+             fetch: mockFetch as typeof fetch,
+             waitForLeadership: false
+         });
+         assert.ok(replicationState instanceof RxElectricSQLReplicationState);
+
+         await replicationState.cancel();
+         await collection.database.close();
+     });
+ });
+ });
+});