From 5eb02fe929351f41c677a3b5d671f94896c04754 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Tue, 24 Mar 2026 12:49:49 +0530 Subject: [PATCH 1/3] =?UTF-8?q?feat(workflows):=20batch=20entity=20process?= =?UTF-8?q?ing=20=E2=80=94=20entityList-only=20for=20automated=20task=20no?= =?UTF-8?q?des?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 of batch entity processing in governance workflows. All automated task nodes (checkEntityAttributesTask, setEntityAttributeTask, checkChangeDescriptionTask, rollbackEntityTask, sinkTask, dataCompletenessTask) now process a List of entity links via entityList exclusively. The relatedEntity fallback in getEntityList() is removed — batch nodes no longer have that path. Key changes: - PeriodicBatchEntityTrigger (singleExecutionMode=false): each child process now receives global_entityList via ${entityToListMap[relatedEntity]}. FetchEntitiesImpl pre-builds entityToListMap (entity -> List.of(entity)) so the JUEL expression resolves without static class references. - Batch node impls (6 files): removed relatedEntity fallback from getEntityList() and the now-unused RELATED_ENTITY_VARIABLE import. entityList is the only input path. - BPMN builders: putIfAbsent(ENTITY_LIST_VARIABLE, GLOBAL_NAMESPACE) for all batch-capable nodes; relatedEntity is never added to inputNamespaceMap by builders. - v1130 migration (addEntityListToNamespaceMap): updated to also strip relatedEntity from batch node inputNamespaceMaps, covering both fresh upgrades (add entityList + remove relatedEntity) and instances that already ran the previous migration (entityList present, remove relatedEntity). Migration remains idempotent. - GlossaryApprovalWorkflow.json: removed relatedEntity from all 13 batch node inputNamespaceMaps. userApprovalTask nodes keep relatedEntity. - JSON schemas: entityList added to trigger output and batch node inputNamespaceMap/input definitions. 
- Integration tests: updated to reflect entityList-first structure across all batch-capable workflow node configurations. Co-Authored-By: Claude Sonnet 4.6 --- .../tests/WorkflowDefinitionResourceIT.java | 138 +++++++------- .../governance/workflows/Workflow.java | 1 + .../workflows/WorkflowEventConsumer.java | 15 +- .../workflows/elements/TriggerFactory.java | 47 +---- .../CheckChangeDescriptionTask.java | 19 +- .../CheckEntityAttributesTask.java | 19 +- .../automatedTask/DataCompletenessTask.java | 21 ++- .../automatedTask/RollbackEntityTask.java | 21 ++- .../automatedTask/SetEntityAttributeTask.java | 19 +- .../impl/CheckChangeDescriptionTaskImpl.java | 37 +++- .../impl/CheckEntityAttributesImpl.java | 48 +++-- .../impl/DataCompletenessImpl.java | 168 ++++++++++-------- .../impl/RollbackEntityImpl.java | 119 ++++++++----- .../impl/SetEntityAttributeImpl.java | 52 +++--- .../automatedTask/sink/SinkTaskDelegate.java | 106 ++++++----- .../triggers/EventBasedEntityTrigger.java | 13 +- .../triggers/PeriodicBatchEntityTrigger.java | 51 +++--- .../triggers/impl/FetchEntitiesImpl.java | 15 +- .../migration/mysql/v1130/Migration.java | 1 + .../migration/postgres/v1130/Migration.java | 1 + .../migration/utils/v1130/MigrationUtil.java | 166 +++++++++++++++++ .../workflows/GlossaryApprovalWorkflow.json | 47 +++-- .../checkChangeDescriptionTask.json | 12 +- .../checkEntityAttributesTask.json | 17 +- .../automatedTask/dataCompletenessTask.json | 9 +- .../automatedTask/rollbackEntityTask.json | 8 +- .../automatedTask/setEntityAttributeTask.json | 8 +- .../nodes/automatedTask/sinkTask.json | 6 +- .../triggers/eventBasedEntityTrigger.json | 3 +- .../triggers/periodicBatchEntityTrigger.json | 3 +- 30 files changed, 764 insertions(+), 426 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index 
b192c124d1ba..341146a6a020 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -714,11 +714,11 @@ void test_WorkflowValidationEndpoint(TestNamespace ns) { Map checkConfig = new HashMap<>(); checkConfig.put("rules", "{\"!!\":{\"var\":\"description\"}}"); checkNode.put("config", checkConfig); - checkNode.put("input", List.of("relatedEntity")); + checkNode.put("input", List.of("entityList")); Map inputNamespace = new HashMap<>(); - inputNamespace.put("relatedEntity", "global"); + inputNamespace.put("entityList", "global"); checkNode.put("inputNamespaceMap", inputNamespace); - checkNode.put("output", List.of("result")); + checkNode.put("output", List.of("result", "entityList", "falseEntityList")); checkNode.put("branches", List.of("true", "false")); Map endNode = new HashMap<>(); @@ -2087,6 +2087,7 @@ void test_SetTierForMLModels(TestNamespace ns, TestInfo test) throws Exception { }, "output": [ "relatedEntity", + "entityList", "updatedBy" ] }, @@ -2106,13 +2107,15 @@ void test_SetTierForMLModels(TestNamespace ns, TestInfo test) throws Exception { "rules": "{\\"!!\\":{\\"var\\":\\"description\\"}}" }, "input": [ - "relatedEntity" + "entityList" ], "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" }, "output": [ - "result" + "result", + "entityList", + "falseEntityList" ], "branches": [ "true", @@ -2141,11 +2144,11 @@ void test_SetTierForMLModels(TestNamespace ns, TestInfo test) throws Exception { "fieldValue": "Tier.Tier1" }, "input": [ - "relatedEntity", + "entityList", "updatedBy" ], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" }, "output": [] @@ -2299,9 +2302,9 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test) setFieldConfig.put("fieldName", "description"); 
setFieldConfig.put("fieldValue", "Updated by multi-entity workflow"); setFieldNode.put("config", setFieldConfig); - setFieldNode.put("input", List.of("relatedEntity", "updatedBy")); + setFieldNode.put("input", List.of("entityList", "updatedBy")); Map inputNamespaceMap = new HashMap<>(); - inputNamespaceMap.put("relatedEntity", "global"); + inputNamespaceMap.put("entityList", "global"); inputNamespaceMap.put("updatedBy", "global"); setFieldNode.put("inputNamespaceMap", inputNamespaceMap); setFieldNode.put("output", List.of()); @@ -2329,7 +2332,7 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test) Map trigger = new HashMap<>(); trigger.put("type", "periodicBatchEntity"); trigger.put("config", triggerConfig); - trigger.put("output", List.of("relatedEntity", "updatedBy")); + trigger.put("output", List.of("relatedEntity", "entityList", "updatedBy")); Map multiEntityRequest = new HashMap<>(); multiEntityRequest.put("name", "EntityFilterWF"); @@ -2415,7 +2418,7 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test) Map singleEntityTrigger = new HashMap<>(); singleEntityTrigger.put("type", "periodicBatchEntity"); singleEntityTrigger.put("config", singleEntityTriggerConfig); - singleEntityTrigger.put("output", List.of("relatedEntity", "updatedBy")); + singleEntityTrigger.put("output", List.of("relatedEntity", "entityList", "updatedBy")); Map updateSetFieldConfig = new HashMap<>(); updateSetFieldConfig.put("fieldName", "description"); @@ -2427,9 +2430,9 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test) updateSetFieldNode.put("type", "automatedTask"); updateSetFieldNode.put("subType", "setEntityAttributeTask"); updateSetFieldNode.put("config", updateSetFieldConfig); - updateSetFieldNode.put("input", List.of("relatedEntity", "updatedBy")); + updateSetFieldNode.put("input", List.of("entityList", "updatedBy")); Map updateInputNamespaceMap = new HashMap<>(); - 
updateInputNamespaceMap.put("relatedEntity", "global"); + updateInputNamespaceMap.put("entityList", "global"); updateInputNamespaceMap.put("updatedBy", "global"); updateSetFieldNode.put("inputNamespaceMap", updateInputNamespaceMap); updateSetFieldNode.put("output", List.of()); @@ -2599,7 +2602,7 @@ void test_WorkflowFieldUpdateDoesNotCreateRedundantChangeEvents(TestNamespace ns "batchSize": 100, "filters": {} }, - "output": ["relatedEntity", "updatedBy"] + "output": ["relatedEntity", "entityList", "updatedBy"] }, "nodes": [ {"type": "startEvent", "subType": "startEvent", "name": "start", "displayName": "start"}, @@ -2612,8 +2615,8 @@ void test_WorkflowFieldUpdateDoesNotCreateRedundantChangeEvents(TestNamespace ns "fieldName": "tags", "fieldValue": "Tier.Tier1" }, - "input": ["relatedEntity", "updatedBy"], - "inputNamespaceMap": {"relatedEntity": "global", "updatedBy": "global"}, + "input": ["entityList", "updatedBy"], + "inputNamespaceMap": {"entityList": "global", "updatedBy": "global"}, "output": [] }, {"type": "endEvent", "subType": "endEvent", "name": "end", "displayName": "end"} @@ -2837,6 +2840,7 @@ void test_MultiEntityPeriodicQueryWithFilters(TestNamespace ns, TestInfo test) }, "output": [ "relatedEntity", + "entityList", "updatedBy" ] }, @@ -2857,11 +2861,11 @@ void test_MultiEntityPeriodicQueryWithFilters(TestNamespace ns, TestInfo test) "fieldValue": "Multi Periodic Entity" }, "input": [ - "relatedEntity", + "entityList", "updatedBy" ], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" }, "output": [] @@ -3074,7 +3078,7 @@ void test_EntitySpecificFiltering(TestNamespace ns) throws Exception { "table": "{\\\"!\\\": [{\\\"in\\\": [\\\"production\\\", {\\\"var\\\": \\\"name\\\"}]}]}" } }, - "output": ["relatedEntity", "updatedBy"] + "output": ["relatedEntity", "entityList", "updatedBy"] }, "nodes": [ { @@ -3092,9 +3096,9 @@ void test_EntitySpecificFiltering(TestNamespace ns) throws Exception { "fieldName": 
"displayName", "fieldValue": "[FILTERED] - Entity passed specific filter" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" }, "output": [] @@ -3602,11 +3606,11 @@ void test_WorkflowValidationEndpoint() { "config": { "rules": "{\\"!!\\":{\\"var\\":\\"description\\"}}" }, - "input": ["relatedEntity"], + "input": ["entityList"], "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" }, - "output": ["result"], + "output": ["result", "entityList", "falseEntityList"], "branches": ["true", "false"] }, { @@ -3948,7 +3952,7 @@ void test_WorkflowValidationEndpoint() { "entityTypes": ["glossaryTerm"], "events": ["Created"] }, - "output": ["relatedEntity", "updatedBy"] + "output": ["relatedEntity", "entityList", "updatedBy"] }, "nodes": [ { @@ -3980,9 +3984,9 @@ void test_WorkflowValidationEndpoint() { "fieldName": "description", "fieldValue": "Approved" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "userApproval" } }, @@ -4552,11 +4556,11 @@ void test_WorkflowValidationEndpoint() { "config": { "rules": "{\\"!!\\":{\\"var\\":\\"description\\"}}" }, - "input": ["relatedEntity"], + "input": ["entityList"], "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" }, - "output": ["result"] + "output": ["result", "entityList", "falseEntityList"] }, { "name": "end", @@ -4690,11 +4694,11 @@ void test_WorkflowValidationEndpoint() { "config": { "rules": "{\\"!!\\":{\\"var\\":\\"description\\"}}" }, - "input": ["relatedEntity"], + "input": ["entityList"], "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" }, - "output": ["result"] + "output": ["result", "entityList", "falseEntityList"] }, { "name": "endTrue", @@ -4995,7 +4999,7 @@ void 
test_MutualExclusivitySmartReplacement(TestNamespace ns) throws JsonProcess Map trigger = new LinkedHashMap<>(); trigger.put("type", "periodicBatchEntity"); trigger.put("config", triggerConfig); - trigger.put("output", List.of("relatedEntity", "updatedBy")); + trigger.put("output", List.of("relatedEntity", "entityList", "updatedBy")); // ---- nodes ---- Map startNode = new LinkedHashMap<>(); @@ -5014,10 +5018,10 @@ void test_MutualExclusivitySmartReplacement(TestNamespace ns) throws JsonProcess setTagsNode.put("name", "SetEntityAttribute_2"); setTagsNode.put("displayName", "Set Tags"); setTagsNode.put("config", setTagsConfig); - setTagsNode.put("input", List.of("relatedEntity", "updatedBy")); + setTagsNode.put("input", List.of("entityList", "updatedBy")); Map setTagsNamespaceMap = new LinkedHashMap<>(); - setTagsNamespaceMap.put("relatedEntity", "global"); + setTagsNamespaceMap.put("entityList", "global"); setTagsNamespaceMap.put("updatedBy", "global"); setTagsNode.put("inputNamespaceMap", setTagsNamespaceMap); setTagsNode.put("output", List.of()); @@ -5032,10 +5036,10 @@ void test_MutualExclusivitySmartReplacement(TestNamespace ns) throws JsonProcess setGlossaryNode.put("name", "SetEntityAttribute_3"); setGlossaryNode.put("displayName", "Set Glossary Term"); setGlossaryNode.put("config", setGlossaryConfig); - setGlossaryNode.put("input", List.of("relatedEntity", "updatedBy")); + setGlossaryNode.put("input", List.of("entityList", "updatedBy")); Map setGlossaryNamespaceMap = new LinkedHashMap<>(); - setGlossaryNamespaceMap.put("relatedEntity", "global"); + setGlossaryNamespaceMap.put("entityList", "global"); setGlossaryNamespaceMap.put("updatedBy", "global"); setGlossaryNode.put("inputNamespaceMap", setGlossaryNamespaceMap); setGlossaryNode.put("output", List.of()); @@ -5229,7 +5233,7 @@ void test_CustomApprovalWorkflowForNewEntities(TestNamespace ns) "exclude": ["reviewers"], "filter": {} }, - "output": ["relatedEntity", "updatedBy"] + "output": ["relatedEntity", 
"entityList", "updatedBy"] }, "nodes": [ { @@ -5274,9 +5278,9 @@ void test_CustomApprovalWorkflowForNewEntities(TestNamespace ns) "fieldName": "description", "fieldValue": "Updated by Workflow" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "UserApproval" }, "output": [] @@ -5747,7 +5751,7 @@ void test_AutoApprovalForEntitiesWithoutReviewers(TestNamespace ns) "exclude": ["reviewers"], "filter": {} }, - "output": ["relatedEntity", "updatedBy"] + "output": ["relatedEntity", "entityList", "updatedBy"] }, "nodes": [ { @@ -5792,9 +5796,9 @@ void test_AutoApprovalForEntitiesWithoutReviewers(TestNamespace ns) "fieldName": "entityStatus", "fieldValue": "Approved" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "UserApproval" }, "output": [] @@ -6418,7 +6422,7 @@ void test_ApiEndpointPeriodicBatchWorkflow(TestNamespace ns) throws JsonProcessi "apiEndpoint": "{\\"query\\":{\\"match\\":{\\"description\\":\\"workflow\\"}}}" } }, - "output": ["relatedEntity", "updatedBy"] + "output": ["relatedEntity", "entityList", "updatedBy"] }, "nodes": [ { @@ -6436,9 +6440,9 @@ void test_ApiEndpointPeriodicBatchWorkflow(TestNamespace ns) throws JsonProcessi "fieldName": "description", "fieldValue": "Processed by workflow - API endpoint updated" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" }, "output": [] @@ -6831,7 +6835,7 @@ private String buildDataCompletenessWorkflowJson(String workflowName) { "batchSize": 100, "filters": {} }, - "output": ["relatedEntity", "updatedBy"] + "output": ["relatedEntity", "entityList", "updatedBy"] }, "nodes": [ { @@ -6849,9 +6853,9 @@ private String 
buildDataCompletenessWorkflowJson(String workflowName) { "fieldName": "certification", "fieldValue": "Certification.Gold" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" }, "output": [] @@ -9156,7 +9160,7 @@ void test_CheckChangeDescriptionTask(TestNamespace ns) throws IOException { "events": ["Updated"], "entityTypes": ["databaseSchema"] }, - "output": ["relatedEntity", "updatedBy"] + "output": ["relatedEntity", "entityList", "updatedBy"] }, "nodes": [ { @@ -9176,9 +9180,9 @@ void test_CheckChangeDescriptionTask(TestNamespace ns) throws IOException { "domains": ["Finance"] } }, - "input": ["relatedEntity"], + "input": ["entityList"], "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" }, "branches": ["true", "false"] }, @@ -9212,9 +9216,9 @@ void test_CheckChangeDescriptionTask(TestNamespace ns) throws IOException { "fieldName": "status", "fieldValue": "Approved" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "userApproval" } }, @@ -9227,9 +9231,9 @@ void test_CheckChangeDescriptionTask(TestNamespace ns) throws IOException { "fieldName": "status", "fieldValue": "Rejected" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "userApproval" } }, @@ -9242,9 +9246,9 @@ void test_CheckChangeDescriptionTask(TestNamespace ns) throws IOException { "fieldName": "status", "fieldValue": "Draft" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" } }, @@ -9606,7 +9610,7 @@ void test_SelfApprovalPrevention(TestNamespace ns) throws 
IOException { "exclude": [], "filter": {} }, - "output": ["relatedEntity", "updatedBy"] + "output": ["relatedEntity", "entityList", "updatedBy"] }, "nodes": [ { @@ -9624,9 +9628,9 @@ void test_SelfApprovalPrevention(TestNamespace ns) throws IOException { "fieldName": "entityStatus", "fieldValue": "In Review" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" }, "output": [] @@ -9659,9 +9663,9 @@ void test_SelfApprovalPrevention(TestNamespace ns) throws IOException { "fieldName": "entityStatus", "fieldValue": "Approved" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" }, "output": [] @@ -9675,9 +9679,9 @@ void test_SelfApprovalPrevention(TestNamespace ns) throws IOException { "fieldName": "entityStatus", "fieldValue": "Rejected" }, - "input": ["relatedEntity", "updatedBy"], + "input": ["entityList", "updatedBy"], "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" }, "output": [] diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java index 03beae0055b4..97ce054d5db0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java @@ -12,6 +12,7 @@ public class Workflow { public static final String INGESTION_PIPELINE_ID_VARIABLE = "ingestionPipelineId"; public static final String RELATED_ENTITY_VARIABLE = "relatedEntity"; public static final String ENTITY_LIST_VARIABLE = "entityList"; + public static final String FALSE_ENTITY_LIST_VARIABLE = "false_entityList"; 
public static final String BATCH_SINK_PROCESSED_VARIABLE = "batchSinkProcessed"; public static final String TRIGGERING_OBJECT_ID_VARIABLE = "triggeringObjectId"; public static final String RECOGNIZER_FEEDBACK = "recognizerFeedback"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowEventConsumer.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowEventConsumer.java index f73f64203f58..56ff66a90d8d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowEventConsumer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowEventConsumer.java @@ -1,6 +1,7 @@ package org.openmetadata.service.governance.workflows; import static org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionType.GOVERNANCE_WORKFLOW_CHANGE_EVENT; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; import static org.openmetadata.service.governance.workflows.Workflow.RECOGNIZER_FEEDBACK; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; @@ -223,9 +224,12 @@ public static Map defaultHandler(ChangeEvent event) { MessageParser.EntityLink entityLink = new MessageParser.EntityLink(entityType, entityReference.getFullyQualifiedName()); + String entityLinkString = entityLink.getLinkString(); variables.put( - getNamespacedVariableName(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE), - entityLink.getLinkString()); + getNamespacedVariableName(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE), entityLinkString); + variables.put( + getNamespacedVariableName(GLOBAL_NAMESPACE, ENTITY_LIST_VARIABLE), + List.of(entityLinkString)); // Set the updatedBy variable from the change event userName if (event.getUserName() != null) { @@ -251,9 +255,12 @@ private 
static Map handleTagRecognizerFeedback(ChangeEvent event MessageParser.EntityLink entityLink = new MessageParser.EntityLink(Entity.TAG, entityReference.getFullyQualifiedName()); + String entityLinkString = entityLink.getLinkString(); + variables.put( + getNamespacedVariableName(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE), entityLinkString); variables.put( - getNamespacedVariableName(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE), - entityLink.getLinkString()); + getNamespacedVariableName(GLOBAL_NAMESPACE, ENTITY_LIST_VARIABLE), + List.of(entityLinkString)); variables.put( getNamespacedVariableName(GLOBAL_NAMESPACE, TRIGGERING_OBJECT_ID_VARIABLE), diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerFactory.java index 11acdf109a91..167e9658b5c8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerFactory.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerFactory.java @@ -1,15 +1,10 @@ package org.openmetadata.service.governance.workflows.elements; -import java.util.Map; import org.openmetadata.schema.governance.workflows.TriggerType; import org.openmetadata.schema.governance.workflows.WorkflowDefinition; -import org.openmetadata.schema.governance.workflows.elements.NodeSubType; -import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface; -import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SinkTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.triggers.EventBasedEntityTriggerDefinition; import org.openmetadata.schema.governance.workflows.elements.triggers.NoOpTriggerDefinition; import org.openmetadata.schema.governance.workflows.elements.triggers.PeriodicBatchEntityTriggerDefinition; -import 
org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.governance.workflows.elements.triggers.EventBasedEntityTrigger; import org.openmetadata.service.governance.workflows.elements.triggers.NoOpTrigger; import org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger; @@ -29,50 +24,10 @@ public static TriggerInterface createTrigger(WorkflowDefinition workflow) { workflow.getName(), triggerWorkflowId, (PeriodicBatchEntityTriggerDefinition) workflow.getTrigger(), - hasBatchModeNodes(workflow)); + false); }; } - /** - * Check if the workflow contains any nodes with batchMode enabled. When batch mode is detected, - * the trigger should create a single workflow execution per batch instead of N parallel - * executions (one per entity). - * - *

Note: Per the schema, batchMode defaults to true when not explicitly set. This ensures Git - * sinks use single execution mode by default, preventing race conditions from parallel commits. - */ - private static boolean hasBatchModeNodes(WorkflowDefinition workflow) { - if (workflow.getNodes() == null) { - return false; - } - for (WorkflowNodeDefinitionInterface node : workflow.getNodes()) { - if (node.getNodeSubType() == NodeSubType.SINK_TASK) { - // Handle typed SinkTaskDefinition - if (node instanceof SinkTaskDefinition sinkTask) { - if (sinkTask.getConfig() != null) { - // Schema default is true, so treat null as true - Boolean batchMode = sinkTask.getConfig().getBatchMode(); - if (batchMode == null || batchMode) { - return true; - } - } - } else { - // Fallback for Map-based config (e.g., from JSON deserialization) - Object config = node.getConfig(); - if (config != null) { - Map configMap = JsonUtils.getMap(config); - Object batchMode = configMap.get("batchMode"); - // Schema default is true, so treat null/absent as true - if (batchMode == null || Boolean.TRUE.equals(batchMode)) { - return true; - } - } - } - } - } - return false; - } - public static String getTriggerWorkflowId(String workflowFQN) { return String.format("%sTrigger", workflowFQN); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java index 9c35a923e06b..5d660125c183 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java @@ -1,8 +1,11 @@ package 
org.openmetadata.service.governance.workflows.elements.nodes.automatedTask; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; import java.util.HashMap; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -38,6 +41,17 @@ public CheckChangeDescriptionTask( StartEvent startEvent = new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build(); + Map inputNamespaceMap = new HashMap<>(); + if (nodeDefinition.getInputNamespaceMap() != null) { + @SuppressWarnings("unchecked") + Map converted = + JsonUtils.convertValue(nodeDefinition.getInputNamespaceMap(), Map.class); + if (converted != null) { + inputNamespaceMap.putAll(converted); + } + } + inputNamespaceMap.putIfAbsent(ENTITY_LIST_VARIABLE, GLOBAL_NAMESPACE); + ServiceTask checkChangeDescriptionTask = getCheckChangeDescriptionServiceTask( subProcessId, @@ -47,10 +61,7 @@ public CheckChangeDescriptionTask( nodeDefinition.getConfig() != null ? JsonUtils.pojoToJson(nodeDefinition.getConfig().getRules()) : "{}", - JsonUtils.pojoToJson( - nodeDefinition.getInputNamespaceMap() != null - ? 
nodeDefinition.getInputNamespaceMap() - : new HashMap<>())); + JsonUtils.pojoToJson(inputNamespaceMap)); EndEvent endEvent = new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java index ccb8a0de5ade..d1b3034621ea 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java @@ -1,8 +1,11 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; import java.util.HashMap; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -38,14 +41,22 @@ public CheckEntityAttributesTask( StartEvent startEvent = new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build(); + Map inputNamespaceMap = new HashMap<>(); + if (nodeDefinition.getInputNamespaceMap() != null) { + @SuppressWarnings("unchecked") + Map converted = + JsonUtils.convertValue(nodeDefinition.getInputNamespaceMap(), Map.class); + if (converted != null) { + inputNamespaceMap.putAll(converted); + } + } + inputNamespaceMap.putIfAbsent(ENTITY_LIST_VARIABLE, GLOBAL_NAMESPACE); + ServiceTask checkEntityAttributes = getCheckEntityAttributesServiceTask( 
subProcessId, nodeDefinition.getConfig().getRules(), - JsonUtils.pojoToJson( - nodeDefinition.getInputNamespaceMap() != null - ? nodeDefinition.getInputNamespaceMap() - : new HashMap<>())); + JsonUtils.pojoToJson(inputNamespaceMap)); EndEvent endEvent = new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java index affe3ac7fc17..e7bc4a106e42 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java @@ -1,8 +1,12 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; +import java.util.HashMap; import java.util.List; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -65,6 +69,17 @@ private ServiceTask getDataCompletenessServiceTask( // Get configuration with defaults if null var config = nodeDefinition.getConfig(); + Map inputNamespaceMap = new HashMap<>(); + if (nodeDefinition.getInputNamespaceMap() != null) { + @SuppressWarnings("unchecked") + Map converted = + JsonUtils.convertValue(nodeDefinition.getInputNamespaceMap(), Map.class); + if (converted != null) { + inputNamespaceMap.putAll(converted); + } + } + 
inputNamespaceMap.putIfAbsent(ENTITY_LIST_VARIABLE, GLOBAL_NAMESPACE); + List fieldExtensions = List.of( new FieldExtensionBuilder() @@ -77,11 +92,7 @@ private ServiceTask getDataCompletenessServiceTask( .build(), new FieldExtensionBuilder() .fieldName("inputNamespaceMapExpr") - .fieldValue( - JsonUtils.pojoToJson( - nodeDefinition.getInputNamespaceMap() != null - ? nodeDefinition.getInputNamespaceMap() - : new java.util.HashMap<>())) + .fieldValue(JsonUtils.pojoToJson(inputNamespaceMap)) .build()); ServiceTaskBuilder builder = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/RollbackEntityTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/RollbackEntityTask.java index cf367c023e7f..2e0f6d0b9832 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/RollbackEntityTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/RollbackEntityTask.java @@ -1,8 +1,11 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; import java.util.HashMap; +import java.util.Map; import lombok.Getter; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -61,15 +64,21 @@ public RollbackEntityTask( private ServiceTask getRollbackEntityServiceTask( String subProcessId, RollbackEntityTaskDefinition nodeDefinition) { - // Pass the input namespace map so RollbackEntityImpl can access namespaced variables + Map inputNamespaceMap = new HashMap<>(); + if 
(nodeDefinition.getInputNamespaceMap() != null) { + @SuppressWarnings("unchecked") + Map converted = + JsonUtils.convertValue(nodeDefinition.getInputNamespaceMap(), Map.class); + if (converted != null) { + inputNamespaceMap.putAll(converted); + } + } + inputNamespaceMap.putIfAbsent(ENTITY_LIST_VARIABLE, GLOBAL_NAMESPACE); + FieldExtension inputNamespaceMapExpr = new FieldExtensionBuilder() .fieldName("inputNamespaceMapExpr") - .fieldValue( - JsonUtils.pojoToJson( - nodeDefinition.getInputNamespaceMap() != null - ? nodeDefinition.getInputNamespaceMap() - : new HashMap<>())) + .fieldValue(JsonUtils.pojoToJson(inputNamespaceMap)) .build(); return new ServiceTaskBuilder() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetEntityAttributeTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetEntityAttributeTask.java index bcca1c056cf9..cb582adccabc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetEntityAttributeTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetEntityAttributeTask.java @@ -1,8 +1,11 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; import java.util.HashMap; +import java.util.Map; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.EndEvent; @@ -36,15 +39,23 @@ public SetEntityAttributeTask( StartEvent startEvent = new StartEventBuilder().id(getFlowableElementId(subProcessId, 
"startEvent")).build(); + Map inputNamespaceMap = new HashMap<>(); + if (nodeDefinition.getInputNamespaceMap() != null) { + @SuppressWarnings("unchecked") + Map converted = + JsonUtils.convertValue(nodeDefinition.getInputNamespaceMap(), Map.class); + if (converted != null) { + inputNamespaceMap.putAll(converted); + } + } + inputNamespaceMap.putIfAbsent(ENTITY_LIST_VARIABLE, GLOBAL_NAMESPACE); + ServiceTask setEntityAttribute = getSetEntityAttributeServiceTask( subProcessId, nodeDefinition.getConfig().getFieldName(), nodeDefinition.getConfig().getFieldValue(), - JsonUtils.pojoToJson( - nodeDefinition.getInputNamespaceMap() != null - ? nodeDefinition.getInputNamespaceMap() - : new HashMap<>())); + JsonUtils.pojoToJson(inputNamespaceMap)); EndEvent endEvent = new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java index ac90a0427401..5ca05a0cc59c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java @@ -1,7 +1,7 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; import 
static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; @@ -37,12 +37,23 @@ public void execute(DelegateExecution execution) { try { Map inputNamespaceMap = JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); - String entityLinkStr = - (String) - varHandler.getNamespacedVariable( - inputNamespaceMap.get(RELATED_ENTITY_VARIABLE), RELATED_ENTITY_VARIABLE); - boolean result = checkChangeDescription(execution, entityLinkStr); + List entityList = getEntityList(inputNamespaceMap, varHandler); + List trueEntityList = new ArrayList<>(); + List falseEntityList = new ArrayList<>(); + + for (String entityLinkStr : entityList) { + if (checkChangeDescription(execution, entityLinkStr)) { + trueEntityList.add(entityLinkStr); + } else { + falseEntityList.add(entityLinkStr); + } + } + + boolean result = !trueEntityList.isEmpty(); + varHandler.setNodeVariable("true_" + ENTITY_LIST_VARIABLE, trueEntityList); + varHandler.setNodeVariable("false_" + ENTITY_LIST_VARIABLE, falseEntityList); + varHandler.setNodeVariable(ENTITY_LIST_VARIABLE, result ? 
trueEntityList : falseEntityList); varHandler.setNodeVariable(RESULT_VARIABLE, result); } catch (Exception exc) { LOG.error( @@ -52,6 +63,20 @@ public void execute(DelegateExecution execution) { } } + @SuppressWarnings("unchecked") + private List getEntityList( + Map inputNamespaceMap, WorkflowVariableHandler varHandler) { + String entityListNamespace = inputNamespaceMap.get(ENTITY_LIST_VARIABLE); + if (entityListNamespace != null) { + Object entityListObj = + varHandler.getNamespacedVariable(entityListNamespace, ENTITY_LIST_VARIABLE); + if (entityListObj instanceof List) { + return (List) entityListObj; + } + } + return List.of(); + } + private boolean checkChangeDescription(DelegateExecution execution, String entityLinkStr) { // Parse entity MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(entityLinkStr); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java index 5e5ca0876af4..0bce0ca96cb3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java @@ -1,11 +1,13 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; import static 
org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -33,12 +35,25 @@ public void execute(DelegateExecution execution) { Map inputNamespaceMap = JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); String rules = (String) rulesExpr.getValue(execution); - MessageParser.EntityLink entityLink = - MessageParser.EntityLink.parse( - (String) - varHandler.getNamespacedVariable( - inputNamespaceMap.get(RELATED_ENTITY_VARIABLE), RELATED_ENTITY_VARIABLE)); - varHandler.setNodeVariable(RESULT_VARIABLE, checkAttributes(entityLink, rules)); + + List entityList = getEntityList(inputNamespaceMap, varHandler); + List trueEntityList = new ArrayList<>(); + List falseEntityList = new ArrayList<>(); + + for (String entityLinkStr : entityList) { + MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(entityLinkStr); + if (checkAttributes(entityLink, rules)) { + trueEntityList.add(entityLinkStr); + } else { + falseEntityList.add(entityLinkStr); + } + } + + boolean result = !trueEntityList.isEmpty(); + varHandler.setNodeVariable("true_" + ENTITY_LIST_VARIABLE, trueEntityList); + varHandler.setNodeVariable("false_" + ENTITY_LIST_VARIABLE, falseEntityList); + varHandler.setNodeVariable(ENTITY_LIST_VARIABLE, result ? 
trueEntityList : falseEntityList); + varHandler.setNodeVariable(RESULT_VARIABLE, result); } catch (Exception exc) { LOG.error( String.format( @@ -51,13 +66,24 @@ public void execute(DelegateExecution execution) { private Boolean checkAttributes(MessageParser.EntityLink entityLink, String rules) { EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); - - boolean result; try { - result = (boolean) RuleEngine.getInstance().apply(rules, JsonUtils.getMap(entity)); + return (boolean) RuleEngine.getInstance().apply(rules, JsonUtils.getMap(entity)); } catch (Exception e) { throw new RuntimeException(e); } - return result; + } + + @SuppressWarnings("unchecked") + private List getEntityList( + Map inputNamespaceMap, WorkflowVariableHandler varHandler) { + String entityListNamespace = inputNamespaceMap.get(ENTITY_LIST_VARIABLE); + if (entityListNamespace != null) { + Object entityListObj = + varHandler.getNamespacedVariable(entityListNamespace, ENTITY_LIST_VARIABLE); + if (entityListObj instanceof List) { + return (List) entityListObj; + } + } + return List.of(); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java index 21e59235119f..e0ea4f826292 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java @@ -1,13 +1,14 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static 
org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import java.util.ArrayList; import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import lombok.Data; @@ -34,7 +35,6 @@ public class DataCompletenessImpl implements JavaDelegate { public void execute(DelegateExecution execution) { WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution); try { - // Get configuration Map inputNamespaceMap = JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); List fieldsToCheck = @@ -48,58 +48,71 @@ public void execute(DelegateExecution execution) { band.setMinimumScore(((Number) bandMap.get("minimumScore")).doubleValue()); qualityBands.add(band); } - // Get the entity - MessageParser.EntityLink entityLink = - MessageParser.EntityLink.parse( - (String) - varHandler.getNamespacedVariable( - inputNamespaceMap.get(RELATED_ENTITY_VARIABLE), RELATED_ENTITY_VARIABLE)); - - EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); - Map entityMap = JsonUtils.getMap(entity); - - // Calculate completeness - DataCompletenessResult result = calculateCompleteness(entityMap, fieldsToCheck, qualityBands); - - // Set output variables - optimize for performance - varHandler.setNodeVariable("completenessScore", result.score); - varHandler.setNodeVariable("filledFieldsCount", result.filledFieldsCount); - varHandler.setNodeVariable("totalFieldsCount", result.totalFieldsCount); - varHandler.setNodeVariable("qualityBand", result.qualityBand); - - // Only store field lists if they're 
reasonably sized (< 50 items) - if (result.missingFields.size() <= 50) { - varHandler.setNodeVariable("missingFields", result.missingFields); - } else { - varHandler.setNodeVariable( - "missingFields", - result.missingFields.subList(0, 50) - + " [+" - + (result.missingFields.size() - 50) - + " more]"); - } - if (result.filledFields.size() <= 50) { - varHandler.setNodeVariable("filledFields", result.filledFields); - } else { - varHandler.setNodeVariable( - "filledFields", - result.filledFields.subList(0, 50) - + " [+" - + (result.filledFields.size() - 50) - + " more]"); + List entityList = getEntityList(inputNamespaceMap, varHandler); + + // Per-band entity lists and per-entity results + Map> entitiesByBand = new LinkedHashMap<>(); + Map entityResults = new LinkedHashMap<>(); + DataCompletenessResult lastResult = null; + + for (String entityLinkStr : entityList) { + MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(entityLinkStr); + EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); + Map entityMap = JsonUtils.getMap(entity); + DataCompletenessResult result = + calculateCompleteness(entityMap, fieldsToCheck, qualityBands); + lastResult = result; + + entitiesByBand + .computeIfAbsent(result.qualityBand, k -> new ArrayList<>()) + .add(entityLinkStr); + entityResults.put( + entityLinkStr, + Map.of( + "score", result.score, + "band", result.qualityBand, + "missingFields", result.missingFields, + "filledFields", result.filledFields)); + + LOG.info( + "[WorkflowNode][DataCompleteness] entity='{}' score={}% band='{}' filled={}/{}", + entityLinkStr, + result.score, + result.qualityBand, + result.filledFieldsCount, + result.totalFieldsCount); } - // Set result variable for edge routing (using the quality band name) - varHandler.setNodeVariable(RESULT_VARIABLE, result.qualityBand); + // Per-band entity lists — ALL bands stored, empty or not (Phase 2 inclusive gateway ready) + for (var entry : entitiesByBand.entrySet()) { + 
varHandler.setNodeVariable(entry.getKey() + "_" + ENTITY_LIST_VARIABLE, entry.getValue()); + } - LOG.info( - "[WorkflowNode][DataCompleteness] EXECUTED: entity='{}' score={}% band='{}' filled={}/{}", - entityLink, - result.score, - result.qualityBand, - result.filledFieldsCount, - result.totalFieldsCount); + // Priority band = highest minimumScore band that has entities + String priorityBand = + qualityBands.stream() + .sorted(Comparator.comparingDouble(QualityBand::getMinimumScore).reversed()) + .map(QualityBand::getName) + .filter(entitiesByBand::containsKey) + .findFirst() + .orElse("undefined"); + + // Backward compat: standard entityList for Phase 1 exclusive gateway routing + varHandler.setNodeVariable( + ENTITY_LIST_VARIABLE, entitiesByBand.getOrDefault(priorityBand, List.of())); + varHandler.setNodeVariable(RESULT_VARIABLE, priorityBand); + varHandler.setNodeVariable("entityResults", entityResults); + + // Scalar outputs for backward compat when processing a single entity + if (entityList.size() == 1 && lastResult != null) { + varHandler.setNodeVariable("completenessScore", lastResult.score); + varHandler.setNodeVariable("filledFieldsCount", lastResult.filledFieldsCount); + varHandler.setNodeVariable("totalFieldsCount", lastResult.totalFieldsCount); + varHandler.setNodeVariable("qualityBand", lastResult.qualityBand); + storeFieldList(varHandler, "missingFields", lastResult.missingFields); + storeFieldList(varHandler, "filledFields", lastResult.filledFields); + } } catch (Exception exc) { LOG.error( @@ -111,6 +124,30 @@ public void execute(DelegateExecution execution) { } } + private void storeFieldList( + WorkflowVariableHandler varHandler, String varName, List fields) { + if (fields.size() <= 50) { + varHandler.setNodeVariable(varName, fields); + } else { + varHandler.setNodeVariable( + varName, fields.subList(0, 50) + " [+" + (fields.size() - 50) + " more]"); + } + } + + @SuppressWarnings("unchecked") + private List getEntityList( + Map inputNamespaceMap, 
WorkflowVariableHandler varHandler) { + String entityListNamespace = inputNamespaceMap.get(ENTITY_LIST_VARIABLE); + if (entityListNamespace != null) { + Object entityListObj = + varHandler.getNamespacedVariable(entityListNamespace, ENTITY_LIST_VARIABLE); + if (entityListObj instanceof List) { + return (List) entityListObj; + } + } + return List.of(); + } + private DataCompletenessResult calculateCompleteness( Map entityMap, List fieldsToCheck, List qualityBands) { @@ -127,7 +164,6 @@ private DataCompletenessResult calculateCompleteness( totalFieldsToCheck += fieldInfo.totalCount; totalFieldsFilled += fieldInfo.filledCount; - // Record detailed results if (fieldInfo.isFullyComplete()) { result.filledFields.add( fieldPath + " (" + fieldInfo.filledCount + "/" + fieldInfo.totalCount + ")"); @@ -142,13 +178,11 @@ private DataCompletenessResult calculateCompleteness( result.totalFieldsCount = totalFieldsToCheck; result.filledFieldsCount = totalFieldsFilled; - // Calculate percentage result.score = result.totalFieldsCount > 0 ? 
(result.filledFieldsCount * 100.0) / result.totalFieldsCount : 0.0; - // Determine quality band based on score result.qualityBand = determineQualityBand(result.score, qualityBands); return result; @@ -167,23 +201,17 @@ private FieldCompletenessInfo evaluateFieldCompleteness( FieldCompletenessInfo info = new FieldCompletenessInfo(); - // Handle nested fields with dot notation String[] parts = fieldPath.split("\\."); - // Check if this is an array element field check (e.g., "columns.description") if (parts.length > 1) { - // Get the first part to check if it's an array Object firstField = getNestedValue(entityMap, parts[0]); if (firstField instanceof List) { - // It's an array field check like "columns.description" List arrayList = (List) firstField; if (arrayList.isEmpty()) { - // Empty array - no items to check info.totalCount = 1; - info.filledCount = 0; // Empty arrays are considered missing + info.filledCount = 0; } else { - // Check the nested field in each array element String nestedPath = fieldPath.substring(parts[0].length() + 1); info.totalCount = arrayList.size(); @@ -197,26 +225,22 @@ private FieldCompletenessInfo evaluateFieldCompleteness( } } } else { - // It's a regular nested field path (e.g., "database.name") Object value = getNestedValue(entityMap, fieldPath); info.totalCount = 1; - // Smart detection for the nested value if (value instanceof List) { List list = (List) value; - info.filledCount = !list.isEmpty() ? 1 : 0; // Empty arrays are considered missing + info.filledCount = !list.isEmpty() ? 1 : 0; } else { info.filledCount = isFieldFilled(value) ? 1 : 0; } } } else { - // Simple field - with smart array detection Object value = getNestedValue(entityMap, fieldPath); info.totalCount = 1; - // Smart detection: if it's an array, check for non-empty if (value instanceof List) { List list = (List) value; - info.filledCount = !list.isEmpty() ? 1 : 0; // Empty arrays are considered missing + info.filledCount = !list.isEmpty() ? 
1 : 0; } else { info.filledCount = isFieldFilled(value) ? 1 : 0; } @@ -225,9 +249,6 @@ private FieldCompletenessInfo evaluateFieldCompleteness( return info; } - /** - * Gets a nested value from a map using dot notation. - */ private Object getNestedValue(Map map, String path) { if (map == null || path == null) { return null; @@ -259,12 +280,12 @@ private boolean isFieldFilled(Object value) { if (value instanceof String) { String str = (String) value; - return !str.trim().isEmpty(); // Empty strings are considered missing + return !str.trim().isEmpty(); } if (value instanceof List) { List list = (List) value; - return !list.isEmpty(); // Empty arrays are considered missing + return !list.isEmpty(); } if (value instanceof Map) { @@ -272,24 +293,19 @@ private boolean isFieldFilled(Object value) { return !map.isEmpty(); } - // For other types (numbers, booleans), non-null means filled return true; } private String determineQualityBand(double score, List qualityBands) { - // Sort bands by minimumScore in descending order to evaluate from highest to lowest List sortedBands = new ArrayList<>(qualityBands); sortedBands.sort(Comparator.comparingDouble(QualityBand::getMinimumScore).reversed()); - // Find the matching band for (QualityBand band : sortedBands) { if (score >= band.getMinimumScore()) { return band.getName(); } } - // If no band matches (shouldn't happen if bands are configured correctly) - // Return the band with the lowest threshold return sortedBands.isEmpty() ? 
"undefined" : sortedBands.getLast().getName(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/RollbackEntityImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/RollbackEntityImpl.java index c9ee9cf4cfb7..0bb3cebd6a59 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/RollbackEntityImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/RollbackEntityImpl.java @@ -1,15 +1,19 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; -import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.UPDATED_BY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.BpmnError; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.openmetadata.schema.EntityInterface; @@ -34,9 +38,8 @@ public class RollbackEntityImpl implements JavaDelegate { @Override public void execute(DelegateExecution execution) { + WorkflowVariableHandler varHandler = new 
WorkflowVariableHandler(execution); try { - WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution); - Map inputNamespaceMap = JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); @@ -47,12 +50,6 @@ public void execute(DelegateExecution execution) { ? workflowInstanceExecutionIdObj.toString() : (String) workflowInstanceExecutionIdObj; - MessageParser.EntityLink entityLink = - MessageParser.EntityLink.parse( - (String) - varHandler.getNamespacedVariable( - inputNamespaceMap.get(RELATED_ENTITY_VARIABLE), RELATED_ENTITY_VARIABLE)); - String updatedBy = (String) varHandler.getNamespacedVariable( @@ -61,54 +58,82 @@ public void execute(DelegateExecution execution) { updatedBy = "governance-bot"; } - EntityInterface currentEntity = Entity.getEntity(entityLink, "*", Include.ALL); + List entityList = getEntityList(inputNamespaceMap, varHandler); + for (String entityLinkStr : entityList) { + MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(entityLinkStr); + rollbackEntity(execution, entityLink, updatedBy, workflowInstanceExecutionId); + } - String entityType = currentEntity.getEntityReference().getType(); - UUID entityId = currentEntity.getId(); + } catch (BpmnError e) { + throw e; + } catch (Exception e) { + LOG.error("[RollbackEntity] Error during entity rollback: {}", e.getMessage(), e); + varHandler.setGlobalVariable(EXCEPTION_VARIABLE, ExceptionUtils.getStackTrace(e)); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, e.getMessage()); + } + } - LOG.info( - "[RollbackEntity] Rolling back entity: {} ({}), Workflow Instance: {}", + private void rollbackEntity( + DelegateExecution execution, + MessageParser.EntityLink entityLink, + String updatedBy, + String workflowInstanceExecutionId) { + EntityInterface currentEntity = Entity.getEntity(entityLink, "*", Include.ALL); + String entityType = currentEntity.getEntityReference().getType(); + UUID entityId = currentEntity.getId(); + + LOG.info( + 
"[RollbackEntity] Rolling back entity: {} ({}), Workflow Instance: {}", + currentEntity.getName(), + entityId, + workflowInstanceExecutionId); + + EntityRepository repository = Entity.getEntityRepository(entityType); + + Double previousVersion = getPreviousApprovedVersion(currentEntity, repository); + if (previousVersion == null) { + LOG.warn( + "[RollbackEntity] No previous approved version found for entity: {} ({})", currentEntity.getName(), - entityId, - workflowInstanceExecutionId); - - EntityRepository repository = Entity.getEntityRepository(entityType); - - Double previousVersion = getPreviousApprovedVersion(currentEntity, repository); - if (previousVersion == null) { - LOG.warn( - "[RollbackEntity] No previous approved version found for entity: {} ({})", - currentEntity.getName(), - entityId); - return; - } + entityId); + return; + } - EntityInterface previousEntity = repository.getVersion(entityId, previousVersion.toString()); + EntityInterface previousEntity = repository.getVersion(entityId, previousVersion.toString()); - LOG.info( - "[RollbackEntity] Rolling back entity {} from version {} to version {}", - currentEntity.getName(), - currentEntity.getVersion(), - previousVersion); + LOG.info( + "[RollbackEntity] Rolling back entity {} from version {} to version {}", + currentEntity.getName(), + currentEntity.getVersion(), + previousVersion); - restoreToPreviousVersion(repository, currentEntity, previousEntity, updatedBy); + restoreToPreviousVersion(repository, currentEntity, previousEntity, updatedBy); - execution.setVariable("rollbackAction", "rollback"); - execution.setVariable("rollbackFromVersion", currentEntity.getVersion()); - execution.setVariable("rollbackToVersion", previousVersion); - execution.setVariable("rollbackEntityId", entityId.toString()); - execution.setVariable("rollbackEntityType", entityType); + execution.setVariable("rollbackAction", "rollback"); + execution.setVariable("rollbackFromVersion", currentEntity.getVersion()); + 
execution.setVariable("rollbackToVersion", previousVersion); + execution.setVariable("rollbackEntityId", entityId.toString()); + execution.setVariable("rollbackEntityType", entityType); - LOG.info( - "[RollbackEntity] Successfully rolled back entity: {} ({}) to version {}", - currentEntity.getName(), - entityId, - previousVersion); + LOG.info( + "[RollbackEntity] Successfully rolled back entity: {} ({}) to version {}", + currentEntity.getName(), + entityId, + previousVersion); + } - } catch (Exception e) { - LOG.error("[RollbackEntity] Error during entity rollback: {}", e.getMessage(), e); - throw new RuntimeException("Failed to rollback entity", e); + @SuppressWarnings("unchecked") + private List getEntityList( + Map inputNamespaceMap, WorkflowVariableHandler varHandler) { + String entityListNamespace = inputNamespaceMap.get(ENTITY_LIST_VARIABLE); + if (entityListNamespace != null) { + Object entityListObj = + varHandler.getNamespacedVariable(entityListNamespace, ENTITY_LIST_VARIABLE); + if (entityListObj instanceof List) { + return (List) entityListObj; + } } + return List.of(); } private Double getPreviousApprovedVersion( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImpl.java index afcba6150d9d..8802198c64e7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImpl.java @@ -1,11 +1,12 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static 
org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.UPDATED_BY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; +import java.util.List; import java.util.Map; import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -32,21 +33,11 @@ public class SetEntityAttributeImpl implements JavaDelegate { public void execute(DelegateExecution execution) { WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution); try { - // Extract entity from workflow context Map inputNamespaceMap = JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); - String relatedEntityNamespace = (String) inputNamespaceMap.get(RELATED_ENTITY_VARIABLE); - String relatedEntityValue = - (String) - varHandler.getNamespacedVariable(relatedEntityNamespace, RELATED_ENTITY_VARIABLE); - MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(relatedEntityValue); - - String entityType = entityLink.getEntityType(); - EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); String fieldName = fieldNameExpr != null ? 
(String) fieldNameExpr.getValue(execution) : ""; - // Simple null check - if fieldValueExpr is null, treat as empty/null value String fieldValue = null; if (fieldValueExpr != null) { Object value = fieldValueExpr.getValue(execution); @@ -61,18 +52,19 @@ public void execute(DelegateExecution execution) { .map(ns -> (String) varHandler.getNamespacedVariable(ns, UPDATED_BY_VARIABLE)) .orElse(null); - // Apply the field change using shared utility with bot impersonation - // Note: fieldValue can be null to clear/remove a field value - // When actualUser is available, use it as the user and mark 'governance-bot' as impersonator - // Otherwise, use 'governance-bot' directly (for system-initiated workflows) - if (actualUser != null && !actualUser.isEmpty()) { - // User-initiated workflow: preserve actual user, mark bot as impersonator - EntityFieldUtils.setEntityField( - entity, entityType, actualUser, fieldName, fieldValue, true, "governance-bot"); - } else { - // System-initiated workflow: use governance-bot directly - EntityFieldUtils.setEntityField( - entity, entityType, "governance-bot", fieldName, fieldValue, true, null); + List entityList = getEntityList(inputNamespaceMap, varHandler); + for (String entityLinkStr : entityList) { + MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(entityLinkStr); + String entityType = entityLink.getEntityType(); + EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); + + if (actualUser != null && !actualUser.isEmpty()) { + EntityFieldUtils.setEntityField( + entity, entityType, actualUser, fieldName, fieldValue, true, "governance-bot"); + } else { + EntityFieldUtils.setEntityField( + entity, entityType, "governance-bot", fieldName, fieldValue, true, null); + } } } catch (Exception exc) { @@ -82,4 +74,18 @@ public void execute(DelegateExecution execution) { throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage()); } } + + @SuppressWarnings("unchecked") + private List getEntityList( + Map 
inputNamespaceMap, WorkflowVariableHandler varHandler) { + String entityListNamespace = (String) inputNamespaceMap.get(ENTITY_LIST_VARIABLE); + if (entityListNamespace != null) { + Object entityListObj = + varHandler.getNamespacedVariable(entityListNamespace, ENTITY_LIST_VARIABLE); + if (entityListObj instanceof List) { + return (List) entityListObj; + } + } + return List.of(); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/sink/SinkTaskDelegate.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/sink/SinkTaskDelegate.java index 31146fc0bbaa..9fabdb90849d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/sink/SinkTaskDelegate.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/sink/SinkTaskDelegate.java @@ -15,7 +15,6 @@ import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; @@ -40,16 +39,9 @@ /** * Flowable delegate that executes sink operations within a workflow. * - *

This delegate supports two modes: - * - *

    - *
  • Single entity mode: Processes one entity at a time (event-based workflows) - *
  • Batch mode: Processes all entities in the batch at once (periodic batch workflows - * with batchMode=true in sink config) - *
- * - *

When batchMode is enabled in the sink config, the trigger automatically configures single - * execution mode (cardinality=1), ensuring only one workflow instance processes the entire batch. + *

Always reads from {@code ENTITY_LIST_VARIABLE}. When {@code batchMode=true} and the sink + * provider supports batching, all entities are written in a single batch call. Otherwise each + * entity is written individually. */ @Slf4j public class SinkTaskDelegate implements JavaDelegate { @@ -94,16 +86,7 @@ public void execute(DelegateExecution execution) { Map inputNamespaceMap = JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); - // Check if we have an entity list for batch processing - String entityListNamespace = inputNamespaceMap.get(ENTITY_LIST_VARIABLE); - List entityList = null; - if (entityListNamespace != null) { - Object entityListObj = - varHandler.getNamespacedVariable(entityListNamespace, ENTITY_LIST_VARIABLE); - if (entityListObj instanceof List) { - entityList = (List) entityListObj; - } - } + List entityList = getEntityList(inputNamespaceMap, varHandler); // Get the sink provider from registry sinkProvider = @@ -133,16 +116,10 @@ public void execute(DelegateExecution execution) { SinkResult result; - // Determine execution mode: batch or single entity - if (batchMode - && entityList != null - && !entityList.isEmpty() - && sinkProvider.supportsBatch()) { - // Batch mode: process all entities at once (single workflow instance) + if (batchMode && !entityList.isEmpty() && sinkProvider.supportsBatch()) { result = executeBatchMode(context, sinkProvider, entityList); } else { - // Single entity mode: process one entity - result = executeSingleEntityMode(context, sinkProvider, inputNamespaceMap, varHandler); + result = executeListMode(context, sinkProvider, entityList); } // Set output variables @@ -275,27 +252,58 @@ BatchAccumulator merge(SinkResult result, List fetchErrors .build(); } - /** Execute sink in single entity mode - process one entity at a time. 
*/ - private SinkResult executeSingleEntityMode( - SinkContext context, - SinkProvider sinkProvider, - Map inputNamespaceMap, - WorkflowVariableHandler varHandler) { - - // Get entity from workflow context - String relatedEntityNamespace = inputNamespaceMap.get(RELATED_ENTITY_VARIABLE); - String relatedEntityValue = - (String) varHandler.getNamespacedVariable(relatedEntityNamespace, RELATED_ENTITY_VARIABLE); - - MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(relatedEntityValue); - EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); + private SinkResult executeListMode( + SinkContext context, SinkProvider sinkProvider, List entityList) { + int syncedCount = 0; + int failedCount = 0; + List syncedEntities = new ArrayList<>(); + List errors = new ArrayList<>(); + + for (String entityLinkStr : entityList) { + try { + MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(entityLinkStr); + EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); + SinkResult entityResult = sinkProvider.write(context, entity); + syncedCount += entityResult.getSyncedCount(); + failedCount += entityResult.getFailedCount(); + if (entityResult.getSyncedEntities() != null) { + syncedEntities.addAll(entityResult.getSyncedEntities()); + } + if (entityResult.getErrors() != null) { + errors.addAll(entityResult.getErrors()); + } + } catch (Exception e) { + LOG.error("[{}] Failed to process entity: {}", context.getWorkflowName(), entityLinkStr, e); + failedCount++; + errors.add( + SinkResult.SinkError.builder() + .entityFqn(entityLinkStr) + .errorMessage("Failed to process entity: " + e.getMessage()) + .cause(e) + .build()); + } + } - LOG.info( - "[{}] Executing single entity sink for: {}", - context.getWorkflowName(), - entity.getFullyQualifiedName()); + return SinkResult.builder() + .success(failedCount == 0) + .syncedCount(syncedCount) + .failedCount(failedCount) + .syncedEntities(syncedEntities.isEmpty() ? 
null : syncedEntities) + .errors(errors.isEmpty() ? null : errors) + .build(); + } - // Execute single entity write - return sinkProvider.write(context, entity); + @SuppressWarnings("unchecked") + private List getEntityList( + Map inputNamespaceMap, WorkflowVariableHandler varHandler) { + String entityListNamespace = inputNamespaceMap.get(ENTITY_LIST_VARIABLE); + if (entityListNamespace != null) { + Object entityListObj = + varHandler.getNamespacedVariable(entityListNamespace, ENTITY_LIST_VARIABLE); + if (entityListObj instanceof List) { + return (List) entityListObj; + } + } + return List.of(); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java index 2d8d98cb6449..03325dbccd20 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java @@ -1,5 +1,6 @@ package org.openmetadata.service.governance.workflows.elements.triggers; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; @@ -171,7 +172,11 @@ private CallActivity getWorkflowTrigger( List inputParameters = new ArrayList<>(); - // ALWAYS pass relatedEntity for backward compatibility + IOParameter entityListParam = new IOParameter(); + entityListParam.setSource(getNamespacedVariableName(GLOBAL_NAMESPACE, ENTITY_LIST_VARIABLE)); + 
entityListParam.setTarget(getNamespacedVariableName(GLOBAL_NAMESPACE, ENTITY_LIST_VARIABLE)); + inputParameters.add(entityListParam); + IOParameter relatedEntityParam = new IOParameter(); relatedEntityParam.setSource( getNamespacedVariableName(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE)); @@ -179,11 +184,9 @@ private CallActivity getWorkflowTrigger( getNamespacedVariableName(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE)); inputParameters.add(relatedEntityParam); - // Dynamically add any additional outputs declared in trigger - Eg updatedBy in - // GlossaryTermApprovalWorkflow for (String triggerOutput : triggerOutputs) { - if (!RELATED_ENTITY_VARIABLE.equals( - triggerOutput)) { // Skip relatedEntity (already added above) + if (!RELATED_ENTITY_VARIABLE.equals(triggerOutput) + && !ENTITY_LIST_VARIABLE.equals(triggerOutput)) { IOParameter inputParameter = new IOParameter(); inputParameter.setSource(getNamespacedVariableName(GLOBAL_NAMESPACE, triggerOutput)); inputParameter.setTarget(getNamespacedVariableName(GLOBAL_NAMESPACE, triggerOutput)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java index 7641f5e3f8c9..f88c66293a56 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java @@ -1,6 +1,5 @@ package org.openmetadata.service.governance.workflows.elements.triggers; -import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static 
org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; @@ -46,8 +45,8 @@ public class PeriodicBatchEntityTrigger implements TriggerInterface { @Getter private final String triggerWorkflowId; private final boolean singleExecutionMode; public static String HAS_FINISHED_VARIABLE = "hasFinished"; - public static String CARDINALITY_VARIABLE = "numberOfEntities"; public static String COLLECTION_VARIABLE = "entityList"; + private static final String NUMBER_OF_ENTITIES_VARIABLE = "numberOfEntities"; public PeriodicBatchEntityTrigger( String mainWorkflowName, @@ -58,12 +57,6 @@ public PeriodicBatchEntityTrigger( this.singleExecutionMode = singleExecutionMode; List entityTypes = getEntityTypesFromConfig(triggerDefinition.getConfig()); - if (singleExecutionMode) { - LOG.info( - "Workflow {} configured for single execution mode (batch sink detected)", - mainWorkflowName); - } - for (String entityType : entityTypes) { String processId = String.format("%s-%s", triggerWorkflowId, entityType); Process process = new Process(); @@ -84,7 +77,8 @@ public PeriodicBatchEntityTrigger( getFetchEntitiesTask(processId, entityType, triggerDefinition); process.addFlowElement(fetchEntitiesTask); - CallActivity workflowTrigger = getWorkflowTriggerCallActivity(processId, mainWorkflowName); + CallActivity workflowTrigger = + getWorkflowTriggerCallActivity(processId, mainWorkflowName, singleExecutionMode); process.addFlowElement(workflowTrigger); EndEvent endEvent = @@ -126,11 +120,9 @@ private TimerEventDefinition getTimerEventDefinition(AppSchedule schedule) { } private CallActivity getWorkflowTriggerCallActivity( - String triggerWorkflowId, String mainWorkflowName) { - // In single execution mode (batch sink detected), use cardinality = 1 to create - // only ONE workflow instance that processes all entities in a single batch. 
- // Otherwise, use numberOfEntities to create N parallel instances (one per entity). - String cardinality = singleExecutionMode ? "1" : String.format("${%s}", CARDINALITY_VARIABLE); + String triggerWorkflowId, String mainWorkflowName, boolean singleExecution) { + String cardinality = + singleExecution ? "1" : String.format("${%s}", NUMBER_OF_ENTITIES_VARIABLE); MultiInstanceLoopCharacteristics multiInstance = new MultiInstanceLoopCharacteristicsBuilder() @@ -146,21 +138,32 @@ private CallActivity getWorkflowTriggerCallActivity( .inheritBusinessKey(true) .build(); - IOParameter inputParameter = new IOParameter(); - inputParameter.setSource(RELATED_ENTITY_VARIABLE); - inputParameter.setTarget(getNamespacedVariableName(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE)); - - // Pass the entire entity list to the main workflow for batch sink operations - IOParameter entityListParameter = new IOParameter(); - entityListParameter.setSource(COLLECTION_VARIABLE); - entityListParameter.setTarget( - getNamespacedVariableName(GLOBAL_NAMESPACE, ENTITY_LIST_VARIABLE)); + IOParameter relatedEntityParameter = new IOParameter(); + relatedEntityParameter.setSource(RELATED_ENTITY_VARIABLE); + relatedEntityParameter.setTarget( + getNamespacedVariableName(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE)); IOParameter outputParameter = new IOParameter(); outputParameter.setSource(getNamespacedVariableName(GLOBAL_NAMESPACE, EXCEPTION_VARIABLE)); outputParameter.setTarget(EXCEPTION_VARIABLE); - workflowTrigger.setInParameters(List.of(inputParameter, entityListParameter)); + List inParameters; + if (singleExecution) { + IOParameter entityListParameter = new IOParameter(); + entityListParameter.setSource(COLLECTION_VARIABLE); + entityListParameter.setTarget( + getNamespacedVariableName(GLOBAL_NAMESPACE, COLLECTION_VARIABLE)); + inParameters = List.of(relatedEntityParameter, entityListParameter); + } else { + IOParameter entityListParameter = new IOParameter(); + entityListParameter.setSourceExpression( 
+ String.format("${entityToListMap[%s]}", RELATED_ENTITY_VARIABLE)); + entityListParameter.setTarget( + getNamespacedVariableName(GLOBAL_NAMESPACE, COLLECTION_VARIABLE)); + inParameters = List.of(relatedEntityParameter, entityListParameter); + } + + workflowTrigger.setInParameters(inParameters); workflowTrigger.setOutParameters(List.of(outputParameter)); workflowTrigger.setLoopCharacteristics(multiInstance); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java index 75f4d23921a1..4e8dd0bee616 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java @@ -1,12 +1,13 @@ package org.openmetadata.service.governance.workflows.elements.triggers.impl; -import static org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger.CARDINALITY_VARIABLE; import static org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger.COLLECTION_VARIABLE; import static org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger.HAS_FINISHED_VARIABLE; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import lombok.extern.slf4j.Slf4j; @@ -27,6 +28,9 @@ public class FetchEntitiesImpl implements JavaDelegate { private static final Set ENTITIES_NEEDING_KEYWORD_SORT = Set.of("testCase", "user", "team"); + // Kept for backward compat: old BPMNs not yet redeployed may reference ${numberOfEntities} + private static final String CARDINALITY_VARIABLE = 
"numberOfEntities"; + private Expression entityTypesExpr; private Expression searchFilterExpr; private Expression batchSizeExpr; @@ -111,12 +115,17 @@ public void execute(DelegateExecution execution) { execution.setVariable("searchAfter", response.getLastDocumentsInBatch()); } - int cardinality = entityList.size(); boolean hasFinished = entityList.isEmpty(); - execution.setVariable(CARDINALITY_VARIABLE, cardinality); + Map> entityToListMap = new HashMap<>(); + for (String entity : entityList) { + entityToListMap.put(entity, List.of(entity)); + } + + execution.setVariable(CARDINALITY_VARIABLE, entityList.size()); execution.setVariable(HAS_FINISHED_VARIABLE, hasFinished); execution.setVariable(COLLECTION_VARIABLE, entityList); + execution.setVariable("entityToListMap", entityToListMap); } private String extractEntityTypeFromProcessKey(String processKey) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1130/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1130/Migration.java index d146890469f6..d4d3d89f2d34 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1130/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1130/Migration.java @@ -15,5 +15,6 @@ public Migration(MigrationFile migrationFile) { @SneakyThrows public void runDataMigration() { MigrationUtil.updateOwnerChartFormulas(); + MigrationUtil.migrateWorkflowInputNamespaceMap(); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1130/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1130/Migration.java index 909ea509319d..1183ce6dd8df 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1130/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1130/Migration.java 
@@ -15,5 +15,6 @@ public Migration(MigrationFile migrationFile) { @SneakyThrows public void runDataMigration() { MigrationUtil.updateOwnerChartFormulas(); + MigrationUtil.migrateWorkflowInputNamespaceMap(); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1130/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1130/MigrationUtil.java index f5005e76f4b3..b5d882d825d4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1130/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1130/MigrationUtil.java @@ -1,16 +1,37 @@ package org.openmetadata.service.migration.utils.v1130; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.List; +import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; +import org.openmetadata.schema.governance.workflows.WorkflowDefinition; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; import org.openmetadata.service.jdbi3.DataInsightSystemChartRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository; import org.openmetadata.service.util.EntityUtil; @Slf4j public class MigrationUtil { private MigrationUtil() {} + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String ADMIN_USER_NAME = "admin"; private static final String OLD_FIELD = "owners.name.keyword"; private static final String NEW_FIELD = "ownerName"; + private static final Set BATCH_NODE_SUBTYPES = + Set.of( + "checkEntityAttributesTask", + "checkChangeDescriptionTask", + "setEntityAttributeTask", + 
"rollbackEntityTask", + "sinkTask", + "dataCompletenessTask"); public static void updateOwnerChartFormulas() { DataInsightSystemChartRepository repository = new DataInsightSystemChartRepository(); @@ -48,4 +69,149 @@ public static void updateOwnerChartFormulas() { } } } + + /** + * Migrates all workflow definitions to support batch entity processing: + * + *

    + *
  1. Adds {@code entityList} to the trigger's output so validation passes. + *
  2. Adds {@code entityList} to each batch-capable automated task node's + * {@code inputNamespaceMap}, using the same namespace as {@code relatedEntity} (or + * {@code "global"} if absent), and removes {@code relatedEntity} from that map. + *
+ * + *

Always calls {@link WorkflowDefinitionRepository#createOrUpdate} for every workflow, + * triggering BPMN redeployment with the new {@code loopCardinality("1")} structure even when no + * JSON changes were needed. + * + *

The migration is idempotent — safe to run multiple times. + */ + public static void migrateWorkflowInputNamespaceMap() { + LOG.info("Starting v1130 migration: adding entityList to workflow node inputNamespaceMaps"); + + WorkflowDefinitionRepository repository = + (WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION); + + List allWorkflows = + repository.listAll(EntityUtil.Fields.EMPTY_FIELDS, new ListFilter()); + + int totalRedeployed = 0; + for (WorkflowDefinition workflow : allWorkflows) { + try { + String originalJson = JsonUtils.pojoToJson(workflow); + JsonNode originalNode = MAPPER.readTree(originalJson); + JsonNode migrated = migrateWorkflowJson(originalNode); + + WorkflowDefinition toSave = + migrated != originalNode + ? JsonUtils.readValue(MAPPER.writeValueAsString(migrated), WorkflowDefinition.class) + : workflow; + + // Always call createOrUpdate to trigger BPMN redeployment with new loopCardinality + repository.createOrUpdate(null, toSave, ADMIN_USER_NAME); + totalRedeployed++; + LOG.debug("Redeployed workflow: {}", workflow.getFullyQualifiedName()); + } catch (Exception e) { + LOG.error( + "Error migrating workflow '{}': {}", + workflow.getFullyQualifiedName(), + e.getMessage(), + e); + } + } + + LOG.info( + "Completed v1130 migration: {} workflow definitions redeployed with entityList support", + totalRedeployed); + } + + private static JsonNode migrateWorkflowJson(JsonNode root) { + if (root == null || !root.isObject()) { + return root; + } + + boolean changed = false; + ObjectNode result = ((ObjectNode) root).deepCopy(); + + JsonNode triggerNode = result.get("trigger"); + if (triggerNode != null && triggerNode.isObject()) { + JsonNode outputNode = triggerNode.get("output"); + if (outputNode != null && outputNode.isArray()) { + boolean hasEntityList = false; + for (JsonNode item : outputNode) { + if ("entityList".equals(item.asText())) { + hasEntityList = true; + break; + } + } + if (!hasEntityList) { + ObjectNode 
newTrigger = ((ObjectNode) triggerNode).deepCopy(); + ArrayNode newOutput = MAPPER.createArrayNode(); + newOutput.add("entityList"); + for (JsonNode item : outputNode) { + newOutput.add(item); + } + newTrigger.set("output", newOutput); + result.set("trigger", newTrigger); + changed = true; + } + } + } + + JsonNode nodesNode = result.get("nodes"); + if (nodesNode != null && nodesNode.isArray()) { + ArrayNode newNodes = MAPPER.createArrayNode(); + boolean nodesChanged = false; + for (JsonNode nodeElement : nodesNode) { + if (nodeElement.isObject()) { + JsonNode subTypeNode = nodeElement.get("subType"); + String subType = subTypeNode != null ? subTypeNode.asText() : ""; + if (BATCH_NODE_SUBTYPES.contains(subType)) { + JsonNode migratedNode = addEntityListToNamespaceMap(nodeElement); + newNodes.add(migratedNode); + if (migratedNode != nodeElement) { + nodesChanged = true; + } + } else { + newNodes.add(nodeElement); + } + } else { + newNodes.add(nodeElement); + } + } + if (nodesChanged) { + result.set("nodes", newNodes); + changed = true; + } + } + + return changed ? result : root; + } + + private static JsonNode addEntityListToNamespaceMap(JsonNode nodeObj) { + JsonNode inputNamespaceMapNode = nodeObj.get("inputNamespaceMap"); + if (inputNamespaceMapNode == null || !inputNamespaceMapNode.isObject()) { + return nodeObj; + } + ObjectNode inputNamespaceMap = (ObjectNode) inputNamespaceMapNode; + + boolean hasEntityList = inputNamespaceMap.has("entityList"); + boolean hasRelatedEntity = inputNamespaceMap.has("relatedEntity"); + + if (hasEntityList && !hasRelatedEntity) { + return nodeObj; + } + + ObjectNode newInputNamespaceMap = inputNamespaceMap.deepCopy(); + if (!hasEntityList) { + String namespace = + hasRelatedEntity ? 
inputNamespaceMap.get("relatedEntity").asText() : "global"; + newInputNamespaceMap.put("entityList", namespace); + } + newInputNamespaceMap.remove("relatedEntity"); + + ObjectNode result = ((ObjectNode) nodeObj).deepCopy(); + result.set("inputNamespaceMap", newInputNamespaceMap); + return result; + } } diff --git a/openmetadata-service/src/main/resources/json/data/governance/workflows/GlossaryApprovalWorkflow.json b/openmetadata-service/src/main/resources/json/data/governance/workflows/GlossaryApprovalWorkflow.json index 48898c70ea6c..0e6d11c10908 100644 --- a/openmetadata-service/src/main/resources/json/data/governance/workflows/GlossaryApprovalWorkflow.json +++ b/openmetadata-service/src/main/resources/json/data/governance/workflows/GlossaryApprovalWorkflow.json @@ -9,13 +9,22 @@ "trigger": { "type": "eventBasedEntity", "config": { - "entityTypes": ["glossaryTerm"], - "events": ["Created", "Updated"], + "entityTypes": [ + "glossaryTerm" + ], + "events": [ + "Created", + "Updated" + ], "exclude": [], "include": [], "filter": {} }, - "output": ["relatedEntity", "updatedBy"] + "output": [ + "entityList", + "relatedEntity", + "updatedBy" + ] }, "nodes": [ { @@ -63,7 +72,7 @@ "rules": "{\"and\":[{\"isReviewer\":{\"var\":\"updatedBy\"}}]}" }, "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" } }, { @@ -76,7 +85,7 @@ "fieldValue": "Approved" }, "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" } }, @@ -89,7 +98,7 @@ "rules": "{\"and\":[{\"some\":[{\"var\":\"reviewers\"},{\"!=\":[{\"var\":\"fullyQualifiedName\"},null]}]}]}" }, "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" } }, { @@ -101,7 +110,7 @@ "rules": "{\"and\":[{\"!!\":[{\"var\":\"description\"}]}]}" }, "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" } }, { @@ -114,7 +123,7 @@ "fieldValue": "In Review" }, "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" } 
}, { @@ -127,7 +136,7 @@ "fieldValue": "Draft" }, "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" } }, { @@ -157,7 +166,7 @@ "rules": "{\"and\":[{\"==\":[{\"var\":\"version\"},0.1]}]}" }, "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" } }, { @@ -170,7 +179,7 @@ "fieldValue": "In Review" }, "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "global" } }, @@ -191,7 +200,9 @@ "inputNamespaceMap": { "relatedEntity": "global" }, - "output": ["updatedBy"] + "output": [ + "updatedBy" + ] }, { "type": "automatedTask", @@ -200,7 +211,7 @@ "displayName": "Rollback Glossary Term Changes", "config": {}, "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "ApprovalForUpdates" } }, @@ -214,7 +225,7 @@ "fieldValue": "Approved" }, "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "ApprovalForUpdates" } }, @@ -240,7 +251,7 @@ "fieldValue": "Approved" }, "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "ApproveGlossaryTerm" } }, @@ -254,7 +265,7 @@ "fieldValue": "Approved" }, "inputNamespaceMap": { - "relatedEntity": "global" + "entityList": "global" } }, { @@ -267,7 +278,7 @@ "fieldValue": "Rejected" }, "inputNamespaceMap": { - "relatedEntity": "global", + "entityList": "global", "updatedBy": "ApproveGlossaryTerm" } } @@ -374,4 +385,4 @@ "to": "RejectedEnd" } ] -} \ No newline at end of file +} diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/checkChangeDescriptionTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/checkChangeDescriptionTask.json index 5f84498ba9fc..d20698349973 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/checkChangeDescriptionTask.json +++ 
b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/checkChangeDescriptionTask.json @@ -62,7 +62,7 @@ "input": { "type": "array", "items": { "type": "string" }, - "default": ["relatedEntity"], + "default": ["entityList"], "additionalItems": false, "minItems": 1, "maxItems": 1 @@ -70,13 +70,19 @@ "inputNamespaceMap": { "type": "object", "properties": { - "relatedEntity": { + "entityList": { "type": "string", "default": "global" } }, "additionalProperties": false, - "required": ["relatedEntity"] + "required": ["entityList"] + }, + "output": { + "type": "array", + "items": { "type": "string" }, + "default": ["entityList", "falseEntityList"], + "additionalItems": false }, "branches": { "type": "array", diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/checkEntityAttributesTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/checkEntityAttributesTask.json index a881cc80fa87..7f891ecfb7d3 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/checkEntityAttributesTask.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/checkEntityAttributesTask.json @@ -49,7 +49,7 @@ "input": { "type": "array", "items": { "type": "string" }, - "default": ["relatedEntity"], + "default": ["entityList"], "additionalItems": false, "minItems": 1, "maxItems": 1 @@ -57,13 +57,20 @@ "inputNamespaceMap": { "type": "object", "properties": { + "entityList": { + "type": "string" + }, "relatedEntity": { - "type": "string", - "default": "global" + "type": "string" } }, - "additionalProperties": false, - "required": ["relatedEntity"] + "additionalProperties": false + }, + "output": { + "type": "array", + "items": { "type": "string" }, + "default": ["entityList", "falseEntityList"], + "additionalItems": false }, "branches": 
{ "type": "array", diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json index 2f3e2b79e945..4a8ddd90c38e 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json @@ -89,10 +89,9 @@ "input": { "type": "array", "items": { "type": "string" }, - "default": ["relatedEntity"], + "default": ["relatedEntity", "entityList"], "additionalItems": false, - "minItems": 1, - "maxItems": 1 + "minItems": 1 }, "inputNamespaceMap": { "type": "object", @@ -100,6 +99,10 @@ "relatedEntity": { "type": "string", "default": "global" + }, + "entityList": { + "type": "string", + "default": "global" } }, "additionalProperties": false, diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/rollbackEntityTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/rollbackEntityTask.json index 1de4c3d1ef56..049b76e81326 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/rollbackEntityTask.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/rollbackEntityTask.json @@ -41,15 +41,15 @@ "input": { "type": "array", "items": { "type": "string" }, - "default": ["relatedEntity", "updatedBy"], + "default": ["entityList", "updatedBy"], "additionalItems": false, - "minItems": 2, + "minItems": 1, "maxItems": 2 }, "inputNamespaceMap": { "type": "object", "properties": { - "relatedEntity": { + "entityList": { "type": "string", "default": "global" }, @@ -59,7 +59,7 @@ } 
}, "additionalProperties": false, - "required": ["relatedEntity"] + "required": ["entityList"] }, "output": { "type": "array", diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/setEntityAttributeTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/setEntityAttributeTask.json index 63176a899790..f97e5b642d8c 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/setEntityAttributeTask.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/setEntityAttributeTask.json @@ -58,15 +58,15 @@ "input": { "type": "array", "items": { "type": "string" }, - "default": ["relatedEntity", "updatedBy"], + "default": ["entityList", "updatedBy"], "additionalItems": false, - "minItems": 2, + "minItems": 1, "maxItems": 2 }, "inputNamespaceMap": { "type": "object", "properties": { - "relatedEntity": { + "entityList": { "type": "string", "default": "global" }, @@ -76,7 +76,7 @@ } }, "additionalProperties": false, - "required": ["relatedEntity"] + "required": ["entityList"] }, "output": { "type": "array", diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/sinkTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/sinkTask.json index ced047d92c4d..5450fa225d82 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/sinkTask.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/sinkTask.json @@ -143,14 +143,14 @@ "input": { "type": "array", "items": { "type": "string" }, - "default": ["relatedEntity", "updatedBy"], + "default": ["entityList", "updatedBy"], "additionalItems": false, "minItems": 1 }, "inputNamespaceMap": { "type": "object", 
"properties": { - "relatedEntity": { + "entityList": { "type": "string", "default": "global" }, @@ -160,7 +160,7 @@ } }, "additionalProperties": false, - "required": ["relatedEntity"] + "required": ["entityList"] }, "output": { "title": "Output Variables", diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/eventBasedEntityTrigger.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/eventBasedEntityTrigger.json index 166c91562f86..79974fa836d4 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/eventBasedEntityTrigger.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/eventBasedEntityTrigger.json @@ -111,11 +111,12 @@ "type": "string" }, "default": [ + "entityList", "relatedEntity" ], "additionalItems": false, "minItems": 1, - "maxItems": 1, + "maxItems": 2, "uniqueItems": true } }, diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json index eb5f9ab995f5..f7366ff3b3b9 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json @@ -89,11 +89,12 @@ "type": "string" }, "default": [ + "entityList", "relatedEntity" ], "additionalItems": false, "minItems": 1, - "maxItems": 1, + "maxItems": 2, "uniqueItems": true } }, From bc61d9e0b4733cf6f3667bc10db71fb1dbcf1814 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Fri, 20 Mar 2026 15:11:15 +0530 Subject: [PATCH 2/3] Initial Commit - Change Event Offset Governance Workflows --- .../native/1.13.0/mysql/schemaChanges.sql | 8 + 
.../native/1.13.0/postgres/schemaChanges.sql | 8 + .../tests/WorkflowDefinitionResourceIT.java | 14 + .../workflows/elements/TriggerFactory.java | 3 +- .../triggers/PeriodicBatchEntityTrigger.java | 107 +++---- .../impl/CommitChangeEventOffsetImpl.java | 73 +++++ .../triggers/impl/FetchChangeEventsImpl.java | 288 ++++++++++++++++++ .../triggers/impl/FetchEntitiesImpl.java | 6 + .../service/jdbi3/CollectionDAO.java | 28 ++ .../PeriodicBatchEntityTriggerTest.java | 47 +-- .../triggers/periodicBatchEntityTrigger.json | 4 +- 11 files changed, 511 insertions(+), 75 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImpl.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java diff --git a/bootstrap/sql/migrations/native/1.13.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.13.0/mysql/schemaChanges.sql index 306703ef6fb2..fda33c094d63 100644 --- a/bootstrap/sql/migrations/native/1.13.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.13.0/mysql/schemaChanges.sql @@ -84,3 +84,11 @@ SELECT ue.id, re.id, 'user', 'role', 10 FROM user_entity ue, role_entity re WHERE ue.name = 'mcpapplicationbot' AND re.name = 'ApplicationBotImpersonationRole'; + +-- Add composite index on change_event(entityType, offset) for efficient incremental +-- change-event-driven workflow processing (filters by entityType + offset range). +CREATE INDEX IF NOT EXISTS idx_change_event_entity_type_offset ON change_event (entityType, `offset`); + +-- Widen change_event_consumers.id from VARCHAR(36) to VARCHAR(768) to support workflow consumer IDs +-- which follow the pattern {workflowFQN}Trigger-{entityType} and can exceed 36 characters. 
+ALTER TABLE change_event_consumers MODIFY COLUMN id VARCHAR(768) NOT NULL; diff --git a/bootstrap/sql/migrations/native/1.13.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.13.0/postgres/schemaChanges.sql index 6fd046c7924b..e510e36f0179 100644 --- a/bootstrap/sql/migrations/native/1.13.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.13.0/postgres/schemaChanges.sql @@ -96,3 +96,11 @@ FROM user_entity ue, role_entity re WHERE ue.name = 'mcpapplicationbot' AND re.name = 'ApplicationBotImpersonationRole' ON CONFLICT DO NOTHING; + +-- Add composite index on change_event(entityType, offset) for efficient incremental +-- change-event-driven workflow processing (filters by entityType + offset range). +CREATE INDEX IF NOT EXISTS idx_change_event_entity_type_offset ON change_event (entitytype, "offset"); + +-- Widen change_event_consumers.id from VARCHAR(36) to VARCHAR(768) to support workflow consumer IDs +-- which follow the pattern {workflowFQN}Trigger-{entityType} and can exceed 36 characters. +ALTER TABLE change_event_consumers ALTER COLUMN id TYPE VARCHAR(768); diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index 341146a6a020..31bb2197bfe3 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -2409,6 +2409,20 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test) LOG.info("Multi-entity workflow processed all entities successfully"); + // Create new change events for databases so the second incremental run has events to process. + // (The first run committed an offset; the second run only sees events > that offset.) 
+ JsonNode db1Patch = + MAPPER.readTree( + "[{\"op\":\"replace\",\"path\":\"/description\",\"value\":\"Pre-second-run update for database 1\"}]"); + client.databases().patch(database1.getId(), db1Patch); + + JsonNode db2Patch = + MAPPER.readTree( + "[{\"op\":\"replace\",\"path\":\"/description\",\"value\":\"Pre-second-run update for database 2\"}]"); + client.databases().patch(database2.getId(), db2Patch); + + LOG.info("Patched databases to create new change events for the second run"); + // Now modify workflow to only process database entities Map singleEntityTriggerConfig = new HashMap<>(); singleEntityTriggerConfig.put("entityTypes", List.of("database")); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerFactory.java index 167e9658b5c8..a740b3ec7b95 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerFactory.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerFactory.java @@ -24,7 +24,8 @@ public static TriggerInterface createTrigger(WorkflowDefinition workflow) { workflow.getName(), triggerWorkflowId, (PeriodicBatchEntityTriggerDefinition) workflow.getTrigger(), - false); + false, + workflow.getFullyQualifiedName()); }; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java index f88c66293a56..c4a43d5d8547 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java +++ 
b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java @@ -29,7 +29,8 @@ import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.apps.scheduler.AppScheduler; import org.openmetadata.service.governance.workflows.elements.TriggerInterface; -import org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchEntitiesImpl; +import org.openmetadata.service.governance.workflows.elements.triggers.impl.CommitChangeEventOffsetImpl; +import org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl; import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder; @@ -52,7 +53,8 @@ public PeriodicBatchEntityTrigger( String mainWorkflowName, String triggerWorkflowId, PeriodicBatchEntityTriggerDefinition triggerDefinition, - boolean singleExecutionMode) { + boolean singleExecutionMode, + String workflowFqn) { this.triggerWorkflowId = triggerWorkflowId; this.singleExecutionMode = singleExecutionMode; List entityTypes = getEntityTypesFromConfig(triggerDefinition.getConfig()); @@ -73,10 +75,6 @@ public PeriodicBatchEntityTrigger( oTimerDefinition.ifPresent(startEvent::addEventDefinition); process.addFlowElement(startEvent); - ServiceTask fetchEntitiesTask = - getFetchEntitiesTask(processId, entityType, triggerDefinition); - process.addFlowElement(fetchEntitiesTask); - CallActivity workflowTrigger = getWorkflowTriggerCallActivity(processId, mainWorkflowName, singleExecutionMode); process.addFlowElement(workflowTrigger); @@ -85,21 +83,24 @@ public PeriodicBatchEntityTrigger( new EndEventBuilder().id(getFlowableElementId(processId, "endEvent")).build(); process.addFlowElement(endEvent); - SequenceFlow finished = new 
SequenceFlow(fetchEntitiesTask.getId(), endEvent.getId()); + ServiceTask fetchTask = + getFetchChangeEventsTask(processId, entityType, triggerDefinition, workflowFqn); + process.addFlowElement(fetchTask); + + ServiceTask commitTask = getCommitOffsetTask(processId, entityType, workflowFqn); + process.addFlowElement(commitTask); + + SequenceFlow finished = new SequenceFlow(fetchTask.getId(), commitTask.getId()); finished.setConditionExpression(String.format("${%s}", HAS_FINISHED_VARIABLE)); - SequenceFlow notFinished = - new SequenceFlow(fetchEntitiesTask.getId(), workflowTrigger.getId()); + SequenceFlow notFinished = new SequenceFlow(fetchTask.getId(), workflowTrigger.getId()); notFinished.setConditionExpression(String.format("${!%s}", HAS_FINISHED_VARIABLE)); - // Start -> Fetch Entities - process.addFlowElement(new SequenceFlow(startEvent.getId(), fetchEntitiesTask.getId())); - // Fetch Entities -> End + process.addFlowElement(new SequenceFlow(startEvent.getId(), fetchTask.getId())); process.addFlowElement(finished); - // Fetch Entities -> WorkflowTrigger process.addFlowElement(notFinished); - // WorkflowTrigger -> Fetch Entities (Loop Back to get next batch) - process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), fetchEntitiesTask.getId())); + process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), fetchTask.getId())); + process.addFlowElement(new SequenceFlow(commitTask.getId(), endEvent.getId())); processes.add(process); } @@ -170,71 +171,71 @@ private CallActivity getWorkflowTriggerCallActivity( return workflowTrigger; } - private ServiceTask getFetchEntitiesTask( + private ServiceTask getFetchChangeEventsTask( String workflowTriggerId, String entityType, - PeriodicBatchEntityTriggerDefinition triggerDefinition) { + PeriodicBatchEntityTriggerDefinition triggerDefinition, + String workflowFqn) { FieldExtension entityTypesExpr = new FieldExtensionBuilder().fieldName("entityTypesExpr").fieldValue(entityType).build(); - // Extract 
entity-specific filter based on the filter configuration - String entitySpecificFilter = - extractEntitySpecificFilter(triggerDefinition.getConfig().getFilters(), entityType); - - FieldExtension searchFilterExpr = - new FieldExtensionBuilder(false) - .fieldName("searchFilterExpr") - .fieldValue(entitySpecificFilter) - .build(); - FieldExtension batchSizeExpr = new FieldExtensionBuilder() .fieldName("batchSizeExpr") .fieldValue(String.valueOf(triggerDefinition.getConfig().getBatchSize())) .build(); + FieldExtension workflowFqnExpr = + new FieldExtensionBuilder() + .fieldName("workflowFqnExpr") + .fieldValue(workflowFqn != null ? workflowFqn : "") + .build(); + ServiceTask serviceTask = new ServiceTaskBuilder() - .id(getFlowableElementId(workflowTriggerId, "fetchEntityTask")) - .implementation(FetchEntitiesImpl.class.getName()) + .id(getFlowableElementId(workflowTriggerId, "fetchChangeEventsTask")) + .implementation(FetchChangeEventsImpl.class.getName()) .build(); serviceTask.getFieldExtensions().add(entityTypesExpr); - serviceTask.getFieldExtensions().add(searchFilterExpr); serviceTask.getFieldExtensions().add(batchSizeExpr); + serviceTask.getFieldExtensions().add(workflowFqnExpr); + + Object filters = triggerDefinition.getConfig().getFilters(); + if (filters != null) { + String filtersJson = + filters instanceof String ? 
(String) filters : JsonUtils.pojoToJson(filters); + FieldExtension searchFilterExpr = + new FieldExtensionBuilder().fieldName("searchFilterExpr").fieldValue(filtersJson).build(); + serviceTask.getFieldExtensions().add(searchFilterExpr); + } serviceTask.setAsynchronousLeave(true); return serviceTask; } - private String extractEntitySpecificFilter(Object filtersObj, String entityType) { - if (filtersObj == null) { - return null; - } - - // Handle map format with entity-specific filters (values are JSON strings) - if (filtersObj instanceof Map) { - @SuppressWarnings("unchecked") - Map filterMap = (Map) filtersObj; + private ServiceTask getCommitOffsetTask( + String workflowTriggerId, String entityType, String workflowFqn) { + FieldExtension workflowFqnExpr = + new FieldExtensionBuilder() + .fieldName("workflowFqnExpr") + .fieldValue(workflowFqn != null ? workflowFqn : "") + .build(); - // First check for entity-specific filter - String specificFilter = filterMap.get(entityType); - if (specificFilter != null && !specificFilter.isEmpty()) { - return specificFilter; - } + FieldExtension entityTypeExpr = + new FieldExtensionBuilder().fieldName("entityTypeExpr").fieldValue(entityType).build(); - // Fall back to default filter if no specific filter found - String defaultFilter = filterMap.get("default"); - if (defaultFilter != null && !defaultFilter.isEmpty()) { - return defaultFilter; - } + ServiceTask serviceTask = + new ServiceTaskBuilder() + .id(getFlowableElementId(workflowTriggerId, "commitOffsetTask")) + .implementation(CommitChangeEventOffsetImpl.class.getName()) + .build(); - return null; - } + serviceTask.getFieldExtensions().add(workflowFqnExpr); + serviceTask.getFieldExtensions().add(entityTypeExpr); - // If it's not a map, try to convert to string - return JsonUtils.pojoToJson(filtersObj); + return serviceTask; } private List getEntityTypesFromConfig(Object configObj) { diff --git 
a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImpl.java new file mode 100644 index 000000000000..992730ddb3e9 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImpl.java @@ -0,0 +1,73 @@ +package org.openmetadata.service.governance.workflows.elements.triggers.impl; + +import static org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer.OFFSET_EXTENSION; +import static org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl.MAX_PROCESSED_OFFSET_VARIABLE; +import static org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl.buildConsumerId; + +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; +import org.openmetadata.schema.entity.events.EventSubscriptionOffset; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; + +@Slf4j +public class CommitChangeEventOffsetImpl implements JavaDelegate { + + private static final String OFFSET_JSON_SCHEMA = "eventSubscriptionOffset"; + + private Expression workflowFqnExpr; + private Expression entityTypeExpr; + + @Override + public void execute(DelegateExecution execution) { + String workflowFqn = (String) workflowFqnExpr.getValue(execution); + String entityType = (String) entityTypeExpr.getValue(execution); + String consumerId = buildConsumerId(workflowFqn, entityType); + + Long maxProcessedOffset = (Long) execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE); + if (maxProcessedOffset == null) { + LOG.debug( + "No events processed for workflow '{}' 
entity type '{}'. Offset not updated.", + workflowFqn, + entityType); + return; + } + + String existingJson = + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .getSubscriberExtension(consumerId, OFFSET_EXTENSION); + + if (existingJson != null) { + EventSubscriptionOffset existing = + JsonUtils.readValue(existingJson, EventSubscriptionOffset.class); + if (existing.getCurrentOffset() >= maxProcessedOffset) { + LOG.debug( + "Stored offset {} >= processed offset {} for workflow '{}'. No update needed.", + existing.getCurrentOffset(), + maxProcessedOffset, + workflowFqn); + return; + } + } + + EventSubscriptionOffset newOffset = + new EventSubscriptionOffset() + .withStartingOffset(maxProcessedOffset) + .withCurrentOffset(maxProcessedOffset) + .withTimestamp(System.currentTimeMillis()); + + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .upsertSubscriberExtension( + consumerId, OFFSET_EXTENSION, OFFSET_JSON_SCHEMA, JsonUtils.pojoToJson(newOffset)); + + LOG.info( + "Committed offset {} for workflow '{}' entity type '{}'.", + maxProcessedOffset, + workflowFqn, + entityType); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java new file mode 100644 index 000000000000..6ed3303bde2f --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java @@ -0,0 +1,288 @@ +package org.openmetadata.service.governance.workflows.elements.triggers.impl; + +import static org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer.OFFSET_EXTENSION; +import static org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger.COLLECTION_VARIABLE; +import static 
org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger.HAS_FINISHED_VARIABLE; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; +import org.openmetadata.schema.entity.events.EventSubscriptionOffset; +import org.openmetadata.schema.type.ChangeEvent; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.CollectionDAO.ChangeEventDAO.ChangeEventRecord; +import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.search.SearchResultListMapper; +import org.openmetadata.service.search.SearchSortFilter; + +@Slf4j +public class FetchChangeEventsImpl implements JavaDelegate { + + static final String CURRENT_BATCH_OFFSET_VARIABLE = "currentBatchOffset"; + static final String MAX_PROCESSED_OFFSET_VARIABLE = "maxProcessedOffset"; + private static final String CARDINALITY_VARIABLE = "numberOfEntities"; + + private static final Set ENTITIES_NEEDING_KEYWORD_FQN = + Set.of("testCase", "user", "team"); + + private Expression entityTypesExpr; + private Expression batchSizeExpr; + private Expression workflowFqnExpr; + private Expression searchFilterExpr; + + @Override + public void execute(DelegateExecution execution) { + String entityType = resolveEntityType(execution); + 
int batchSize = Integer.parseInt((String) batchSizeExpr.getValue(execution)); + String workflowFqn = (String) workflowFqnExpr.getValue(execution); + + long currentOffset = resolveStartingOffset(execution, workflowFqn, entityType); + + List records = + Entity.getCollectionDAO() + .changeEventDAO() + .listByEntityTypesWithOffset(List.of(entityType), currentOffset, batchSize); + + if (records.isEmpty() && currentOffset > 0) { + long minOffset = + Entity.getCollectionDAO() + .changeEventDAO() + .getMinOffsetForEntityTypes(List.of(entityType)); + if (minOffset > 0 && minOffset > currentOffset) { + LOG.warn( + "Workflow '{}' stored offset {} has been purged. Earliest available offset for entity type '{}' is {}. Resuming from there.", + workflowFqn, + currentOffset, + entityType, + minOffset); + currentOffset = minOffset - 1; + records = + Entity.getCollectionDAO() + .changeEventDAO() + .listByEntityTypesWithOffset(List.of(entityType), currentOffset, batchSize); + } + } + + // Compute the max offset across all fetched records before any filtering. + // This ensures the cursor always advances past a batch even when the search filter + // removes all entities, preventing infinite re-processing of the same offset window. 
+ long batchMaxOffset = currentOffset; + for (ChangeEventRecord record : records) { + if (record.offset() > batchMaxOffset) { + batchMaxOffset = record.offset(); + } + } + + Map fqnToMaxOffset = deduplicateByFqn(records); + + String searchFilter = + Optional.ofNullable(searchFilterExpr) + .map(expr -> (String) expr.getValue(execution)) + .orElse(null); + + if (searchFilter != null && !searchFilter.isBlank() && !fqnToMaxOffset.isEmpty()) { + Set matchingFqns = + filterFqnsBySearchCriteria( + new ArrayList<>(fqnToMaxOffset.keySet()), entityType, searchFilter); + fqnToMaxOffset.entrySet().removeIf(e -> !matchingFqns.contains(e.getKey())); + } + + List entityList = new ArrayList<>(); + Map> entityToListMap = new HashMap<>(); + + for (Map.Entry entry : fqnToMaxOffset.entrySet()) { + String fqn = entry.getKey(); + String entityLink = new MessageParser.EntityLink(entityType, fqn).getLinkString(); + entityList.add(entityLink); + entityToListMap.put(entityLink, List.of(entityLink)); + } + + // hasFinished is true only when no more change events exist (records fetch returned empty). + // A batch where all entities were filtered out is NOT finished — the cursor still advances + // via batchMaxOffset, and the loop runs the workflow trigger with zero iterations before + // fetching the next batch. 
+ boolean hasFinished = records.isEmpty(); + + execution.setVariable(CURRENT_BATCH_OFFSET_VARIABLE, batchMaxOffset); + + Long existingMax = (Long) execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE); + if (existingMax == null || batchMaxOffset > existingMax) { + execution.setVariable(MAX_PROCESSED_OFFSET_VARIABLE, batchMaxOffset); + } + + execution.setVariable(CARDINALITY_VARIABLE, entityList.size()); + execution.setVariable(HAS_FINISHED_VARIABLE, hasFinished); + execution.setVariable(COLLECTION_VARIABLE, entityList); + execution.setVariable("entityToListMap", entityToListMap); + } + + private String resolveEntityType(DelegateExecution execution) { + String processDefinitionKey = execution.getProcessDefinitionId().split(":")[0]; + if (processDefinitionKey.contains("-")) { + String[] parts = processDefinitionKey.split("-"); + if (parts.length >= 2) { + return parts[parts.length - 1]; + } + } + return (String) entityTypesExpr.getValue(execution); + } + + private long resolveStartingOffset( + DelegateExecution execution, String workflowFqn, String entityType) { + Object stored = execution.getVariable(CURRENT_BATCH_OFFSET_VARIABLE); + if (stored != null) { + return (Long) stored; + } + String consumerId = buildConsumerId(workflowFqn, entityType); + String json = + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .getSubscriberExtension(consumerId, OFFSET_EXTENSION); + if (json != null) { + EventSubscriptionOffset offset = JsonUtils.readValue(json, EventSubscriptionOffset.class); + return offset.getCurrentOffset(); + } + long minOffset = + Entity.getCollectionDAO().changeEventDAO().getMinOffsetForEntityTypes(List.of(entityType)); + if (minOffset > 0) { + LOG.info( + "No stored offset for workflow '{}' entity type '{}'. 
Starting from earliest available offset {}.", + workflowFqn, + entityType, + minOffset); + return minOffset - 1; + } + return 0L; + } + + private Map deduplicateByFqn(List records) { + Map fqnToMaxOffset = new LinkedHashMap<>(); + for (ChangeEventRecord record : records) { + ChangeEvent event = JsonUtils.readValue(record.json(), ChangeEvent.class); + String fqn = event.getEntityFullyQualifiedName(); + if (fqn == null || fqn.isEmpty()) { + continue; + } + fqnToMaxOffset.merge(fqn, record.offset(), Math::max); + } + return fqnToMaxOffset; + } + + private Set filterFqnsBySearchCriteria( + List fqns, String entityType, String searchFilter) { + String entitySpecificFilter = extractEntityFilter(searchFilter, entityType); + if (entitySpecificFilter == null || entitySpecificFilter.isBlank()) { + return new HashSet<>(fqns); + } + + SearchRepository searchRepository = Entity.getSearchRepository(); + if (!searchRepository.checkIfIndexingIsSupported(entityType)) { + return new HashSet<>(fqns); + } + + try { + String combinedFilter = buildCombinedFilter(fqns, entityType, entitySpecificFilter); + String fqnField = + ENTITIES_NEEDING_KEYWORD_FQN.contains(entityType) + ? 
"fullyQualifiedName.keyword" + : "fullyQualifiedName"; + SearchSortFilter sortFilter = new SearchSortFilter(fqnField, null, null, null); + SearchResultListMapper result = + searchRepository.listWithDeepPagination( + entityType, + null, + combinedFilter, + new String[] {"fullyQualifiedName"}, + sortFilter, + fqns.size(), + null); + + return result.getResults().stream() + .map(r -> (String) r.get("fullyQualifiedName")) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } catch (IOException e) { + LOG.warn( + "Failed to apply search filter for entity type '{}', processing all entities: {}", + entityType, + e.getMessage()); + return new HashSet<>(fqns); + } + } + + private String extractEntityFilter(String filterObj, String entityType) { + if (filterObj == null || filterObj.isBlank()) { + return null; + } + try { + @SuppressWarnings("unchecked") + Map filterMap = JsonUtils.readValue(filterObj, Map.class); + String filter = filterMap.get(entityType); + if (filter != null && !filter.isBlank()) { + return filter; + } + return filterMap.get("default"); + } catch (Exception e) { + LOG.warn("Could not parse filter for entity type '{}': {}", entityType, e.getMessage()); + return null; + } + } + + private String buildCombinedFilter( + List fqns, String entityType, String entitySpecificFilter) throws IOException { + JsonNode entityFilterNode = JsonUtils.readTree(entitySpecificFilter); + JsonNode configuredQuery = entityFilterNode.get("query"); + + String fqnField = + ENTITIES_NEEDING_KEYWORD_FQN.contains(entityType) + ? 
"fullyQualifiedName.keyword" + : "fullyQualifiedName"; + + ArrayNode fqnArray = JsonUtils.getObjectMapper().createArrayNode(); + for (String fqn : fqns) { + fqnArray.add(fqn); + } + ObjectNode termsNode = JsonUtils.getObjectMapper().createObjectNode(); + termsNode.set(fqnField, fqnArray); + ObjectNode termsFilter = JsonUtils.getObjectMapper().createObjectNode(); + termsFilter.set("terms", termsNode); + + ArrayNode filterArray = JsonUtils.getObjectMapper().createArrayNode(); + if (configuredQuery != null) { + filterArray.add(configuredQuery); + } + filterArray.add(termsFilter); + + ObjectNode boolNode = JsonUtils.getObjectMapper().createObjectNode(); + boolNode.set("filter", filterArray); + + ObjectNode queryNode = JsonUtils.getObjectMapper().createObjectNode(); + queryNode.set("bool", boolNode); + + ObjectNode result = JsonUtils.getObjectMapper().createObjectNode(); + result.set("query", queryNode); + + return JsonUtils.getObjectMapper().writeValueAsString(result); + } + + static String buildConsumerId(String workflowFqn, String entityType) { + return workflowFqn + "Trigger-" + entityType; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java index 4e8dd0bee616..a97070d4850a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java @@ -21,6 +21,12 @@ import org.openmetadata.service.search.SearchResultListMapper; import org.openmetadata.service.search.SearchSortFilter; +/** + * @deprecated All periodic batch workflows now use {@link FetchChangeEventsImpl} for incremental + * change-event-driven processing. 
This class is no longer used by {@link + * org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger}. + */ +@Deprecated @Slf4j public class FetchEntitiesImpl implements JavaDelegate { // Entity types whose fullyQualifiedName is mapped as "text" (not "keyword") diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 8708385af8d4..6316b1780b9b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -6854,6 +6854,34 @@ record ChangeEventRecord(long offset, String json) {} @RegisterRowMapper(ChangeEventRecordMapper.class) List listWithOffset( @Bind("limit") int limit, @Bind("afterOffset") long afterOffset); + + @ConnectionAwareSqlQuery( + value = + "SELECT `offset`, json FROM change_event " + + "WHERE `offset` > :afterOffset AND entityType IN () " + + "ORDER BY `offset` ASC LIMIT :limit", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT \"offset\", json FROM change_event " + + "WHERE \"offset\" > :afterOffset AND entityType IN () " + + "ORDER BY \"offset\" ASC LIMIT :limit", + connectionType = POSTGRES) + @RegisterRowMapper(ChangeEventRecordMapper.class) + List listByEntityTypesWithOffset( + @BindList("entityTypes") List entityTypes, + @Bind("afterOffset") long afterOffset, + @Bind("limit") int limit); + + @ConnectionAwareSqlQuery( + value = + "SELECT COALESCE(MIN(`offset`), 0) FROM change_event WHERE entityType IN ()", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT COALESCE(MIN(\"offset\"), 0) FROM change_event WHERE entityType IN ()", + connectionType = POSTGRES) + long getMinOffsetForEntityTypes(@BindList("entityTypes") List entityTypes); } class ChangeEventRecordMapper implements RowMapper { diff --git 
a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTriggerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTriggerTest.java index 8dace9cb3d0e..e4fa416e24bc 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTriggerTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTriggerTest.java @@ -38,7 +38,8 @@ void testTriggerCreation_SingleExecutionMode() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinition(); PeriodicBatchEntityTrigger trigger = - new PeriodicBatchEntityTrigger("MainWorkflow", "MainWorkflowTrigger", triggerDef, true); + new PeriodicBatchEntityTrigger( + "MainWorkflow", "MainWorkflowTrigger", triggerDef, true, "MainWorkflow"); assertNotNull(trigger); assertEquals("MainWorkflowTrigger", trigger.getTriggerWorkflowId()); @@ -49,7 +50,8 @@ void testTriggerCreation_MultipleExecutionMode() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinition(); PeriodicBatchEntityTrigger trigger = - new PeriodicBatchEntityTrigger("MainWorkflow", "MainWorkflowTrigger", triggerDef, false); + new PeriodicBatchEntityTrigger( + "MainWorkflow", "MainWorkflowTrigger", triggerDef, false, "MainWorkflow"); assertNotNull(trigger); assertEquals("MainWorkflowTrigger", trigger.getTriggerWorkflowId()); @@ -60,12 +62,12 @@ void testSingleExecutionMode_CardinalityIsOne() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinition(); PeriodicBatchEntityTrigger trigger = - new PeriodicBatchEntityTrigger("MainWorkflow", "MainWorkflowTrigger", triggerDef, true); + new PeriodicBatchEntityTrigger( + "MainWorkflow", "MainWorkflowTrigger", triggerDef, true, "MainWorkflow"); BpmnModel model = new BpmnModel(); trigger.addToWorkflow(model); 
- // Find the CallActivity CallActivity callActivity = findCallActivity(model); assertNotNull(callActivity, "CallActivity should exist in the process"); @@ -82,7 +84,8 @@ void testMultipleExecutionMode_CardinalityIsVariable() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinition(); PeriodicBatchEntityTrigger trigger = - new PeriodicBatchEntityTrigger("MainWorkflow", "MainWorkflowTrigger", triggerDef, false); + new PeriodicBatchEntityTrigger( + "MainWorkflow", "MainWorkflowTrigger", triggerDef, false, "MainWorkflow"); BpmnModel model = new BpmnModel(); trigger.addToWorkflow(model); @@ -104,7 +107,8 @@ void testEntityListParameterIsPassedToWorkflow() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinition(); PeriodicBatchEntityTrigger trigger = - new PeriodicBatchEntityTrigger("MainWorkflow", "MainWorkflowTrigger", triggerDef, true); + new PeriodicBatchEntityTrigger( + "MainWorkflow", "MainWorkflowTrigger", triggerDef, true, "MainWorkflow"); BpmnModel model = new BpmnModel(); trigger.addToWorkflow(model); @@ -112,7 +116,6 @@ void testEntityListParameterIsPassedToWorkflow() { CallActivity callActivity = findCallActivity(model); assertNotNull(callActivity); - // Verify entityList is passed as input parameter List inParams = callActivity.getInParameters(); boolean hasEntityListParam = inParams.stream() @@ -127,7 +130,8 @@ void testRelatedEntityParameterIsPassedToWorkflow() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinition(); PeriodicBatchEntityTrigger trigger = - new PeriodicBatchEntityTrigger("MainWorkflow", "MainWorkflowTrigger", triggerDef, true); + new PeriodicBatchEntityTrigger( + "MainWorkflow", "MainWorkflowTrigger", triggerDef, true, "MainWorkflow"); BpmnModel model = new BpmnModel(); trigger.addToWorkflow(model); @@ -152,7 +156,8 @@ void testProcessStructure_HasRequiredElements() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinition(); PeriodicBatchEntityTrigger trigger = 
- new PeriodicBatchEntityTrigger("MainWorkflow", "MainWorkflowTrigger", triggerDef, true); + new PeriodicBatchEntityTrigger( + "MainWorkflow", "MainWorkflowTrigger", triggerDef, true, "MainWorkflow"); BpmnModel model = new BpmnModel(); trigger.addToWorkflow(model); @@ -161,18 +166,20 @@ void testProcessStructure_HasRequiredElements() { Process process = model.getProcesses().get(0); - // Verify start event exists boolean hasStartEvent = process.getFlowElements().stream().anyMatch(e -> e instanceof StartEvent); assertTrue(hasStartEvent, "Process should have a start event"); - // Verify fetch entities service task exists - boolean hasFetchEntitiesTask = + boolean hasFetchChangeEventsTask = process.getFlowElements().stream() - .anyMatch(e -> e instanceof ServiceTask && e.getId().contains("fetchEntityTask")); - assertTrue(hasFetchEntitiesTask, "Process should have a fetch entities task"); + .anyMatch(e -> e instanceof ServiceTask && e.getId().contains("fetchChangeEventsTask")); + assertTrue(hasFetchChangeEventsTask, "Process should have a fetch change events task"); + + boolean hasCommitOffsetTask = + process.getFlowElements().stream() + .anyMatch(e -> e instanceof ServiceTask && e.getId().contains("commitOffsetTask")); + assertTrue(hasCommitOffsetTask, "Process should have a commit offset task"); - // Verify call activity exists boolean hasCallActivity = process.getFlowElements().stream().anyMatch(e -> e instanceof CallActivity); assertTrue(hasCallActivity, "Process should have a call activity"); @@ -183,14 +190,14 @@ void testMultipleEntityTypes_CreatesMultipleProcesses() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinitionWithMultipleTypes(); PeriodicBatchEntityTrigger trigger = - new PeriodicBatchEntityTrigger("MainWorkflow", "MainWorkflowTrigger", triggerDef, true); + new PeriodicBatchEntityTrigger( + "MainWorkflow", "MainWorkflowTrigger", triggerDef, true, "MainWorkflow"); BpmnModel model = new BpmnModel(); trigger.addToWorkflow(model); 
assertEquals(3, model.getProcesses().size(), "Should create one process per entity type"); - // Verify each process has the correct ID pattern List<String> processIds = model.getProcesses().stream().map(Process::getId).toList(); assertTrue( processIds.stream().anyMatch(id -> id.contains("glossaryTerm")), @@ -207,7 +214,8 @@ void testCallActivityInheritsBusinessKey() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinition(); PeriodicBatchEntityTrigger trigger = - new PeriodicBatchEntityTrigger("MainWorkflow", "MainWorkflowTrigger", triggerDef, true); + new PeriodicBatchEntityTrigger( + "MainWorkflow", "MainWorkflowTrigger", triggerDef, true, "MainWorkflow"); BpmnModel model = new BpmnModel(); trigger.addToWorkflow(model); @@ -224,7 +232,8 @@ void testCallActivityCallsCorrectWorkflow() { PeriodicBatchEntityTriggerDefinition triggerDef = createTriggerDefinition(); PeriodicBatchEntityTrigger trigger = - new PeriodicBatchEntityTrigger("MyMainWorkflow", "MyMainWorkflowTrigger", triggerDef, true); + new PeriodicBatchEntityTrigger( + "MyMainWorkflow", "MyMainWorkflowTrigger", triggerDef, true, "MyMainWorkflow"); BpmnModel model = new BpmnModel(); trigger.addToWorkflow(model); diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json index f7366ff3b3b9..bfa987aac9da 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json @@ -66,10 +66,10 @@ }, "anyOf": [ { - "required": ["schedule", "entityType", "filters"] + "required": ["schedule", "entityType"] }, { - "required": ["schedule", "entityTypes", "filters"] + "required": ["schedule", "entityTypes"] } ], 
"additionalProperties": false From 6e9d2f26fc5f0e189284e238cb5e66d4b90edc65 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 27 Mar 2026 15:07:23 +0000 Subject: [PATCH 3/3] Update generated TypeScript types --- .../workflows/elements/triggers/periodicBatchEntityTrigger.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/triggers/periodicBatchEntityTrigger.ts b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/triggers/periodicBatchEntityTrigger.ts index 4d8ac067e723..a111c97f15bb 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/triggers/periodicBatchEntityTrigger.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/triggers/periodicBatchEntityTrigger.ts @@ -41,7 +41,7 @@ export interface TriggerConfiguration { * Search filters for entities. Can be a string (applied to all entity types) or an object * mapping entity types to their specific filters. */ - filters: FiltersObject | string; + filters?: FiltersObject | string; /** * Defines the schedule of the Periodic Trigger. */