Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs-src/docs/releases/17.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ To improve vibe-coding when working with RxDB directly we:
- **ADD** [Google Drive Replication](../replication-google-drive.md) plugin to replicate client data to a clients Google Drive folder without any server.
- **ADD** [Microsoft OneDrive Replication](../replication-microsoft-onedrive.md) plugin to replicate client data to a clients OneDrive folder without any server.
- **ADD** [react-hooks plugin](../react.md).
- **ADD** [Flutter Plugin](../articles/flutter-database.md) improvements: `findOne()`, `bulkInsert()`, `bulkRemove()`, `upsert()`, `count()`, `patch()`, `incrementalPatch()`, `incrementalRemove()`, `RxDatabase.close()`, and multi-database support.

### 🔁 Reactivity & APIs

Expand Down
206 changes: 192 additions & 14 deletions src/plugins/flutter/dart/lib/src/rxdb_base.dart
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,14 @@ Future<RxDatabase> getRxDatabase(String jsFilePath, String databaseName) async {

class RxChangeEvent<RxDocType> {
String operation;
dynamic documentData;
dynamic previousDocumentData;
String documentId;
String? collectionName;
bool isLocal;
RxChangeEvent(
this.operation,
this.documentData,
this.previousDocumentData,
this.documentId,
this.collectionName,
Expand All @@ -140,6 +142,7 @@ class RxChangeEvent<RxDocType> {
static RxChangeEvent<RxDocType> fromJSON<RxDocType>(dynamic json) {
RxChangeEvent<RxDocType> ret = RxChangeEvent<RxDocType>(
json['operation'],
json['documentData'],
json['previousDocumentData'],
json['documentId'],
json['collectionName'],
Expand All @@ -159,6 +162,8 @@ class RxChangeEventBulk<RxDocType> {
List<RxChangeEvent<RxDocType>> events;
dynamic checkpoint;
String context;
int startTime;
int endTime;

RxChangeEventBulk(
this.collectionName,
Expand All @@ -168,7 +173,9 @@ class RxChangeEventBulk<RxDocType> {
this.internal,
this.events,
this.checkpoint,
this.context);
this.context,
this.startTime,
this.endTime);

static RxChangeEventBulk<RxDocType> fromJSON<RxDocType>(dynamic json) {
List<dynamic> eventsJson = json['events'];
Expand All @@ -185,7 +192,9 @@ class RxChangeEventBulk<RxDocType> {
json['internal'],
events,
json['checkpoint'],
json['context']);
json['context'],
json['startTime'] ?? 0,
json['endTime'] ?? 0);
return ret;
}
}
Expand All @@ -196,8 +205,12 @@ class RxDatabase<CollectionsOfDatabase> {
List<dynamic> collectionMeta;
Map<String, RxCollection<dynamic>> collections = {};
ReplaySubject<RxChangeEventBulk<dynamic>> eventBulks$;
bool closed = false;
RxDatabase(this.name, this.engine, this.eventBulks$, this.collectionMeta);

String get _jsDbRef =>
"process.databases[" + jsonEncode(name) + "].db";

RxCollection<RxDocType> getCollection<RxDocType>(String name) {
var meta = collectionMeta.firstWhere((meta) => meta['name'] == name);
String collectionName = meta['name'];
Expand All @@ -214,6 +227,14 @@ class RxDatabase<CollectionsOfDatabase> {
return collections[name] as RxCollection<RxDocType>;
}
}

Future<void> close() async {
if (closed) return;
await engine.evaluate('process.close(' + jsonEncode(name) + ');');
closed = true;
await eventBulks$.close();
engine.close();
}
}

class RxCollection<RxDocType> {
Expand All @@ -229,34 +250,142 @@ class RxCollection<RxDocType> {
docCache = DocCache<RxDocType>(this);
}

String get _jsCollRef =>
database._jsDbRef + "[" + jsonEncode(name) + "]";

RxQuery<RxDocType> find(dynamic query) {
var rxQuery = RxQuery<RxDocType>(query, this);
return rxQuery;
}

RxQuerySingle<RxDocType> findOne([dynamic queryOrPrimaryKey]) {
var rxQuery = RxQuerySingle<RxDocType>(queryOrPrimaryKey, this);
return rxQuery;
}

Future<RxDocument<RxDocType>> insert(data) async {
dynamic result = await database.engine.evaluate("process.db['" +
name +
"'].insert(" +
dynamic result = await database.engine.evaluate(_jsCollRef +
".insert(" +
jsonEncode(data) +
").then(d => d.toJSON(true));");
var document = docCache.getByDocData(result);
return document;
}

Future<List<RxDocument<RxDocType>>> bulkInsert(List<dynamic> docs) async {
List<dynamic> result = await database.engine.evaluate(_jsCollRef +
".bulkInsert(" +
jsonEncode(docs) +
").then(r => r.success.map(d => d.toJSON(true)));");
return result.map((docData) {
return docCache.getByDocData(docData);
}).toList();
}

Future<List<RxDocument<RxDocType>>> bulkRemove(List<String> ids) async {
List<dynamic> result = await database.engine.evaluate(_jsCollRef +
".bulkRemove(" +
jsonEncode(ids) +
").then(r => r.success.map(d => d.toJSON(true)));");
return result.map((docData) {
return docCache.getByDocData(docData);
}).toList();
}

Future<RxDocument<RxDocType>> upsert(dynamic data) async {
dynamic result = await database.engine.evaluate(_jsCollRef +
".upsert(" +
jsonEncode(data) +
").then(d => d.toJSON(true));");
var document = docCache.getByDocData(result);
return document;
}

Future<int> count([dynamic query]) async {
String queryStr = query != null ? jsonEncode(query) : '{}';
dynamic result = await database.engine.evaluate(_jsCollRef +
".count(" +
queryStr +
").exec();");
return (result as num).toInt();
}

Future<void> remove() async {
await database.engine.evaluate(_jsCollRef +
".remove();");
}
}

class RxDocument<RxDocType> {
RxCollection<RxDocType> collection;
dynamic data;
RxDocument(this.collection, this.data);

String get primary => data[collection.primaryKey].toString();

bool get deleted => data['_deleted'] == true;

Map<String, dynamic> toJSON() {
return Map<String, dynamic>.from(data);
}

dynamic get(String fieldName) {
return data[fieldName];
}

/// Sets the value of a field in the local document data.
/// This does not persist the change to the database.
/// Use [patch] or [incrementalPatch] to persist changes.
void set(String fieldName, dynamic value) {
data[fieldName] = value;
}

Future<RxDocument<RxDocType>> patch(Map<String, dynamic> patchData) async {
String id = primary;
dynamic result = await collection.database.engine.evaluate(
collection._jsCollRef +
".findOne(" +
jsonEncode(id) +
").exec().then(d => d.patch(" +
jsonEncode(patchData) +
")).then(d => d.toJSON(true));");
data = result;
collection.docCache.updateDocData(id, data);
return this;
}

Future<RxDocument<RxDocType>> incrementalPatch(
Map<String, dynamic> patchData) async {
String id = primary;
dynamic result = await collection.database.engine.evaluate(
collection._jsCollRef +
".findOne(" +
jsonEncode(id) +
").exec().then(d => d.incrementalPatch(" +
jsonEncode(patchData) +
")).then(d => d.toJSON(true));");
data = result;
collection.docCache.updateDocData(id, data);
return this;
}

Future<RxDocument<RxDocType>> remove() async {
String id = data[collection.primaryKey];
await collection.database.engine.evaluate("process.db['" +
collection.name +
"'].findOne('" +
id +
"').exec().then(d => d.remove());");
String id = primary;
await collection.database.engine.evaluate(
collection._jsCollRef +
".findOne(" +
jsonEncode(id) +
").exec().then(d => d.remove());");
return this;
}

Future<RxDocument<RxDocType>> incrementalRemove() async {
String id = primary;
await collection.database.engine.evaluate(
collection._jsCollRef +
".findOne(" +
jsonEncode(id) +
").exec().then(d => d.incrementalRemove());");
return this;
}
}
Expand All @@ -271,9 +400,8 @@ class RxQuery<RxDocType> {
RxQuery(this.query, this.collection);
Future<List<RxDocument<RxDocType>>> exec() async {
List<dynamic> result = await collection.database.engine.evaluate(
"process.db['" +
collection.name +
"'].find(" +
collection._jsCollRef +
".find(" +
jsonEncode(query) +
").exec().then(docs => docs.map(d => d.toJSON(true)));");

Expand All @@ -300,6 +428,48 @@ class RxQuery<RxDocType> {
}
}

class RxQuerySingle<RxDocType> {
dynamic queryOrPrimaryKey;
RxCollection<RxDocType> collection;

Stream<RxDocument<RxDocType>?> results$ = ReplaySubject();
bool subscribed = false;

RxQuerySingle(this.queryOrPrimaryKey, this.collection);

Future<RxDocument<RxDocType>?> exec() async {
String queryArg;
if (queryOrPrimaryKey == null) {
queryArg = '';
} else {
queryArg = jsonEncode(queryOrPrimaryKey);
}
dynamic result = await collection.database.engine.evaluate(
collection._jsCollRef +
".findOne(" +
queryArg +
").exec().then(d => d ? d.toJSON(true) : null);");
if (result == null) {
return null;
}
return collection.docCache.getByDocData(result);
}

Stream<RxDocument<RxDocType>?> $() {
if (subscribed == false) {
subscribed = true;
results$ = MergeStream<dynamic>([
collection.eventBulks$,
Stream.fromIterable([1])
]).asyncMap((event) async {
var newResult = await exec();
return newResult;
});
}
return results$;
}
}

class DocCache<RxDocType> {
RxCollection<RxDocType> collection;
Map<String, RxDocument<RxDocType>> map = {};
Expand All @@ -310,13 +480,21 @@ class DocCache<RxDocType> {
String id = data[collection.primaryKey];
var docInCache = map[id];
if (docInCache != null) {
docInCache.data = data;
return docInCache;
} else {
var doc = RxDocument<RxDocType>(collection, data);
map[id] = doc;
return doc;
}
}

void updateDocData(String id, dynamic data) {
var docInCache = map[id];
if (docInCache != null) {
docInCache.data = data;
}
}
}

abstract class RxDocTypeParent<RxDocType> {
Expand Down
15 changes: 13 additions & 2 deletions src/plugins/flutter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ export type CreateRxDatabaseFunctionType = (databaseName: string) => Promise<RxD
export function setFlutterRxDatabaseConnector(
createDB: CreateRxDatabaseFunctionType
) {
if (!(process as any).databases) {
(process as any).databases = {};
}
(process as any).init = async (databaseName: string) => {
const db = await createDB(databaseName);
db.eventBulks$.subscribe((eventBulk: RxChangeEventBulk<any>) => {
const eventSub = db.eventBulks$.subscribe((eventBulk: RxChangeEventBulk<any>) => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
sendRxDBEvent(JSON.stringify(eventBulk));
});
(process as any).db = db;
(process as any).databases[databaseName] = { db, eventSub };
const collections: { name: string; primaryKey: string; }[] = [];
Object.entries(db.collections).forEach(([collectionName, collection]) => {
collections.push({
Expand All @@ -28,6 +31,14 @@ export function setFlutterRxDatabaseConnector(
collections
};
};
(process as any).close = async (databaseName: string) => {
const entry = (process as any).databases[databaseName];
if (entry) {
entry.eventSub.unsubscribe();
await entry.db.close();
delete (process as any).databases[databaseName];
}
};
}

/**
Expand Down
1 change: 1 addition & 0 deletions test/unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,6 @@ import './unit/leader-election.test.ts';
import './unit/backup.test.ts';
import './unit/import-export.test.ts';
import './unit/database-lifecycle.ts';
import './unit/flutter.test.ts';
import './unit/plugin.test.ts';
import './unit/last.test.ts';
Loading
Loading