Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ jobs:

- run: npm run test:replication-google-drive
- run: npm run test:replication-microsoft-onedrive
- run: npm run test:replication-electric-sql



Expand Down
203 changes: 203 additions & 0 deletions docs-src/docs/replication-electric-sql.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
---
title: Electric-SQL Replication Plugin for RxDB
slug: replication-electric-sql.html
description: Sync data from PostgreSQL to RxDB using Electric-SQL. Stream shapes from your database to client-side collections with real-time updates.
---

import {Steps} from '@site/src/components/steps';

# Electric-SQL Replication Plugin for RxDB

<p align="center">
<img src="./files/icons/electric-sql.svg" alt="Electric-SQL" height="60" className="img-padding img-in-text-right" />
</p>

The **Electric-SQL Replication Plugin** for RxDB synchronizes data from a PostgreSQL database to RxDB collections using [Electric-SQL](https://electric-sql.com/) (also known as Electric). Electric provides a read-path sync layer that streams "shapes" (subsets of your PostgreSQL data) to client applications in real-time via HTTP.

Under the hood, this plugin is powered by the RxDB [Sync Engine](./replication.md). It uses Electric's HTTP API for incremental pulls with offset-based checkpointing and Electric's live mode for real-time change detection.

## Key Features

- **PostgreSQL to client sync** using Electric's shape-based streaming
- **Incremental pull** with offset-based checkpoint tracking
- **Real-time updates** via Electric's live long-polling mode
- **Offline-first** with RxDB's built-in retry and conflict handling
- **No external client dependency** required: communicates with Electric over plain HTTP
- **Flexible push** via user-provided handlers for writing back to PostgreSQL

## How It Works

Electric-SQL provides a read-path sync layer between PostgreSQL and clients. Data flows as:

1. **PostgreSQL** stores the source of truth
2. **Electric** watches for changes and provides an HTTP shape API
3. **RxDB** pulls shapes from Electric and stores them locally

For writes, Electric does not include a built-in write path. You must provide your own backend API (REST, GraphQL, etc.) that writes to PostgreSQL. Once the write is committed to PostgreSQL, Electric syncs the change to all connected clients.

## Setting Up RxDB with Electric-SQL

<Steps>

### Prerequisites

You need a running Electric-SQL service connected to your PostgreSQL database. See the [Electric-SQL documentation](https://electric-sql.com/docs) for setup instructions.

### Install RxDB

```bash
npm install rxdb
```

### Create an RxDB Database and Collection

Create an RxDB database and add a collection whose schema matches your PostgreSQL table structure. The primary key must match the table's primary key column.

```ts
import { createRxDatabase } from 'rxdb/plugins/core';
import { getRxStorageMemory } from 'rxdb/plugins/storage-memory';

const db = await createRxDatabase({
name: 'mydb',
storage: getRxStorageMemory()
});

await db.addCollections({
items: {
schema: {
version: 0,
primaryKey: 'id',
type: 'object',
properties: {
id: { type: 'string', maxLength: 100 },
name: { type: 'string' },
value: { type: 'number' }
},
required: ['id', 'name']
}
}
});
```

### Start Replication

Connect your RxDB collection to Electric-SQL. The `url` should point to your Electric shape endpoint, and `params.table` specifies the PostgreSQL table to sync.

```ts
import { replicateElectricSQL } from 'rxdb/plugins/replication-electric-sql';

const replicationState = replicateElectricSQL({
collection: db.items,
replicationIdentifier: 'items-electric',
url: 'http://localhost:3000/v1/shape',
params: {
table: 'items'
},
live: true,
pull: {
batchSize: 100
}
});

replicationState.error$.subscribe(err => console.error('[replication]', err));
await replicationState.awaitInitialReplication();
```

### Add a Push Handler (Optional)

Since Electric-SQL only provides a read path, you need to supply your own push handler to write changes back to your PostgreSQL database through your backend API.

```ts
import { replicateElectricSQL } from 'rxdb/plugins/replication-electric-sql';

const replicationState = replicateElectricSQL({
collection: db.items,
replicationIdentifier: 'items-electric',
url: 'http://localhost:3000/v1/shape',
params: {
table: 'items'
},
live: true,
pull: {
batchSize: 100
},
push: {
async handler(rows) {
const conflicts = [];
for (const row of rows) {
const response = await fetch('https://your-backend.com/api/items', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(row.newDocumentState)
});
if (!response.ok) {
// handle conflict: fetch current server state and add to conflicts
}
}
return conflicts;
},
batchSize: 10
}
});
```

</Steps>


## Configuration Options

### Shape Parameters

Use the `params` object to configure the Electric shape. You can filter rows with a `where` clause or select specific columns:

```ts
const replicationState = replicateElectricSQL({
collection: db.items,
replicationIdentifier: 'items-electric',
url: 'http://localhost:3000/v1/shape',
params: {
table: 'items',
where: 'status=\'active\'',
columns: 'id,name,value',
replica: 'full'
},
pull: {}
});
```

### Custom Headers

Pass custom HTTP headers for authentication or other purposes:

```ts
const replicationState = replicateElectricSQL({
// ...
headers: {
'Authorization': 'Bearer your-token'
},
pull: {}
});
```

### Custom Fetch Function

Provide a custom fetch implementation, for example when using a custom HTTP client or when running in an environment without a global `fetch`:

```ts
const replicationState = replicateElectricSQL({
// ...
fetch: myCustomFetch,
pull: {}
});
```

## Handling Deletes

Electric-SQL reports physical deletes from PostgreSQL. The plugin maps Electric's `delete` operations to RxDB's soft-delete mechanism by setting `_deleted: true` on the document. If your PostgreSQL table uses soft deletes (a boolean column), those are synced as regular field updates.

## Follow Up

- **Replication API Reference:** Learn the core concepts and lifecycle hooks: [Replication](./replication.md)
- **Electric-SQL Documentation:** [electric-sql.com/docs](https://electric-sql.com/docs)
- **Offline-First Guide:** Caching, retries, and conflict strategies: [Local-First](./articles/local-first-future.md)
- **Community:** Questions or feedback? Join our Discord: [Chat](./chat)
5 changes: 5 additions & 0 deletions docs-src/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ const sidebars = {
id: 'replication-nats',
label: 'NATS Replication'
},
{
type: 'doc',
id: 'replication-electric-sql',
label: 'Electric-SQL Replication'
},
{
type: 'doc',
id: 'replication-appwrite',
Expand Down
4 changes: 4 additions & 0 deletions docs-src/static/files/icons/electric-sql.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@
"import": "./dist/esm/plugins/replication-couchdb/index.js",
"default": "./dist/esm/plugins/replication-couchdb/index.js"
},
"./plugins/replication-electric-sql": {
"types": "./dist/types/plugins/replication-electric-sql/index.d.ts",
"require": "./dist/cjs/plugins/replication-electric-sql/index.js",
"import": "./dist/esm/plugins/replication-electric-sql/index.js",
"default": "./dist/esm/plugins/replication-electric-sql/index.js"
},
"./plugins/replication-nats": {
"types": "./dist/types/plugins/replication-nats/index.d.ts",
"require": "./dist/cjs/plugins/replication-nats/index.js",
Expand Down Expand Up @@ -468,6 +474,7 @@
"test:replication-supabase": "npm run transpile && cross-env DEFAULT_STORAGE=dexie mocha --config ./config/.mocharc.cjs ./test_tmp/replication-supabase.test.js",
"test:replication-google-drive": "npm run transpile && cross-env DEFAULT_STORAGE=dexie tsx ./node_modules/mocha/bin/mocha --config ./config/.mocharc.cjs ./test/replication-google-drive.test.ts",
"test:replication-microsoft-onedrive": "npm run transpile && cross-env DEFAULT_STORAGE=dexie tsx ./node_modules/mocha/bin/mocha --config ./config/.mocharc.cjs ./test/replication-microsoft-onedrive.test.ts",
"test:replication-electric-sql": "npm run transpile && cross-env DEFAULT_STORAGE=memory mocha --config ./config/.mocharc.cjs ./test_tmp/replication-electric-sql.test.js",
"test:core": "npm run transpile && mocha ./test_tmp/unit/core.node.js",
"test:full": "npm run transpile && mocha ./test_tmp/unit/full.node.js",
"test:typings": "npm run build:plugins && tsc --allowImportingTsExtensions --noEmit --skipLibCheck --lib \"ES2022,DOM\" ./test/typings.test.ts",
Expand Down
2 changes: 2 additions & 0 deletions plugins/replication-electric-sql/index.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
const pkg = require('../../dist/cjs/plugins/replication-electric-sql/index.js');
module.exports = pkg;
1 change: 1 addition & 0 deletions plugins/replication-electric-sql/index.d.cts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from '../../dist/types/plugins/replication-electric-sql/index';
1 change: 1 addition & 0 deletions plugins/replication-electric-sql/index.d.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from '../../dist/types/plugins/replication-electric-sql/index';
1 change: 1 addition & 0 deletions plugins/replication-electric-sql/index.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from '../../dist/esm/plugins/replication-electric-sql/index.js';
1 change: 1 addition & 0 deletions plugins/replication-electric-sql/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from '../../dist/types/plugins/replication-electric-sql/index';
18 changes: 18 additions & 0 deletions plugins/replication-electric-sql/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "rxdb-plugins-replication-electric-sql",
"description": "This package.json file is generated by the \"npm run build:plugins\" script, do not edit it manually!",
"sideEffects": false,
"types": "../../dist/types/plugins/replication-electric-sql/index.d.ts",
"exports": {
".": {
"default": {
"types": "./index.d.ts",
"import": "./index.mjs",
"default": "./index.cjs"
}
},
"./package.json": "./package.json"
},
"main": "./index.cjs",
"module": "./index.mjs"
}
73 changes: 73 additions & 0 deletions src/plugins/replication-electric-sql/electric-sql-helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import type { WithDeleted } from '../../types/index.d.ts';
import type { ElectricSQLShapeParams } from './electric-sql-types.ts';

export type ElectricSQLMessage<RxDocType> = {
offset?: string;
key?: string;
value?: Record<string, any>;
headers: {
operation?: 'insert' | 'update' | 'delete';
control?: 'up-to-date' | 'must-refetch';
};
};

/**
* Builds the URL for an Electric shape request.
*/
export function buildElectricUrl(
baseUrl: string,
params: ElectricSQLShapeParams,
offset: string,
handle?: string,
live?: boolean
): string {
const urlObj = new URL(baseUrl);

for (const [key, value] of Object.entries(params)) {
if (value !== undefined) {
urlObj.searchParams.set(key, value);
}
}

urlObj.searchParams.set('offset', offset);

if (handle) {
urlObj.searchParams.set('handle', handle);
}

if (live) {
urlObj.searchParams.set('live', 'true');
}

return urlObj.toString();
}

/**
* Converts an Electric-SQL change message to an RxDB document.
* Returns null for control messages.
*/
export function electricMessageToRxDBDocData<RxDocType>(
message: ElectricSQLMessage<RxDocType>,
primaryPath: string
): WithDeleted<RxDocType> | null {
if (!message.headers.operation || !message.value) {
return null;
}

const doc: any = { ...message.value };

if (message.headers.operation === 'delete') {
doc._deleted = true;
} else {
doc._deleted = false;
}

return doc as WithDeleted<RxDocType>;
}

/**
* Checks if a list of Electric-SQL messages contains a 'must-refetch' control message.
*/
export function hasMustRefetch<RxDocType>(messages: ElectricSQLMessage<RxDocType>[]): boolean {
return messages.some(m => m.headers?.control === 'must-refetch');
}
Loading
Loading