Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,11 @@ SELECT ue.id, re.id, 'user', 'role', 10
FROM user_entity ue, role_entity re
WHERE ue.name = 'mcpapplicationbot'
AND re.name = 'ApplicationBotImpersonationRole';

-- Add composite index on change_event(entityType, offset) for efficient incremental
-- change-event-driven workflow processing (filters by entityType + offset range).
-- `offset` is a reserved word in MySQL 8, hence the backtick quoting.
-- NOTE(review): stock MySQL 8.x does NOT accept IF NOT EXISTS on CREATE INDEX
-- (that clause is MariaDB-specific) — confirm the target engine, or that the
-- migration runner tolerates/strips it; otherwise guard via an
-- information_schema.statistics existence check.
CREATE INDEX IF NOT EXISTS idx_change_event_entity_type_offset ON change_event (entityType, `offset`);

-- Widen change_event_consumers.id from VARCHAR(36) to VARCHAR(768) to support workflow consumer IDs
-- which follow the pattern {workflowFQN}Trigger-{entityType} and can exceed 36 characters.
-- NOTE(review): 768 chars * 4 bytes (utf8mb4) = 3072 bytes, exactly the InnoDB
-- index-key limit — if id is (part of) the PK or any index, verify the table's
-- charset/row format so the widened column still fits the key-length limit.
-- MODIFY COLUMN must restate the full definition, so NOT NULL is repeated here
-- to preserve the existing nullability.
ALTER TABLE change_event_consumers MODIFY COLUMN id VARCHAR(768) NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,11 @@ FROM user_entity ue, role_entity re
WHERE ue.name = 'mcpapplicationbot'
AND re.name = 'ApplicationBotImpersonationRole'
ON CONFLICT DO NOTHING;

-- Add composite index on change_event(entityType, offset) for efficient incremental
-- change-event-driven workflow processing (filters by entityType + offset range).
-- "offset" is a reserved word in PostgreSQL, hence the double-quote quoting;
-- entitytype is written lower-case because unquoted identifiers fold to lower case.
-- CREATE INDEX IF NOT EXISTS is valid PostgreSQL (9.5+), keeping this re-runnable.
CREATE INDEX IF NOT EXISTS idx_change_event_entity_type_offset ON change_event (entitytype, "offset");

-- Widen change_event_consumers.id from VARCHAR(36) to VARCHAR(768) to support workflow consumer IDs
-- which follow the pattern {workflowFQN}Trigger-{entityType} and can exceed 36 characters.
-- Increasing a varchar's length limit is a metadata-only change in PostgreSQL
-- (no table rewrite), and existing NOT NULL / constraints are preserved by
-- ALTER COLUMN ... TYPE — unlike the MySQL MODIFY form, nothing is restated here.
ALTER TABLE change_event_consumers ALTER COLUMN id TYPE VARCHAR(768);
Original file line number Diff line number Diff line change
Expand Up @@ -2408,6 +2408,20 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test)

LOG.info("Multi-entity workflow processed all entities successfully");

// Create new change events for databases so the second incremental run has events to process.
// (The first run committed an offset; the second run only sees events > that offset.)
JsonNode db1Patch =
MAPPER.readTree(
"[{\"op\":\"replace\",\"path\":\"/description\",\"value\":\"Pre-second-run update for database 1\"}]");
client.databases().patch(database1.getId(), db1Patch);

JsonNode db2Patch =
MAPPER.readTree(
"[{\"op\":\"replace\",\"path\":\"/description\",\"value\":\"Pre-second-run update for database 2\"}]");
client.databases().patch(database2.getId(), db2Patch);

LOG.info("Patched databases to create new change events for the second run");

// Now modify workflow to only process database entities
Map<String, Object> singleEntityTriggerConfig = new HashMap<>();
singleEntityTriggerConfig.put("entityTypes", List.of("database"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public static TriggerInterface createTrigger(WorkflowDefinition workflow) {
workflow.getName(),
triggerWorkflowId,
(PeriodicBatchEntityTriggerDefinition) workflow.getTrigger(),
hasBatchModeNodes(workflow));
hasBatchModeNodes(workflow),
workflow.getFullyQualifiedName());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.governance.workflows.elements.TriggerInterface;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchEntitiesImpl;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.CommitChangeEventOffsetImpl;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
Expand All @@ -52,7 +53,8 @@ public PeriodicBatchEntityTrigger(
String mainWorkflowName,
String triggerWorkflowId,
PeriodicBatchEntityTriggerDefinition triggerDefinition,
boolean singleExecutionMode) {
boolean singleExecutionMode,
String workflowFqn) {
this.triggerWorkflowId = triggerWorkflowId;
this.singleExecutionMode = singleExecutionMode;
List<String> entityTypes = getEntityTypesFromConfig(triggerDefinition.getConfig());
Expand All @@ -73,10 +75,6 @@ public PeriodicBatchEntityTrigger(
oTimerDefinition.ifPresent(startEvent::addEventDefinition);
process.addFlowElement(startEvent);

ServiceTask fetchEntitiesTask =
getFetchEntitiesTask(processId, entityType, triggerDefinition);
process.addFlowElement(fetchEntitiesTask);

CallActivity workflowTrigger =
getWorkflowTriggerCallActivity(processId, mainWorkflowName, singleExecutionMode);
process.addFlowElement(workflowTrigger);
Expand All @@ -85,21 +83,24 @@ public PeriodicBatchEntityTrigger(
new EndEventBuilder().id(getFlowableElementId(processId, "endEvent")).build();
process.addFlowElement(endEvent);

SequenceFlow finished = new SequenceFlow(fetchEntitiesTask.getId(), endEvent.getId());
ServiceTask fetchTask =
getFetchChangeEventsTask(processId, entityType, triggerDefinition, workflowFqn);
process.addFlowElement(fetchTask);

ServiceTask commitTask = getCommitOffsetTask(processId, entityType, workflowFqn);
process.addFlowElement(commitTask);

SequenceFlow finished = new SequenceFlow(fetchTask.getId(), commitTask.getId());
finished.setConditionExpression(String.format("${%s}", HAS_FINISHED_VARIABLE));

SequenceFlow notFinished =
new SequenceFlow(fetchEntitiesTask.getId(), workflowTrigger.getId());
SequenceFlow notFinished = new SequenceFlow(fetchTask.getId(), workflowTrigger.getId());
notFinished.setConditionExpression(String.format("${!%s}", HAS_FINISHED_VARIABLE));

// Start -> Fetch Entities
process.addFlowElement(new SequenceFlow(startEvent.getId(), fetchEntitiesTask.getId()));
// Fetch Entities -> End
process.addFlowElement(new SequenceFlow(startEvent.getId(), fetchTask.getId()));
process.addFlowElement(finished);
// Fetch Entities -> WorkflowTrigger
process.addFlowElement(notFinished);
// WorkflowTrigger -> Fetch Entities (Loop Back to get next batch)
process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), fetchEntitiesTask.getId()));
process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), fetchTask.getId()));
process.addFlowElement(new SequenceFlow(commitTask.getId(), endEvent.getId()));

processes.add(process);
}
Expand Down Expand Up @@ -170,71 +171,71 @@ private CallActivity getWorkflowTriggerCallActivity(
return workflowTrigger;
}

private ServiceTask getFetchEntitiesTask(
private ServiceTask getFetchChangeEventsTask(
String workflowTriggerId,
String entityType,
PeriodicBatchEntityTriggerDefinition triggerDefinition) {
PeriodicBatchEntityTriggerDefinition triggerDefinition,
String workflowFqn) {
FieldExtension entityTypesExpr =
new FieldExtensionBuilder().fieldName("entityTypesExpr").fieldValue(entityType).build();

// Extract entity-specific filter based on the filter configuration
String entitySpecificFilter =
extractEntitySpecificFilter(triggerDefinition.getConfig().getFilters(), entityType);

FieldExtension searchFilterExpr =
new FieldExtensionBuilder(false)
.fieldName("searchFilterExpr")
.fieldValue(entitySpecificFilter)
.build();

FieldExtension batchSizeExpr =
new FieldExtensionBuilder()
.fieldName("batchSizeExpr")
.fieldValue(String.valueOf(triggerDefinition.getConfig().getBatchSize()))
.build();

FieldExtension workflowFqnExpr =
new FieldExtensionBuilder()
.fieldName("workflowFqnExpr")
.fieldValue(workflowFqn != null ? workflowFqn : "")
.build();

ServiceTask serviceTask =
new ServiceTaskBuilder()
.id(getFlowableElementId(workflowTriggerId, "fetchEntityTask"))
.implementation(FetchEntitiesImpl.class.getName())
.id(getFlowableElementId(workflowTriggerId, "fetchChangeEventsTask"))
.implementation(FetchChangeEventsImpl.class.getName())
.build();

serviceTask.getFieldExtensions().add(entityTypesExpr);
serviceTask.getFieldExtensions().add(searchFilterExpr);
serviceTask.getFieldExtensions().add(batchSizeExpr);
serviceTask.getFieldExtensions().add(workflowFqnExpr);

Object filters = triggerDefinition.getConfig().getFilters();
if (filters != null) {
String filtersJson =
filters instanceof String ? (String) filters : JsonUtils.pojoToJson(filters);
FieldExtension searchFilterExpr =
new FieldExtensionBuilder().fieldName("searchFilterExpr").fieldValue(filtersJson).build();
serviceTask.getFieldExtensions().add(searchFilterExpr);
}

serviceTask.setAsynchronousLeave(true);

return serviceTask;
}

private String extractEntitySpecificFilter(Object filtersObj, String entityType) {
if (filtersObj == null) {
return null;
}

// Handle map format with entity-specific filters (values are JSON strings)
if (filtersObj instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, String> filterMap = (Map<String, String>) filtersObj;
private ServiceTask getCommitOffsetTask(
String workflowTriggerId, String entityType, String workflowFqn) {
FieldExtension workflowFqnExpr =
new FieldExtensionBuilder()
.fieldName("workflowFqnExpr")
.fieldValue(workflowFqn != null ? workflowFqn : "")
.build();

// First check for entity-specific filter
String specificFilter = filterMap.get(entityType);
if (specificFilter != null && !specificFilter.isEmpty()) {
return specificFilter;
}
FieldExtension entityTypeExpr =
new FieldExtensionBuilder().fieldName("entityTypeExpr").fieldValue(entityType).build();

// Fall back to default filter if no specific filter found
String defaultFilter = filterMap.get("default");
if (defaultFilter != null && !defaultFilter.isEmpty()) {
return defaultFilter;
}
ServiceTask serviceTask =
new ServiceTaskBuilder()
.id(getFlowableElementId(workflowTriggerId, "commitOffsetTask"))
.implementation(CommitChangeEventOffsetImpl.class.getName())
.build();

return null;
}
serviceTask.getFieldExtensions().add(workflowFqnExpr);
serviceTask.getFieldExtensions().add(entityTypeExpr);

// If it's not a map, try to convert to string
return JsonUtils.pojoToJson(filtersObj);
return serviceTask;
}

private List<String> getEntityTypesFromConfig(Object configObj) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.openmetadata.service.governance.workflows.elements.triggers.impl;

import static org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer.OFFSET_EXTENSION;
import static org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl.MAX_PROCESSED_OFFSET_VARIABLE;
import static org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl.buildConsumerId;

import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.openmetadata.schema.entity.events.EventSubscriptionOffset;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;

/**
 * Flowable service task that persists the highest change-event offset a periodic-batch
 * workflow run has processed, so the next run only picks up events beyond it.
 *
 * <p>The offset is stored per consumer id (derived from the workflow FQN + entity type)
 * under the {@code OFFSET_EXTENSION} subscriber extension. The commit is skipped when no
 * events were processed this run, or when the stored offset is already at or past the
 * processed one (monotonic: the stored offset never moves backwards).
 *
 * <p>NOTE(review): the read-compare-upsert below is not atomic — assumes a single
 * trigger process instance per consumer id at a time; confirm the scheduler guarantees
 * that.
 */
@Slf4j
public class CommitChangeEventOffsetImpl implements JavaDelegate {

  // JSON-schema name recorded alongside the persisted extension payload.
  private static final String OFFSET_JSON_SCHEMA = "eventSubscriptionOffset";

  // Injected by Flowable from the service task's <fieldExtension> entries.
  private Expression workflowFqnExpr;
  private Expression entityTypeExpr;

  @Override
  public void execute(DelegateExecution execution) {
    String fqn = (String) workflowFqnExpr.getValue(execution);
    String type = (String) entityTypeExpr.getValue(execution);
    String consumerId = buildConsumerId(fqn, type);

    // Set by FetchChangeEventsImpl; absent means this run saw no events at all.
    Long processedUpTo = (Long) execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE);
    if (processedUpTo == null) {
      LOG.debug(
          "No events processed for workflow '{}' entity type '{}'. Offset not updated.",
          fqn,
          type);
      return;
    }

    if (storedOffsetIsCurrent(consumerId, processedUpTo, fqn)) {
      return;
    }

    persistOffset(consumerId, processedUpTo);
    LOG.info(
        "Committed offset {} for workflow '{}' entity type '{}'.", processedUpTo, fqn, type);
  }

  /** Returns true when a stored offset exists and is already >= the processed offset. */
  private boolean storedOffsetIsCurrent(String consumerId, long processedUpTo, String fqn) {
    String storedJson =
        Entity.getCollectionDAO()
            .eventSubscriptionDAO()
            .getSubscriberExtension(consumerId, OFFSET_EXTENSION);
    if (storedJson == null) {
      return false;
    }
    EventSubscriptionOffset stored =
        JsonUtils.readValue(storedJson, EventSubscriptionOffset.class);
    if (stored.getCurrentOffset() >= processedUpTo) {
      LOG.debug(
          "Stored offset {} >= processed offset {} for workflow '{}'. No update needed.",
          stored.getCurrentOffset(),
          processedUpTo,
          fqn);
      return true;
    }
    return false;
  }

  /** Upserts the subscriber-extension row with a fresh offset snapshot. */
  private void persistOffset(String consumerId, long processedUpTo) {
    EventSubscriptionOffset snapshot =
        new EventSubscriptionOffset()
            .withStartingOffset(processedUpTo)
            .withCurrentOffset(processedUpTo)
            .withTimestamp(System.currentTimeMillis());

    Entity.getCollectionDAO()
        .eventSubscriptionDAO()
        .upsertSubscriberExtension(
            consumerId, OFFSET_EXTENSION, OFFSET_JSON_SCHEMA, JsonUtils.pojoToJson(snapshot));
  }
}
Loading
Loading