Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 193 additions & 14 deletions src/plugins/flutter/dart/lib/src/rxdb_base.dart
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,14 @@ Future<RxDatabase> getRxDatabase(String jsFilePath, String databaseName) async {

class RxChangeEvent<RxDocType> {
String operation;
dynamic documentData;
dynamic previousDocumentData;
String documentId;
String? collectionName;
bool isLocal;
RxChangeEvent(
this.operation,
this.documentData,
this.previousDocumentData,
this.documentId,
this.collectionName,
Expand All @@ -140,6 +142,7 @@ class RxChangeEvent<RxDocType> {
static RxChangeEvent<RxDocType> fromJSON<RxDocType>(dynamic json) {
RxChangeEvent<RxDocType> ret = RxChangeEvent<RxDocType>(
json['operation'],
json['documentData'],
json['previousDocumentData'],
json['documentId'],
json['collectionName'],
Expand All @@ -159,6 +162,8 @@ class RxChangeEventBulk<RxDocType> {
List<RxChangeEvent<RxDocType>> events;
dynamic checkpoint;
String context;
int startTime;
int endTime;

RxChangeEventBulk(
this.collectionName,
Expand All @@ -168,7 +173,9 @@ class RxChangeEventBulk<RxDocType> {
this.internal,
this.events,
this.checkpoint,
this.context);
this.context,
this.startTime,
this.endTime);

static RxChangeEventBulk<RxDocType> fromJSON<RxDocType>(dynamic json) {
List<dynamic> eventsJson = json['events'];
Expand All @@ -185,7 +192,9 @@ class RxChangeEventBulk<RxDocType> {
json['internal'],
events,
json['checkpoint'],
json['context']);
json['context'],
json['startTime'] ?? 0,
json['endTime'] ?? 0);
return ret;
}
}
Expand All @@ -196,6 +205,7 @@ class RxDatabase<CollectionsOfDatabase> {
List<dynamic> collectionMeta;
Map<String, RxCollection<dynamic>> collections = {};
ReplaySubject<RxChangeEventBulk<dynamic>> eventBulks$;
bool closed = false;
RxDatabase(this.name, this.engine, this.eventBulks$, this.collectionMeta);

RxCollection<RxDocType> getCollection<RxDocType>(String name) {
Expand All @@ -214,6 +224,14 @@ class RxDatabase<CollectionsOfDatabase> {
return collections[name] as RxCollection<RxDocType>;
}
}

Future<void> close() async {
if (closed) return;
await engine.evaluate('process.close();');
closed = true;
await eventBulks$.close();
engine.close();
}
}

class RxCollection<RxDocType> {
Expand All @@ -234,29 +252,140 @@ class RxCollection<RxDocType> {
return rxQuery;
}

RxQuerySingle<RxDocType> findOne([dynamic queryOrPrimaryKey]) {
var rxQuery = RxQuerySingle<RxDocType>(queryOrPrimaryKey, this);
return rxQuery;
}

Future<RxDocument<RxDocType>> insert(data) async {
dynamic result = await database.engine.evaluate("process.db['" +
name +
"'].insert(" +
dynamic result = await database.engine.evaluate("process.db[" +
jsonEncode(name) +
"].insert(" +
jsonEncode(data) +
").then(d => d.toJSON(true));");
var document = docCache.getByDocData(result);
return document;
}

Future<List<RxDocument<RxDocType>>> bulkInsert(List<dynamic> docs) async {
List<dynamic> result = await database.engine.evaluate("process.db[" +
jsonEncode(name) +
"].bulkInsert(" +
jsonEncode(docs) +
").then(r => r.success.map(d => d.toJSON(true)));");
return result.map((docData) {
return docCache.getByDocData(docData);
}).toList();
}

Future<List<RxDocument<RxDocType>>> bulkRemove(List<String> ids) async {
List<dynamic> result = await database.engine.evaluate("process.db[" +
jsonEncode(name) +
"].bulkRemove(" +
jsonEncode(ids) +
").then(r => r.success.map(d => d.toJSON(true)));");
return result.map((docData) {
return docCache.getByDocData(docData);
}).toList();
}

Future<RxDocument<RxDocType>> upsert(dynamic data) async {
dynamic result = await database.engine.evaluate("process.db[" +
jsonEncode(name) +
"].upsert(" +
jsonEncode(data) +
").then(d => d.toJSON(true));");
var document = docCache.getByDocData(result);
return document;
}

Future<int> count([dynamic query]) async {
String queryStr = query != null ? jsonEncode(query) : '{}';
dynamic result = await database.engine.evaluate("process.db[" +
jsonEncode(name) +
"].count(" +
queryStr +
").exec();");
return (result as num).toInt();
}

Future<void> remove() async {
await database.engine.evaluate("process.db[" +
jsonEncode(name) +
"].remove();");
}
}

class RxDocument<RxDocType> {
RxCollection<RxDocType> collection;
dynamic data;
RxDocument(this.collection, this.data);

String get primary => data[collection.primaryKey].toString();

bool get deleted => data['_deleted'] == true;

Map<String, dynamic> toJSON() {
return Map<String, dynamic>.from(data);
}

dynamic get(String fieldName) {
return data[fieldName];
}

/// Sets the value of a field in the local document data.
/// This does not persist the change to the database.
/// Use [patch] or [incrementalPatch] to persist changes.
void set(String fieldName, dynamic value) {
data[fieldName] = value;
}

Future<RxDocument<RxDocType>> patch(Map<String, dynamic> patchData) async {
String id = primary;
dynamic result = await collection.database.engine.evaluate("process.db[" +
jsonEncode(collection.name) +
"].findOne(" +
jsonEncode(id) +
").exec().then(d => d.patch(" +
jsonEncode(patchData) +
")).then(d => d.toJSON(true));");
data = result;
collection.docCache.updateDocData(id, data);
return this;
}

Future<RxDocument<RxDocType>> incrementalPatch(
Map<String, dynamic> patchData) async {
String id = primary;
dynamic result = await collection.database.engine.evaluate("process.db[" +
jsonEncode(collection.name) +
"].findOne(" +
jsonEncode(id) +
").exec().then(d => d.incrementalPatch(" +
jsonEncode(patchData) +
")).then(d => d.toJSON(true));");
data = result;
collection.docCache.updateDocData(id, data);
return this;
}

Future<RxDocument<RxDocType>> remove() async {
String id = data[collection.primaryKey];
await collection.database.engine.evaluate("process.db['" +
collection.name +
"'].findOne('" +
id +
"').exec().then(d => d.remove());");
String id = primary;
await collection.database.engine.evaluate("process.db[" +
jsonEncode(collection.name) +
"].findOne(" +
jsonEncode(id) +
").exec().then(d => d.remove());");
return this;
}

Future<RxDocument<RxDocType>> incrementalRemove() async {
String id = primary;
await collection.database.engine.evaluate("process.db[" +
jsonEncode(collection.name) +
"].findOne(" +
jsonEncode(id) +
").exec().then(d => d.incrementalRemove());");
return this;
}
}
Expand All @@ -271,9 +400,9 @@ class RxQuery<RxDocType> {
RxQuery(this.query, this.collection);
Future<List<RxDocument<RxDocType>>> exec() async {
List<dynamic> result = await collection.database.engine.evaluate(
"process.db['" +
collection.name +
"'].find(" +
"process.db[" +
jsonEncode(collection.name) +
"].find(" +
jsonEncode(query) +
").exec().then(docs => docs.map(d => d.toJSON(true)));");

Expand All @@ -300,6 +429,48 @@ class RxQuery<RxDocType> {
}
}

class RxQuerySingle<RxDocType> {
dynamic queryOrPrimaryKey;
RxCollection<RxDocType> collection;

Stream<RxDocument<RxDocType>?> results$ = ReplaySubject();
bool subscribed = false;

RxQuerySingle(this.queryOrPrimaryKey, this.collection);

Future<RxDocument<RxDocType>?> exec() async {
String queryArg;
if (queryOrPrimaryKey == null) {
queryArg = '';
} else {
queryArg = jsonEncode(queryOrPrimaryKey);
}
dynamic result = await collection.database.engine.evaluate("process.db[" +
jsonEncode(collection.name) +
"].findOne(" +
queryArg +
").exec().then(d => d ? d.toJSON(true) : null);");
if (result == null) {
return null;
}
return collection.docCache.getByDocData(result);
}

Stream<RxDocument<RxDocType>?> $() {
if (subscribed == false) {
subscribed = true;
results$ = MergeStream<dynamic>([
collection.eventBulks$,
Stream.fromIterable([1])
]).asyncMap((event) async {
var newResult = await exec();
return newResult;
});
}
return results$;
}
}

class DocCache<RxDocType> {
RxCollection<RxDocType> collection;
Map<String, RxDocument<RxDocType>> map = {};
Expand All @@ -310,13 +481,21 @@ class DocCache<RxDocType> {
String id = data[collection.primaryKey];
var docInCache = map[id];
if (docInCache != null) {
docInCache.data = data;
return docInCache;
} else {
var doc = RxDocument<RxDocType>(collection, data);
map[id] = doc;
return doc;
}
}

void updateDocData(String id, dynamic data) {
var docInCache = map[id];
if (docInCache != null) {
docInCache.data = data;
}
}
}

abstract class RxDocTypeParent<RxDocType> {
Expand Down
6 changes: 5 additions & 1 deletion src/plugins/flutter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ export function setFlutterRxDatabaseConnector(
) {
(process as any).init = async (databaseName: string) => {
const db = await createDB(databaseName);
db.eventBulks$.subscribe(eventBulk => {
const eventSub = db.eventBulks$.subscribe(eventBulk => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
sendRxDBEvent(JSON.stringify(eventBulk));
});
(process as any).db = db;
(process as any).close = async () => {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we want more then one database? @copilot

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Databases are now stored in a process.databases map keyed by name instead of a single process.db. Each init call adds to the map, and close(databaseName) targets just that entry. The Dart side uses process.databases[dbName].db[collectionName] for all JS expressions via _jsDbRef/_jsCollRef helpers. Fixed in c57f820.

eventSub.unsubscribe();
await db.close();
};
const collections: { name: string; primaryKey: string; }[] = [];
Object.entries(db.collections).forEach(([collectionName, collection]) => {
collections.push({
Expand Down