@@ -3403,6 +3403,135 @@ void testCreateOrReplaceRefreshesSchemaOnDroppedColumn() throws Exception {
}
}

@Nested
@DisplayName("Key-Specific Bulk Update Operations")
class KeySpecificBulkUpdateTests {

@Test
@DisplayName("Should update multiple keys with all operator types in a single batch")
void testBulkUpdateAllOperatorTypes() throws Exception {
Map<Key, java.util.Collection<SubDocumentUpdate>> updates = new LinkedHashMap<>();
updates.put(
rawKey("1"),
List.of(
SubDocumentUpdate.of("item", "UpdatedSoap"),
SubDocumentUpdate.builder()
.subDocument("price")
.operator(UpdateOperator.ADD)
.subDocumentValue(SubDocumentValue.of(5))
.build(),
SubDocumentUpdate.builder()
.subDocument("props.brand")
.operator(UpdateOperator.SET)
.subDocumentValue(SubDocumentValue.of("NewBrand"))
.build()));

updates.put(
rawKey("3"),
List.of(
SubDocumentUpdate.builder()
.subDocument("props.brand")
.operator(UpdateOperator.UNSET)
.build(),
SubDocumentUpdate.builder()
.subDocument("tags")
.operator(UpdateOperator.APPEND_TO_LIST)
.subDocumentValue(SubDocumentValue.of(new String[] {"newTag1", "newTag2"}))
.build()));

updates.put(
rawKey("5"),
List.of(
SubDocumentUpdate.builder()
.subDocument("tags")
.operator(UpdateOperator.ADD_TO_LIST_IF_ABSENT)
.subDocumentValue(SubDocumentValue.of(new String[] {"hygiene", "uniqueTag"}))
.build()));

updates.put(
rawKey("6"),
List.of(
SubDocumentUpdate.builder()
.subDocument("tags")
.operator(UpdateOperator.REMOVE_ALL_FROM_LIST)
.subDocumentValue(SubDocumentValue.of(new String[] {"plastic"}))
.build()));

BulkUpdateResult result = flatCollection.bulkUpdate(updates, UpdateOptions.builder().build());

assertEquals(4, result.getUpdatedCount());

try (CloseableIterator<Document> iter = flatCollection.find(queryById("1"))) {
assertTrue(iter.hasNext());
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
assertEquals("UpdatedSoap", json.get("item").asText());
assertEquals(15, json.get("price").asInt()); // 10 + 5
assertEquals("NewBrand", json.get("props").get("brand").asText());
assertEquals("M", json.get("props").get("size").asText()); // preserved
}

try (CloseableIterator<Document> iter = flatCollection.find(queryById("3"))) {
assertTrue(iter.hasNext());
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
assertFalse(json.get("props").has("brand"));
assertEquals("L", json.get("props").get("size").asText()); // preserved
JsonNode tagsNode = json.get("tags");
assertEquals(6, tagsNode.size()); // Original 4 + 2 new
}

try (CloseableIterator<Document> iter = flatCollection.find(queryById("5"))) {
assertTrue(iter.hasNext());
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
JsonNode tagsNode = json.get("tags");
assertEquals(4, tagsNode.size()); // Original 3 + 1 new unique
Set<String> tags = new HashSet<>();
tagsNode.forEach(n -> tags.add(n.asText()));
assertTrue(tags.contains("uniqueTag"));
}

try (CloseableIterator<Document> iter = flatCollection.find(queryById("6"))) {
assertTrue(iter.hasNext());
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
JsonNode tagsNode = json.get("tags");
assertEquals(2, tagsNode.size()); // grooming, essential remain
Set<String> tags = new HashSet<>();
tagsNode.forEach(n -> tags.add(n.asText()));
assertFalse(tags.contains("plastic"));
}
}

@Test
@DisplayName("Should handle edge cases: empty map, null map, non-existent keys")
void testBulkUpdateEdgeCases() throws Exception {
UpdateOptions options = UpdateOptions.builder().build();

// Empty map
assertEquals(0, flatCollection.bulkUpdate(new HashMap<>(), options).getUpdatedCount());

// Null map
Map<Key, java.util.Collection<SubDocumentUpdate>> nullUpdates = null;
assertEquals(0, flatCollection.bulkUpdate(nullUpdates, options).getUpdatedCount());

// Non-existent key
Map<Key, java.util.Collection<SubDocumentUpdate>> nonExistent = new LinkedHashMap<>();
nonExistent.put(rawKey("non-existent"), List.of(SubDocumentUpdate.of("item", "X")));
assertEquals(0, flatCollection.bulkUpdate(nonExistent, options).getUpdatedCount());
}

// Creates a key with raw ID (matching test data format)
private Key rawKey(String id) {
return Key.from(id);
}

private Query queryById(String id) {
return Query.builder()
.setFilter(
RelationalExpression.of(
IdentifierExpression.of("id"), RelationalOperator.EQ, ConstantExpression.of(id)))
.build();
}
}

private static void executeInsertStatements() {
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try {
@@ -21,8 +21,8 @@ public interface Collection {
* store.
*
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
* the existing fields are modified is implementation specific. For example, upserting <code> {
* "foo2": "bar2" }
* the existing fields are modified is implementation specific. For example, upserting <code>
* { "foo2": "bar2" }
* </code> if a document <code>
* { "foo1": "bar1" }
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
@@ -42,8 +42,8 @@ public interface Collection {
* store.
*
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
* the existing fields are modified is implementation specific. For example, upserting <code> {
* "foo2": "bar2" }
* the existing fields are modified is implementation specific. For example, upserting <code>
* { "foo2": "bar2" }
* </code> if a document <code>
* { "foo1": "bar1" }
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
@@ -398,5 +398,53 @@ CloseableIterator<Document> bulkUpdate(
final UpdateOptions updateOptions)
throws IOException;

/**
* Bulk update sub-documents with key-specific updates. Each key can have its own set of
* SubDocumentUpdate operations, allowing different updates per document.
*
* <p>This method supports all update operators (SET, UNSET, ADD, APPEND_TO_LIST,
* ADD_TO_LIST_IF_ABSENT, REMOVE_ALL_FROM_LIST). Updates for each individual key are applied
* atomically, but there is no atomicity guarantee across different keys: some keys may be
* updated while others fail. Any stronger, batch-level atomicity guarantees are
* implementation-specific.
Contributor (suresh-prakash):

> Any atomicity guarantees are implementation-specific.

This violates the Liskov substitution principle (the 'L' in SOLID). It has severe implications and defeats the purpose of having the document-store abstraction in the first place, because implementors now have to make their client code behave differently for each underlying datastore. And, honestly, if people miss this code comment and expect the same client code to work fine against another datastore (which is highly likely), it can lead to subtle bugs that are hard to debug and reproduce.

The older methods (like update) have in fact violated this, leading to the introduction of newer, consistent methods. Perhaps the behaviour can be restrictive (matching at least the known database with the most restrictive atomicity support), but it has to be consistent.

Contributor Author (@suddendust), Apr 13, 2026:

@suresh-prakash Thanks for the detailed comment; I totally agree with you. I feel batch-level atomicity could be controlled through a flag in UpdateOptions. Alternatively, this method's contract can state that batch-level atomicity is not guaranteed, while per-key update atomicity has to be guaranteed. This is the least restrictive contract that almost any implementation can satisfy. If an implementation wants to provide batch-level atomicity guarantees on top of that, that's even better.

Wdyt?

Contributor (suresh-prakash):

> batch-level atomicity is not guaranteed, while per-key update atomicity has to be guaranteed

Yes. This makes total sense.

> If any implementation wants to provide batch-level atomicity guarantees, that's even better.

Definitely better in terms of performance. But that can create functional divergence, especially when one of the multiple updates is invalid: with per-key atomicity, the documents before the failing key would already have been updated, while with batch-level atomicity the entire update would fail.

If we are using document-store, all implementations have to be as consistent as possible in terms of functionality. Performance would (and can) differ based on the underlying database used, the indexes created, etc.

From this standpoint, I think it would make sense to have the Postgres implementation also perform key-level atomic updates (at least by default). If performance is critical, the client can opt in, as you mentioned, via UpdateOptions, with the implications explicitly known.

Basically, the design goal here is that the caller/client should be aware of the implications even if they miss these comments, and hence we prefer cross-database consistency over performance (since the performance differences are evident and inevitable anyway).

Contributor Author (@suddendust):

Makes sense, I have updated the doc.
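
To make the contract settled on in this thread concrete, here is a minimal sketch (not code from this PR) of a per-key-atomic bulkUpdate. The applyUpdatesAtomically hook and the count-based BulkUpdateResult constructor are assumptions for illustration; Key, SubDocumentUpdate, UpdateOptions, and BulkUpdateResult are the document-store types used in the diff above.

import java.io.IOException;
import java.util.Map;

// Sketch only: illustrates the agreed contract, not an actual implementation.
abstract class PerKeyAtomicBulkUpdate {

  // Hypothetical hook: applies one key's updates inside a single datastore
  // transaction, returning 1 if the document was updated and 0 otherwise.
  protected abstract long applyUpdatesAtomically(
      Key key, java.util.Collection<SubDocumentUpdate> updates) throws IOException;

  public BulkUpdateResult bulkUpdate(
      Map<Key, java.util.Collection<SubDocumentUpdate>> updates, UpdateOptions options)
      throws IOException {
    // Edge cases from the tests above: null or empty input updates nothing.
    if (updates == null || updates.isEmpty()) {
      return new BulkUpdateResult(0); // assumed count-based constructor
    }
    long updated = 0;
    for (Map.Entry<Key, java.util.Collection<SubDocumentUpdate>> entry :
        updates.entrySet()) {
      // Per-key atomicity only: if this call throws, keys updated in earlier
      // iterations stay updated, exactly the contract agreed on above.
      updated += applyUpdatesAtomically(entry.getKey(), entry.getValue());
    }
    return new BulkUpdateResult(updated);
  }
}

If batch-level atomicity were ever offered, the thread above suggests it should be an explicit opt-in via UpdateOptions, wrapping the whole loop in one transaction rather than changing this default.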

*
* <p>Example usage:
*
* <pre>{@code
* Map<Key, Collection<SubDocumentUpdate>> updates = new HashMap<>();
*
* // Key 1: SET a field and ADD to a number
* updates.put(key1, List.of(
* SubDocumentUpdate.of("name", "NewName"),
* SubDocumentUpdate.builder()
* .subDocument("count")
* .operator(UpdateOperator.ADD)
* .subDocumentValue(SubDocumentValue.of(5))
* .build()
* ));
*
* // Key 2: APPEND to an array
* updates.put(key2, List.of(
* SubDocumentUpdate.builder()
* .subDocument("tags")
* .operator(UpdateOperator.APPEND_TO_LIST)
* .subDocumentValue(SubDocumentValue.of(new String[]{"newTag"}))
* .build()
* ));
*
* BulkUpdateResult result = collection.bulkUpdate(updates, UpdateOptions.builder().build());
* }</pre>
*
* @param updates Map of Key to Collection of SubDocumentUpdate operations. Each key's updates are
* applied atomically, but no cross-key atomicity is guaranteed.
* @param updateOptions Options for the update operation
* @return BulkUpdateResult containing the count of successfully updated documents
* @throws IOException if the update operation fails
*/
default BulkUpdateResult bulkUpdate(
Map<Key, java.util.Collection<SubDocumentUpdate>> updates, UpdateOptions updateOptions)
throws IOException {
throw new UnsupportedOperationException("bulkUpdate is not supported!");
}

String UNSUPPORTED_QUERY_OPERATION = "Query operation is not supported";
}