Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
010f3ea
Implement IntermediateCatchEventBuilder
IceS2 Mar 4, 2026
9403129
Implement IntermediateCatchEventBuilder
IceS2 Mar 4, 2026
53ced59
Implement ManualTask workflow node
IceS2 Mar 16, 2026
aed3969
Simplify BaseDelegate: remove shared Expression fields
IceS2 Mar 16, 2026
77d0d61
Update generated TypeScript types
github-actions[bot] Mar 16, 2026
909f2d2
Fix CloseTaskImpl resolution bug and rename taskCreated variable
IceS2 Mar 16, 2026
bb29b37
Merge branch 'feat/incident-lifecycle-workflow' into feat/ilw-pr2-man…
IceS2 Mar 18, 2026
0ab66d0
Address PR review: configurable assignees, null-guard, required fields
IceS2 Mar 20, 2026
1a5c2b8
Update generated TypeScript types
github-actions[bot] Mar 20, 2026
a5020bc
fix: safe boolean cast and dedup assignees in resolveAssignees
IceS2 Mar 23, 2026
6f9712d
Add Task-Workflow bridge with async retry
IceS2 Mar 16, 2026
57d2080
Address code review findings on Task-Workflow bridge
IceS2 Mar 16, 2026
aa52ef1
feat(outbox): add task_workflow_outbox table migrations
IceS2 Mar 19, 2026
5e78598
feat(outbox): add OutboxEntry POJO and TaskWorkflowOutboxDAO
IceS2 Mar 19, 2026
f431868
feat(outbox): add TaskWorkflowOutboxDrainer with unit tests
IceS2 Mar 19, 2026
443f8cd
feat(outbox): route task status events through outbox in WorkflowEven…
IceS2 Mar 19, 2026
7d39894
test(outbox): add consumer routing filter tests
IceS2 Mar 19, 2026
60849c2
refactor(outbox): remove direct postUpdate hook and retry infrastructure
IceS2 Mar 19, 2026
b2df46b
feat(outbox): wire TaskWorkflowOutboxDrainer lifecycle into WorkflowH…
IceS2 Mar 19, 2026
4cb3ed6
test(outbox): increase ManualTaskWorkflowTest timeouts for polling la…
IceS2 Mar 19, 2026
521d2f4
fix(outbox): wrap enqueueTaskMessage in try-catch to prevent signal b…
IceS2 Mar 19, 2026
35205ef
fix(outbox): rename index prefix from idx_two_ to idx_outbox_ for cla…
IceS2 Mar 19, 2026
6d0d47c
fix(outbox): prevent out-of-order delivery when another worker holds …
IceS2 Mar 20, 2026
9b6f50b
perf(outbox): replace 1+3N queries with single bulk query per drain c…
IceS2 Mar 20, 2026
b9e24c7
fix(outbox): address review findings — C2, C3, I2, I3, I4, I5
IceS2 Mar 20, 2026
926787c
fix(outbox): add batch limit and prioritized ordering to drain query
IceS2 Mar 20, 2026
17f09d5
test(outbox): replace ManualTaskWorkflowTest with lean E2E ManualTask…
IceS2 Mar 20, 2026
f73830b
fix(outbox): handle raw string FieldChange.newValue in enqueueTaskMes…
IceS2 Mar 20, 2026
56ec631
style: spotless formatting on IncidentTaskIntegrationIT
IceS2 Mar 20, 2026
b5512e0
fix(outbox): wrap enqueue retry exhaustion in EventPublisherException
IceS2 Mar 20, 2026
b001e36
fix(outbox): null createdBy fallback, exhausted cleanup, idempotent e…
IceS2 Mar 20, 2026
f7a40dc
fix(outbox): use INSERT IGNORE / ON CONFLICT DO NOTHING for idempoten…
IceS2 Mar 20, 2026
82c23fa
feat(incident): add IncidentTcrsSyncHandler for Task→TCRS sync
IceS2 Mar 25, 2026
46bdff2
Update generated TypeScript types
github-actions[bot] Mar 25, 2026
0b494af
fix(manualTask): address PR review — assignees as EntityLinks, termin…
IceS2 Mar 25, 2026
5380b1b
Merge branch 'feat/ilw-pr2-manual-task-node' into feat/ilw-pr3-task-w…
IceS2 Mar 25, 2026
ea7a316
Merge branch 'feat/ilw-pr3-task-workflow-bridge' into feat/ilw-item2-…
IceS2 Mar 25, 2026
0bc1464
Merge branch 'feat/ilw-item2-incident-tcrs-sync-hook' of github.com:o…
IceS2 Mar 25, 2026
5386602
Update generated TypeScript types
github-actions[bot] Mar 25, 2026
a3df3e2
Merge branch 'feat/incident-lifecycle-workflow' into feat/ilw-item2-i…
IceS2 Mar 30, 2026
f9e2a6e
fix: address PR review — rename methods, guard extractStringValue, op…
IceS2 Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ CREATE TABLE IF NOT EXISTS task_entity (
KEY idx_status_about (status, aboutFqnHash)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- aboutEntityLink: hierarchical entity identity for lifecycle handler lookups
ALTER TABLE task_entity ADD COLUMN IF NOT EXISTS aboutEntityLink varchar(1024)
GENERATED ALWAYS AS (json_unquote(json_extract(`json`, _utf8mb4'$.aboutEntityLink'))) STORED;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is mysql not utf8 by default?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be, but it does not hurt to be defensive, right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In sql files okay, in run time for other cases when we know the value is so and so, it's better to code that way rather than being defensive, makes debugging a pain. As this is in sql file, it's okay!

CREATE INDEX idx_task_about_entity_link ON task_entity (aboutEntityLink(255));

CREATE TABLE IF NOT EXISTS new_task_sequence (
id bigint NOT NULL DEFAULT 0
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ CREATE INDEX IF NOT EXISTS idx_task_status_category ON task_entity (status, cate
CREATE INDEX IF NOT EXISTS idx_task_about_fqn_hash ON task_entity (aboutfqnhash);
CREATE INDEX IF NOT EXISTS idx_task_status_about ON task_entity (status, aboutfqnhash);

-- aboutEntityLink: hierarchical entity identity for lifecycle handler lookups
ALTER TABLE task_entity ADD COLUMN IF NOT EXISTS aboutentitylink character varying(1024)
GENERATED ALWAYS AS ((json ->> 'aboutEntityLink'::text)) STORED;
CREATE INDEX IF NOT EXISTS idx_task_about_entity_link ON task_entity (aboutentitylink);

CREATE TABLE IF NOT EXISTS new_task_sequence (
id bigint NOT NULL DEFAULT 0
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.type.TypeReference;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -38,23 +40,32 @@
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.tasks.Task;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.type.TestCaseResolutionStatus;
import org.openmetadata.schema.tests.type.TestCaseResolutionStatusTypes;
import org.openmetadata.schema.tests.type.TestCaseStatus;
import org.openmetadata.schema.type.TaskCategory;
import org.openmetadata.schema.type.TaskEntityStatus;
import org.openmetadata.schema.type.TaskEntityType;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.client.OpenMetadataClient;
import org.openmetadata.sdk.fluent.builders.TestCaseBuilder;
import org.openmetadata.sdk.models.ListParams;
import org.openmetadata.sdk.models.ListResponse;
import org.openmetadata.sdk.network.HttpMethod;
import org.openmetadata.sdk.network.RequestOptions;

/**
* E2E integration test for the outbox-based ManualTask message delivery pipeline.
* E2E integration test for the outbox-based ManualTask message delivery pipeline and incident TCRS
* sync.
*
* <p>Verifies that task status changes (via PATCH) flow through the full pipeline: ChangeEvent →
* WorkflowEventConsumer → task_workflow_outbox → TaskWorkflowOutboxDrainer → Flowable
* messageEventReceived → ManualTask subprocess processes each status in order.
*
* <p>Also verifies the IncidentTcrsSyncHandler: when a workflow creates an incident Task with
* aboutEntityLink, status changes are synced to TCRS records (Task → TCRS Strangler Fig bridge).
*
* <p>Proof of delivery: workflow instance stage results show each status transition, and the
* workflow instance reaches FINISHED status.
*/
Expand All @@ -66,29 +77,47 @@ public class ManualTaskOutboxIT {
private static final Duration PIPELINE_TIMEOUT = Duration.ofSeconds(120);

@Test
void outboxDeliversStatusChangesInOrder(TestNamespace ns) {
void outboxDeliversStatusChangesInOrder_andSyncsTcrs(TestNamespace ns) {
OpenMetadataClient client = SdkClients.adminClient();

String id = ns.shortPrefix();
String workflowName = "outbox-e2e-" + id;

DatabaseService service = DatabaseServiceTestFactory.createPostgresWithName("sv" + id, ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimpleWithName("sc" + id, ns, service);
Table table =
TableTestFactory.createSimpleWithName("tbl" + id, ns, schema.getFullyQualifiedName());

// Deploy workflow and wait for Flowable to confirm the process definition is ready.
// The ChangeEvent consumer processes events in offset order — if we trigger
// the test failure before Flowable has the signal start event registered,
// the testCase-entityUpdated signal fires but no process catches it.
// Create TestCase BEFORE deploying workflow — the testCase-entityCreated ChangeEvent
// will be consumed before the workflow exists, avoiding spurious triggers.
TestCase testCase =
TestCaseBuilder.create(client)
.name("tc" + id)
.forTable(table)
.testDefinition("tableRowCountToEqual")
.parameter("value", "100")
.create();

WorkflowDefinition workflow = deployManualTaskWorkflow(client, workflowName);
assertNotNull(workflow, "Workflow should be deployed");
waitForWorkflowDeployed(client, workflowName);

try {
Table table =
TableTestFactory.createSimpleWithName("tbl" + id, ns, schema.getFullyQualifiedName());
// Trigger: fail the test → ChangeEventHandler creates a testCase-entityUpdated ChangeEvent
// (via getChangeEventForEntityTimeSeries which resolves TestCaseResult → parent TestCase).
createFailedTestResult(client, testCase);

AtomicReference<Task> taskRef = new AtomicReference<>();
await()
.atMost(PIPELINE_TIMEOUT)
.pollInterval(Duration.ofSeconds(2))
.until(
() -> {
Task found = findIncidentTaskForEntity(client, table);
Task found = findIncidentTaskForTestCase(client, testCase);
if (found != null && found.getWorkflowInstanceId() != null) {
taskRef.set(found);
return true;
Expand All @@ -102,13 +131,40 @@ void outboxDeliversStatusChangesInOrder(TestNamespace ns) {
assertEquals(TaskCategory.Incident, task.getCategory());
assertEquals(TaskEntityType.IncidentResolution, task.getType());

// Verify aboutEntityLink encodes testCase FQN + incident stateId
String aboutLink = task.getAboutEntityLink();
assertNotNull(aboutLink, "aboutEntityLink should be populated");
assertTrue(aboutLink.contains("testCase"), "Should reference testCase entity type");
assertTrue(aboutLink.contains("incidents"), "Should contain incidents field");
assertTrue(
aboutLink.contains(testCase.getFullyQualifiedName()), "Should contain the testCase FQN");

// Get stateId from TestCase for TCRS verification
TestCase updatedTc =
client.testCases().getByName(testCase.getFullyQualifiedName(), "incidentId");
UUID stateId = updatedTc.getIncidentId();
assertNotNull(stateId, "TestCase should have incidentId set from test failure");
assertTrue(aboutLink.contains(stateId.toString()), "Should contain the incident stateId");

patchTaskStatus(client, task.getId().toString(), "InProgress");

// Verify TCRS(Ack) synced by handler (async — poll)
await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.until(
() ->
listTcrsForStateId(client, stateId).stream()
.anyMatch(
r ->
r.getTestCaseResolutionStatusType()
== TestCaseResolutionStatusTypes.Ack));

patchTaskStatus(client, task.getId().toString(), "Completed");

String workflowInstanceId = task.getWorkflowInstanceId().toString();

// Workflow reaching FINISHED proves the full outbox pipeline delivered the terminal status
// (InProgress delivery is implicit — workflow can't reach FINISHED without processing it)
await()
.atMost(PIPELINE_TIMEOUT)
.pollInterval(Duration.ofSeconds(2))
Expand All @@ -126,9 +182,39 @@ void outboxDeliversStatusChangesInOrder(TestNamespace ns) {
Task resolvedTask = client.tasks().get(task.getId().toString(), "resolution");
assertNotNull(resolvedTask.getResolution(), "CloseTaskDelegate should have set resolution");

// Count how many times the ManualTask subprocess was entered.
// Each entry creates a new stage record via WorkflowInstanceStageListener.
// Expected: 3 entries (Open cycle, InProgress cycle, Completed cycle)
// Verify TCRS(Resolved) synced by handler
await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.until(
() ->
listTcrsForStateId(client, stateId).stream()
.anyMatch(
r ->
r.getTestCaseResolutionStatusType()
== TestCaseResolutionStatusTypes.Resolved));

// Final TCRS verification: all expected records exist
List<TestCaseResolutionStatus> allRecords = listTcrsForStateId(client, stateId);
assertTrue(
allRecords.stream()
.anyMatch(
r -> r.getTestCaseResolutionStatusType() == TestCaseResolutionStatusTypes.New),
"TCRS(New) should exist from initial test failure");
assertTrue(
allRecords.stream()
.anyMatch(
r -> r.getTestCaseResolutionStatusType() == TestCaseResolutionStatusTypes.Ack),
"TCRS(Ack) should be synced from Task InProgress");
assertTrue(
allRecords.stream()
.anyMatch(
r ->
r.getTestCaseResolutionStatusType()
== TestCaseResolutionStatusTypes.Resolved),
"TCRS(Resolved) should be synced from Task Completed");

// Count ManualTask subprocess entries
List<Map<String, Object>> states =
getWorkflowInstanceStates(client, workflowName, workflowInstanceId);
long manualTaskEntries =
Expand All @@ -155,12 +241,12 @@ private WorkflowDefinition deployManualTaskWorkflow(
{
"name": "%s",
"displayName": "Outbox E2E Test Workflow",
"description": "Tests outbox-based ManualTask delivery",
"description": "Tests outbox-based ManualTask delivery and TCRS sync",
"trigger": {
"type": "eventBasedEntity",
"config": {
"events": ["Created"],
"entityTypes": ["table"]
"events": ["Updated"],
"entityTypes": ["testCase"]
},
"output": ["relatedEntity"]
},
Expand Down Expand Up @@ -215,25 +301,75 @@ private WorkflowDefinition deployManualTaskWorkflow(
return client.workflowDefinitions().create(request);
}

private Task findIncidentTaskForEntity(OpenMetadataClient client, Table table) {
private void waitForWorkflowDeployed(OpenMetadataClient client, String workflowName) {
await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.until(
() -> {
try {
WorkflowDefinition wd =
client.workflowDefinitions().getByName(workflowName, "deployed");
return Boolean.TRUE.equals(wd.getDeployed());
} catch (Exception e) {
return false;
}
});
}

private void createFailedTestResult(OpenMetadataClient client, TestCase testCase) {
org.openmetadata.schema.api.tests.CreateTestCaseResult failedResult =
new org.openmetadata.schema.api.tests.CreateTestCaseResult();
failedResult.setTimestamp(System.currentTimeMillis());
failedResult.setTestCaseStatus(TestCaseStatus.Failed);
failedResult.setResult("Test failed - triggering incident");
client.testCaseResults().create(testCase.getFullyQualifiedName(), failedResult);
}

private Task findIncidentTaskForTestCase(OpenMetadataClient client, TestCase testCase) {
ListParams params =
new ListParams()
.addFilter("category", "Incident")
.addFilter("status", "Open")
.setFields("payload,about")
.setFields("payload,about,aboutEntityLink")
.setLimit(100);
ListResponse<Task> tasks = client.tasks().list(params);

for (Task task : tasks.getData()) {
if (task.getAbout() != null
&& task.getAbout().getFullyQualifiedName() != null
&& task.getAbout().getFullyQualifiedName().equals(table.getFullyQualifiedName())) {
&& task.getAbout().getFullyQualifiedName().equals(testCase.getFullyQualifiedName())) {
return task;
}
}
return null;
}

@SuppressWarnings("unchecked")
private List<TestCaseResolutionStatus> listTcrsForStateId(
OpenMetadataClient client, UUID stateId) {
try {
String response =
client
.getHttpClient()
.executeForString(
HttpMethod.GET,
"/v1/dataQuality/testCases/testCaseIncidentStatus/stateId/" + stateId,
null,
RequestOptions.builder().build());

Map<String, Object> result = JsonUtils.readValue(response, new TypeReference<>() {});
List<Object> data = (List<Object>) result.get("data");
if (data == null) {
return List.of();
}
return data.stream()
.map(d -> JsonUtils.convertValue(d, TestCaseResolutionStatus.class))
.toList();
} catch (Exception e) {
return List.of();
}
}

private void patchTaskStatus(OpenMetadataClient client, String taskId, String status) {
String patchJson =
String.format("[{\"op\": \"replace\", \"path\": \"/status\", \"value\": \"%s\"}]", status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public OpenMetadataClient(OpenMetadataConfig config) {
this.aiApplications = new AIApplicationService(httpClient);
this.promptTemplates = new PromptTemplateService(httpClient);

// Initialize task services
// Initialize task services
this.tasks = new TaskService(httpClient);

// Initialize feed service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.openmetadata.service.audit.AuditLogRepository;
import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher;
import org.openmetadata.service.events.lifecycle.handlers.DomainSyncHandler;
import org.openmetadata.service.events.lifecycle.handlers.IncidentTcrsSyncHandler;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.ChangeEventRepository;
Expand Down Expand Up @@ -392,19 +393,26 @@ public static void initializeRepositories(OpenMetadataApplicationConfig config,
}
}
}
registerDomainSyncHandler();
registerLifecycleHandlers();
initializedRepositories = true;
}
}

private static void registerDomainSyncHandler() {
private static void registerLifecycleHandlers() {
try {
DomainSyncHandler domainSyncHandler = new DomainSyncHandler();
EntityLifecycleEventDispatcher.getInstance().registerHandler(domainSyncHandler);
LOG.info("Successfully registered DomainSyncHandler for entity lifecycle events");
} catch (Exception e) {
LOG.error("Failed to register DomainSyncHandler", e);
}
try {
IncidentTcrsSyncHandler incidentTcrsSyncHandler = new IncidentTcrsSyncHandler();
EntityLifecycleEventDispatcher.getInstance().registerHandler(incidentTcrsSyncHandler);
LOG.info("Successfully registered IncidentTcrsSyncHandler for entity lifecycle events");
} catch (Exception e) {
LOG.error("Failed to register IncidentTcrsSyncHandler", e);
}
}

public static void cleanup() {
Expand Down
Loading
Loading