Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1152,4 +1152,149 @@ protected CreateTopic createInvalidRequestForBulk(TestNamespace ns) {
request.setName(ns.prefix("invalid_topic"));
return request;
}

// ===================================================================
// TOPIC SCHEMA FIELD PATCH TESTS
// Regression tests: the shouldCompare optimization in EntityUpdater
// silently dropped nested schema field changes during PATCH because
// the field name prefix ("schemaFields") didn't match the patchedField
// ("messageSchema"). Uses raw HTTP PATCH (not SDK update/PUT) since
// the bug only affects JSON Patch operations.
// ===================================================================

private static final java.net.http.HttpClient HTTP_CLIENT =
java.net.http.HttpClient.newHttpClient();

private Topic createTopicWithNestedSchema(TestNamespace ns, String suffix) {
MessagingService service = MessagingServiceTestFactory.createKafka(ns);
CreateTopic request = new CreateTopic();
request.setName(ns.prefix("topic_schema_patch_" + suffix));
request.setService(service.getFullyQualifiedName());
request.setPartitions(1);
request.setMessageSchema(
new MessageSchema()
.withSchemaType(SchemaType.JSON)
.withSchemaText("{\"test\": \"string\"}")
.withSchemaFields(
Arrays.asList(
new Field().withName("id").withDataType(FieldDataType.STRING),
new Field()
.withName("record")
.withDataType(FieldDataType.RECORD)
.withChildren(
Arrays.asList(
new Field().withName("id").withDataType(FieldDataType.INT),
new Field()
.withName("name")
.withDataType(FieldDataType.STRING))))));
return createEntity(request);
}

private java.net.http.HttpResponse<String> sendJsonPatch(String topicId, String patchBody)
throws Exception {
String url = SdkClients.getServerUrl() + "/v1/topics/" + topicId;
java.net.http.HttpRequest request =
java.net.http.HttpRequest.newBuilder()
.uri(java.net.URI.create(url))
.header("Authorization", "Bearer " + SdkClients.getAdminToken())
.header("Content-Type", "application/json-patch+json")
.method("PATCH", java.net.http.HttpRequest.BodyPublishers.ofString(patchBody))
.build();
return HTTP_CLIENT.send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
}

private Field findSchemaField(Topic topic, String parentName, String childName) {
Field parent =
topic.getMessageSchema().getSchemaFields().stream()
.filter(f -> f.getName().equals(parentName))
.findFirst()
.orElseThrow();
return parent.getChildren().stream()
.filter(f -> f.getName().equals(childName))
.findFirst()
.orElseThrow();
}

@Test
void patch_nestedSchemaFieldDescription_200_ok(TestNamespace ns) throws Exception {
Topic topic = createTopicWithNestedSchema(ns, "desc");

String patchBody =
"[{\"op\": \"add\", "
+ "\"path\": \"/messageSchema/schemaFields/1/children/1/description\", "
+ "\"value\": \"Name of the record\"}]";

var response = sendJsonPatch(topic.getId().toString(), patchBody);
assertEquals(200, response.statusCode(), "PATCH should succeed: " + response.body());

Topic fetched = getEntityWithFields(topic.getId().toString(), "messageSchema");
assertEquals(
"Name of the record",
findSchemaField(fetched, "record", "name").getDescription(),
"Nested schema field description must be persisted after PATCH");
}

@Test
void patch_nestedSchemaFieldDisplayName_200_ok(TestNamespace ns) throws Exception {
Topic topic = createTopicWithNestedSchema(ns, "display");

String patchBody =
"[{\"op\": \"add\", "
+ "\"path\": \"/messageSchema/schemaFields/1/children/1/displayName\", "
+ "\"value\": \"Record Name\"}]";

var response = sendJsonPatch(topic.getId().toString(), patchBody);
assertEquals(200, response.statusCode(), "PATCH should succeed: " + response.body());

Topic fetched = getEntityWithFields(topic.getId().toString(), "messageSchema");
assertEquals(
"Record Name",
findSchemaField(fetched, "record", "name").getDisplayName(),
"Nested schema field displayName must be persisted after PATCH");
}

@Test
void patch_topLevelSchemaFieldDescription_200_ok(TestNamespace ns) throws Exception {
Topic topic = createTopicWithNestedSchema(ns, "toplevel");

String patchBody =
"[{\"op\": \"add\", "
+ "\"path\": \"/messageSchema/schemaFields/0/description\", "
+ "\"value\": \"Unique identifier\"}]";

var response = sendJsonPatch(topic.getId().toString(), patchBody);
assertEquals(200, response.statusCode(), "PATCH should succeed: " + response.body());

Topic fetched = getEntityWithFields(topic.getId().toString(), "messageSchema");
Field idField =
fetched.getMessageSchema().getSchemaFields().stream()
.filter(f -> f.getName().equals("id"))
.findFirst()
.orElseThrow();
assertEquals(
"Unique identifier",
idField.getDescription(),
"Top-level schema field description must be persisted after PATCH");
}

@Test
void patch_schemaFieldDescription_incrementsVersion(TestNamespace ns) throws Exception {
Topic topic = createTopicWithNestedSchema(ns, "version");
double originalVersion = topic.getVersion();

String patchBody =
"[{\"op\": \"add\", "
+ "\"path\": \"/messageSchema/schemaFields/1/children/1/description\", "
+ "\"value\": \"Should bump version\"}]";

sendJsonPatch(topic.getId().toString(), patchBody);

Topic fetched = getEntityWithFields(topic.getId().toString(), "messageSchema");
assertTrue(
fetched.getVersion() > originalVersion,
"Version should increment after schema field PATCH. Was "
+ originalVersion
+ ", now "
+ fetched.getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6034,6 +6034,11 @@ public class EntityUpdater {
private ChangeSource changeSource;
private final boolean useOptimisticLocking;
@Setter private Set<String> patchedFields;

protected Set<String> getPatchedFields() {
return patchedFields == null ? null : Set.copyOf(patchedFields);
}

private final List<Runnable> deferredReactOperations = new ArrayList<>();
private boolean deferredReactExecuted;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,13 +662,32 @@ public void entitySpecificUpdate(boolean consolidatingChanges) {
? null
: original.getMessageSchema().getSchemaType(),
updated.getMessageSchema().getSchemaType());
updateSchemaFields(
"messageSchema.schemaFields",
original.getMessageSchema() == null
? new ArrayList<>()
: listOrEmpty(original.getMessageSchema().getSchemaFields()),
listOrEmpty(updated.getMessageSchema().getSchemaFields()),
EntityUtil.schemaFieldMatch);
// Temporarily extend patchedFields to include "schemaFields" so that
// shouldCompare allows nested schema field changes through recordChange.
// EntityUtil.getSchemaField(Topic, ...) produces field names starting with
// "schemaFields." (e.g., "schemaFields.record.name.description"), which
// don't match the patchedField "messageSchema" extracted from the JSON patch
// path. Adding "schemaFields" unblocks only the schema field tracking without
// disabling shouldCompare filtering for other fields.
// TODO: The proper fix is to update EntityUtil.getSchemaField(Topic, ...) to
// return paths under "messageSchema.schemaFields" so they naturally match.
Set<String> saved = getPatchedFields();
if (saved != null) {
Set<String> extended = new HashSet<>(saved);
extended.add("schemaFields");
setPatchedFields(extended);
}
try {
updateSchemaFields(
"messageSchema.schemaFields",
original.getMessageSchema() == null
? new ArrayList<>()
: listOrEmpty(original.getMessageSchema().getSchemaFields()),
listOrEmpty(updated.getMessageSchema().getSchemaFields()),
EntityUtil.schemaFieldMatch);
} finally {
setPatchedFields(saved);
}
}
});
compareAndUpdate(
Expand Down
Loading
Loading