diff --git a/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql index a41d9883bdfe..397ba35c5de7 100644 --- a/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql @@ -30,6 +30,11 @@ CREATE TABLE IF NOT EXISTS task_entity ( KEY idx_status_about (status, aboutFqnHash) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +-- aboutEntityLink: hierarchical entity identity for lifecycle handler lookups +ALTER TABLE task_entity ADD COLUMN IF NOT EXISTS aboutEntityLink varchar(1024) + GENERATED ALWAYS AS (json_unquote(json_extract(`json`, _utf8mb4'$.aboutEntityLink'))) STORED; +CREATE INDEX idx_task_about_entity_link ON task_entity (aboutEntityLink(255)); + CREATE TABLE IF NOT EXISTS new_task_sequence ( id bigint NOT NULL DEFAULT 0 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; diff --git a/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql index 6b0e3f407785..462928954238 100644 --- a/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql @@ -31,6 +31,11 @@ CREATE INDEX IF NOT EXISTS idx_task_status_category ON task_entity (status, cate CREATE INDEX IF NOT EXISTS idx_task_about_fqn_hash ON task_entity (aboutfqnhash); CREATE INDEX IF NOT EXISTS idx_task_status_about ON task_entity (status, aboutfqnhash); +-- aboutEntityLink: hierarchical entity identity for lifecycle handler lookups +ALTER TABLE task_entity ADD COLUMN IF NOT EXISTS aboutentitylink character varying(1024) + GENERATED ALWAYS AS ((json ->> 'aboutEntityLink'::text)) STORED; +CREATE INDEX IF NOT EXISTS idx_task_about_entity_link ON task_entity (aboutentitylink); + CREATE TABLE IF NOT EXISTS new_task_sequence ( id bigint NOT NULL DEFAULT 0 ); diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ManualTaskOutboxIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ManualTaskOutboxIT.java index b87350cd54f8..8554ad8ee3f5 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ManualTaskOutboxIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ManualTaskOutboxIT.java @@ -16,11 +16,13 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.core.type.TypeReference; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -38,23 +40,32 @@ import org.openmetadata.schema.entity.services.DatabaseService; import org.openmetadata.schema.entity.tasks.Task; import org.openmetadata.schema.governance.workflows.WorkflowDefinition; +import org.openmetadata.schema.tests.TestCase; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatus; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatusTypes; +import org.openmetadata.schema.tests.type.TestCaseStatus; import org.openmetadata.schema.type.TaskCategory; import org.openmetadata.schema.type.TaskEntityStatus; import org.openmetadata.schema.type.TaskEntityType; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.sdk.client.OpenMetadataClient; +import org.openmetadata.sdk.fluent.builders.TestCaseBuilder; import org.openmetadata.sdk.models.ListParams; import org.openmetadata.sdk.models.ListResponse; import org.openmetadata.sdk.network.HttpMethod; import org.openmetadata.sdk.network.RequestOptions; /** - * E2E integration test for the outbox-based ManualTask message delivery pipeline. + * E2E integration test for the outbox-based ManualTask message delivery pipeline and incident TCRS + * sync. * *

Verifies that task status changes (via PATCH) flow through the full pipeline: ChangeEvent → * WorkflowEventConsumer → task_workflow_outbox → TaskWorkflowOutboxDrainer → Flowable * messageEventReceived → ManualTask subprocess processes each status in order. * + *

Also verifies the IncidentTcrsSyncHandler: when a workflow creates an incident Task with + * aboutEntityLink, status changes are synced to TCRS records (Task → TCRS Strangler Fig bridge). + * *

Proof of delivery: workflow instance stage results show each status transition, and the * workflow instance reaches FINISHED status. */ @@ -66,7 +77,7 @@ public class ManualTaskOutboxIT { private static final Duration PIPELINE_TIMEOUT = Duration.ofSeconds(120); @Test - void outboxDeliversStatusChangesInOrder(TestNamespace ns) { + void outboxDeliversStatusChangesInOrder_andSyncsTcrs(TestNamespace ns) { OpenMetadataClient client = SdkClients.adminClient(); String id = ns.shortPrefix(); @@ -74,13 +85,31 @@ void outboxDeliversStatusChangesInOrder(TestNamespace ns) { DatabaseService service = DatabaseServiceTestFactory.createPostgresWithName("sv" + id, ns); DatabaseSchema schema = DatabaseSchemaTestFactory.createSimpleWithName("sc" + id, ns, service); + Table table = + TableTestFactory.createSimpleWithName("tbl" + id, ns, schema.getFullyQualifiedName()); + + // Deploy workflow and wait for Flowable to confirm the process definition is ready. + // The ChangeEvent consumer processes events in offset order — if we trigger + // the test failure before Flowable has the signal start event registered, + // the testCase-entityUpdated signal fires but no process catches it. + // Create TestCase BEFORE deploying workflow — the testCase-entityCreated ChangeEvent + // will be consumed before the workflow exists, avoiding spurious triggers. + TestCase testCase = + TestCaseBuilder.create(client) + .name("tc" + id) + .forTable(table) + .testDefinition("tableRowCountToEqual") + .parameter("value", "100") + .create(); WorkflowDefinition workflow = deployManualTaskWorkflow(client, workflowName); assertNotNull(workflow, "Workflow should be deployed"); + waitForWorkflowDeployed(client, workflowName); try { - Table table = - TableTestFactory.createSimpleWithName("tbl" + id, ns, schema.getFullyQualifiedName()); + // Trigger: fail the test → ChangeEventHandler creates a testCase-entityUpdated ChangeEvent + // (via getChangeEventForEntityTimeSeries which resolves TestCaseResult → parent TestCase). + createFailedTestResult(client, testCase); AtomicReference taskRef = new AtomicReference<>(); await() @@ -88,7 +117,7 @@ void outboxDeliversStatusChangesInOrder(TestNamespace ns) { .pollInterval(Duration.ofSeconds(2)) .until( () -> { - Task found = findIncidentTaskForEntity(client, table); + Task found = findIncidentTaskForTestCase(client, testCase); if (found != null && found.getWorkflowInstanceId() != null) { taskRef.set(found); return true; @@ -102,13 +131,40 @@ void outboxDeliversStatusChangesInOrder(TestNamespace ns) { assertEquals(TaskCategory.Incident, task.getCategory()); assertEquals(TaskEntityType.IncidentResolution, task.getType()); + // Verify aboutEntityLink encodes testCase FQN + incident stateId + String aboutLink = task.getAboutEntityLink(); + assertNotNull(aboutLink, "aboutEntityLink should be populated"); + assertTrue(aboutLink.contains("testCase"), "Should reference testCase entity type"); + assertTrue(aboutLink.contains("incidents"), "Should contain incidents field"); + assertTrue( + aboutLink.contains(testCase.getFullyQualifiedName()), "Should contain the testCase FQN"); + + // Get stateId from TestCase for TCRS verification + TestCase updatedTc = + client.testCases().getByName(testCase.getFullyQualifiedName(), "incidentId"); + UUID stateId = updatedTc.getIncidentId(); + assertNotNull(stateId, "TestCase should have incidentId set from test failure"); + assertTrue(aboutLink.contains(stateId.toString()), "Should contain the incident stateId"); + patchTaskStatus(client, task.getId().toString(), "InProgress"); + + // Verify TCRS(Ack) synced by handler (async — poll) + await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .until( + () -> + listTcrsForStateId(client, stateId).stream() + .anyMatch( + r -> + r.getTestCaseResolutionStatusType() + == TestCaseResolutionStatusTypes.Ack)); + patchTaskStatus(client, task.getId().toString(), "Completed"); String workflowInstanceId = task.getWorkflowInstanceId().toString(); // Workflow reaching FINISHED proves the full outbox pipeline delivered the terminal status - // (InProgress delivery is implicit — workflow can't reach FINISHED without processing it) await() .atMost(PIPELINE_TIMEOUT) .pollInterval(Duration.ofSeconds(2)) @@ -126,9 +182,39 @@ void outboxDeliversStatusChangesInOrder(TestNamespace ns) { Task resolvedTask = client.tasks().get(task.getId().toString(), "resolution"); assertNotNull(resolvedTask.getResolution(), "CloseTaskDelegate should have set resolution"); - // Count how many times the ManualTask subprocess was entered. - // Each entry creates a new stage record via WorkflowInstanceStageListener. - // Expected: 3 entries (Open cycle, InProgress cycle, Completed cycle) + // Verify TCRS(Resolved) synced by handler + await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .until( + () -> + listTcrsForStateId(client, stateId).stream() + .anyMatch( + r -> + r.getTestCaseResolutionStatusType() + == TestCaseResolutionStatusTypes.Resolved)); + + // Final TCRS verification: all expected records exist + List allRecords = listTcrsForStateId(client, stateId); + assertTrue( + allRecords.stream() + .anyMatch( + r -> r.getTestCaseResolutionStatusType() == TestCaseResolutionStatusTypes.New), + "TCRS(New) should exist from initial test failure"); + assertTrue( + allRecords.stream() + .anyMatch( + r -> r.getTestCaseResolutionStatusType() == TestCaseResolutionStatusTypes.Ack), + "TCRS(Ack) should be synced from Task InProgress"); + assertTrue( + allRecords.stream() + .anyMatch( + r -> + r.getTestCaseResolutionStatusType() + == TestCaseResolutionStatusTypes.Resolved), + "TCRS(Resolved) should be synced from Task Completed"); + + // Count ManualTask subprocess entries List> states = getWorkflowInstanceStates(client, workflowName, workflowInstanceId); long manualTaskEntries = @@ -155,12 +241,12 @@ private WorkflowDefinition deployManualTaskWorkflow( { "name": "%s", "displayName": "Outbox E2E Test Workflow", - "description": "Tests outbox-based ManualTask delivery", + "description": "Tests outbox-based ManualTask delivery and TCRS sync", "trigger": { "type": "eventBasedEntity", "config": { - "events": ["Created"], - "entityTypes": ["table"] + "events": ["Updated"], + "entityTypes": ["testCase"] }, "output": ["relatedEntity"] }, @@ -215,25 +301,75 @@ private WorkflowDefinition deployManualTaskWorkflow( return client.workflowDefinitions().create(request); } - private Task findIncidentTaskForEntity(OpenMetadataClient client, Table table) { + private void waitForWorkflowDeployed(OpenMetadataClient client, String workflowName) { + await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .until( + () -> { + try { + WorkflowDefinition wd = + client.workflowDefinitions().getByName(workflowName, "deployed"); + return Boolean.TRUE.equals(wd.getDeployed()); + } catch (Exception e) { + return false; + } + }); + } + + private void createFailedTestResult(OpenMetadataClient client, TestCase testCase) { + org.openmetadata.schema.api.tests.CreateTestCaseResult failedResult = + new org.openmetadata.schema.api.tests.CreateTestCaseResult(); + failedResult.setTimestamp(System.currentTimeMillis()); + failedResult.setTestCaseStatus(TestCaseStatus.Failed); + failedResult.setResult("Test failed - triggering incident"); + client.testCaseResults().create(testCase.getFullyQualifiedName(), failedResult); + } + + private Task findIncidentTaskForTestCase(OpenMetadataClient client, TestCase testCase) { ListParams params = new ListParams() .addFilter("category", "Incident") - .addFilter("status", "Open") - .setFields("payload,about") + .setFields("payload,about,aboutEntityLink") .setLimit(100); ListResponse tasks = client.tasks().list(params); for (Task task : tasks.getData()) { if (task.getAbout() != null && task.getAbout().getFullyQualifiedName() != null - && task.getAbout().getFullyQualifiedName().equals(table.getFullyQualifiedName())) { + && task.getAbout().getFullyQualifiedName().equals(testCase.getFullyQualifiedName())) { return task; } } return null; } + @SuppressWarnings("unchecked") + private List listTcrsForStateId( + OpenMetadataClient client, UUID stateId) { + try { + String response = + client + .getHttpClient() + .executeForString( + HttpMethod.GET, + "/v1/dataQuality/testCases/testCaseIncidentStatus/stateId/" + stateId, + null, + RequestOptions.builder().build()); + + Map result = JsonUtils.readValue(response, new TypeReference<>() {}); + List data = (List) result.get("data"); + if (data == null) { + return List.of(); + } + return data.stream() + .map(d -> JsonUtils.convertValue(d, TestCaseResolutionStatus.class)) + .toList(); + } catch (Exception e) { + return List.of(); + } + } + private void patchTaskStatus(OpenMetadataClient client, String taskId, String status) { String patchJson = String.format("[{\"op\": \"replace\", \"path\": \"/status\", \"value\": \"%s\"}]", status); diff --git a/openmetadata-sdk/src/main/java/org/openmetadata/sdk/client/OpenMetadataClient.java b/openmetadata-sdk/src/main/java/org/openmetadata/sdk/client/OpenMetadataClient.java index 1fef43768f4c..ef27d653220d 100644 --- a/openmetadata-sdk/src/main/java/org/openmetadata/sdk/client/OpenMetadataClient.java +++ b/openmetadata-sdk/src/main/java/org/openmetadata/sdk/client/OpenMetadataClient.java @@ -298,7 +298,7 @@ public OpenMetadataClient(OpenMetadataConfig config) { this.aiApplications = new AIApplicationService(httpClient); this.promptTemplates = new PromptTemplateService(httpClient); -// Initialize task services + // Initialize task services this.tasks = new TaskService(httpClient); // Initialize feed service diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java index 104101b6c0a6..6d0ddde6364d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java @@ -58,6 +58,7 @@ import org.openmetadata.service.audit.AuditLogRepository; import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher; import org.openmetadata.service.events.lifecycle.handlers.DomainSyncHandler; +import org.openmetadata.service.events.lifecycle.handlers.IncidentTcrsSyncHandler; import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.jdbi3.ChangeEventRepository; @@ -392,12 +393,12 @@ public static void initializeRepositories(OpenMetadataApplicationConfig config, } } } - registerDomainSyncHandler(); + registerLifecycleHandlers(); initializedRepositories = true; } } - private static void registerDomainSyncHandler() { + private static void registerLifecycleHandlers() { try { DomainSyncHandler domainSyncHandler = new DomainSyncHandler(); EntityLifecycleEventDispatcher.getInstance().registerHandler(domainSyncHandler); @@ -405,6 +406,13 @@ private static void registerDomainSyncHandler() { } catch (Exception e) { LOG.error("Failed to register DomainSyncHandler", e); } + try { + IncidentTcrsSyncHandler incidentTcrsSyncHandler = new IncidentTcrsSyncHandler(); + EntityLifecycleEventDispatcher.getInstance().registerHandler(incidentTcrsSyncHandler); + LOG.info("Successfully registered IncidentTcrsSyncHandler for entity lifecycle events"); + } catch (Exception e) { + LOG.error("Failed to register IncidentTcrsSyncHandler", e); + } } public static void cleanup() { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/lifecycle/handlers/IncidentTcrsSyncHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/lifecycle/handlers/IncidentTcrsSyncHandler.java new file mode 100644 index 000000000000..8ceccec1875b --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/lifecycle/handlers/IncidentTcrsSyncHandler.java @@ -0,0 +1,238 @@ +package org.openmetadata.service.events.lifecycle.handlers; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.tasks.Task; +import org.openmetadata.schema.tests.type.Resolved; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatus; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatusTypes; +import org.openmetadata.schema.type.ChangeDescription; +import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.schema.type.Include; +import org.openmetadata.schema.type.Relationship; +import org.openmetadata.schema.type.TaskCategory; +import org.openmetadata.schema.type.TaskEntityStatus; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; +import org.openmetadata.service.events.lifecycle.EntityLifecycleEventHandler; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.security.policyevaluator.SubjectContext; + +/** + * Strangler Fig bridge: syncs Task lifecycle events to TCRS (TestCaseResolutionStatus) records. + * + *

Listens for Task entity events. When a Task with category=Incident and an aboutEntityLink + * containing "incidents" is created or updated, writes corresponding TCRS records to the + * time-series table. This keeps the existing incident UI and API working while Tasks become the + * source of truth. + * + *

Removable: delete this file + one registration line in Entity.java. + */ +@Slf4j +public class IncidentTcrsSyncHandler implements EntityLifecycleEventHandler { + + private static final String HANDLER_NAME = "IncidentTcrsSyncHandler"; + private static final String INCIDENTS_FIELD = "incidents"; + + @Override + public String getHandlerName() { + return HANDLER_NAME; + } + + @Override + public Set getSupportedEntityTypes() { + return Set.of(Entity.TASK); + } + + @Override + public void onEntityCreated(EntityInterface entity, SubjectContext subjectContext) { + if (!(entity instanceof Task task) || !isIncidentTask(task)) { + return; + } + + MessageParser.EntityLink link = MessageParser.EntityLink.parse(task.getAboutEntityLink()); + String testCaseFqn = link.getEntityFQN(); + UUID stateId = UUID.fromString(link.getArrayFieldName()); + + if (incidentResolutionStatusExists(stateId)) { + LOG.debug("[{}] TCRS(New) already exists for stateId={}, skipping.", HANDLER_NAME, stateId); + return; + } + + EntityReference updatedBy = task.getCreatedBy(); + + TestCaseResolutionStatus tcrs = + new TestCaseResolutionStatus() + .withId(UUID.randomUUID()) + .withStateId(stateId) + .withTimestamp(System.currentTimeMillis()) + .withTestCaseResolutionStatusType(TestCaseResolutionStatusTypes.New) + .withUpdatedBy(updatedBy) + .withUpdatedAt(System.currentTimeMillis()); + + insertIncidentResolutionStatus(testCaseFqn, tcrs); + LOG.info( + "[{}] Synced Task created -> TCRS(New) for testCase='{}', stateId='{}'", + HANDLER_NAME, + testCaseFqn, + stateId); + } + + @Override + public void onEntityUpdated( + EntityInterface entity, ChangeDescription changeDescription, SubjectContext subjectContext) { + if (!(entity instanceof Task task) || !isIncidentTask(task)) { + return; + } + + MessageParser.EntityLink link = MessageParser.EntityLink.parse(task.getAboutEntityLink()); + String testCaseFqn = link.getEntityFQN(); + UUID stateId = UUID.fromString(link.getArrayFieldName()); + + TestCaseResolutionStatusTypes tcrsType = + mapTaskChangeToResolutionStatusType(task, changeDescription); + if (tcrsType == null) { + return; + } + + EntityReference updatedBy = resolveUpdatedBy(task); + + TestCaseResolutionStatus tcrs = + new TestCaseResolutionStatus() + .withId(UUID.randomUUID()) + .withStateId(stateId) + .withTimestamp(System.currentTimeMillis()) + .withTestCaseResolutionStatusType(tcrsType) + .withUpdatedBy(updatedBy) + .withUpdatedAt(System.currentTimeMillis()); + + if (tcrsType == TestCaseResolutionStatusTypes.Resolved) { + Resolved resolved = + new Resolved() + .withResolvedBy(updatedBy) + .withTestCaseFailureComment("Resolved via governance workflow task"); + tcrs.withTestCaseResolutionStatusDetails(resolved); + } + + insertIncidentResolutionStatus(testCaseFqn, tcrs); + LOG.info( + "[{}] Synced Task update -> TCRS({}) for testCase='{}', stateId='{}'", + HANDLER_NAME, + tcrsType, + testCaseFqn, + stateId); + } + + boolean isIncidentTask(Task task) { + if (task.getCategory() != TaskCategory.Incident) { + return false; + } + String aboutLink = task.getAboutEntityLink(); + if (aboutLink == null || aboutLink.isBlank()) { + return false; + } + try { + MessageParser.EntityLink link = MessageParser.EntityLink.parse(aboutLink); + return INCIDENTS_FIELD.equals(link.getFieldName()) && link.getArrayFieldName() != null; + } catch (Exception e) { + LOG.warn("[{}] Failed to parse aboutEntityLink: {}", HANDLER_NAME, aboutLink, e); + return false; + } + } + + private TestCaseResolutionStatusTypes mapTaskChangeToResolutionStatusType( + Task task, ChangeDescription changeDescription) { + if (isTerminalStatus(task.getStatus())) { + return TestCaseResolutionStatusTypes.Resolved; + } + + if (changeDescription == null) { + return null; + } + + for (FieldChange fc : safeList(changeDescription.getFieldsUpdated())) { + if ("status".equals(fc.getName())) { + String newStatus = extractStringValue(fc.getNewValue()); + if (TaskEntityStatus.InProgress.value().equals(newStatus)) { + return TestCaseResolutionStatusTypes.Ack; + } + } + } + + for (FieldChange fc : safeList(changeDescription.getFieldsUpdated())) { + if ("assignees".equals(fc.getName())) { + return TestCaseResolutionStatusTypes.Assigned; + } + } + for (FieldChange fc : safeList(changeDescription.getFieldsAdded())) { + if ("assignees".equals(fc.getName())) { + return TestCaseResolutionStatusTypes.Assigned; + } + } + + return null; + } + + private boolean isTerminalStatus(TaskEntityStatus status) { + return status == TaskEntityStatus.Completed + || status == TaskEntityStatus.Cancelled + || status == TaskEntityStatus.Failed; + } + + private EntityReference resolveUpdatedBy(Task task) { + String updatedBy = task.getUpdatedBy(); + if (updatedBy != null && !updatedBy.isBlank()) { + try { + return Entity.getEntityReferenceByName(Entity.USER, updatedBy, Include.NON_DELETED); + } catch (Exception e) { + LOG.warn( + "[{}] Could not resolve updatedBy '{}': {}", HANDLER_NAME, updatedBy, e.getMessage()); + } + } + return task.getCreatedBy(); + } + + private boolean incidentResolutionStatusExists(UUID stateId) { + List records = + ((CollectionDAO.TestCaseResolutionStatusTimeSeriesDAO) + Entity.getCollectionDAO().testCaseResolutionStatusTimeSeriesDao()) + .listTestCaseResolutionStatusesForStateId(stateId.toString()); + return !records.isEmpty(); + } + + private void insertIncidentResolutionStatus(String testCaseFqn, TestCaseResolutionStatus tcrs) { + Entity.getCollectionDAO() + .testCaseResolutionStatusTimeSeriesDao() + .insert(testCaseFqn, Entity.TEST_CASE_RESOLUTION_STATUS, JsonUtils.pojoToJson(tcrs)); + + EntityReference testCaseRef = + Entity.getEntityReferenceByName(Entity.TEST_CASE, testCaseFqn, Include.ALL); + Entity.getCollectionDAO() + .relationshipDAO() + .insert( + testCaseRef.getId(), + tcrs.getId(), + Entity.TEST_CASE, + Entity.TEST_CASE_RESOLUTION_STATUS, + Relationship.PARENT_OF.ordinal()); + } + + private String extractStringValue(Object value) { + if (value instanceof String s) { + return (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) + ? s.substring(1, s.length() - 1) + : s; + } + return value != null ? value.toString() : null; + } + + private List safeList(List list) { + return list != null ? list : Collections.emptyList(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/manualTask/impl/SetupImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/manualTask/impl/SetupImpl.java index 4159630ad50e..18ad1ad251f8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/manualTask/impl/SetupImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/manualTask/impl/SetupImpl.java @@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.tasks.Task; +import org.openmetadata.schema.tests.TestCase; import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.EventType; @@ -53,6 +54,7 @@ public static UUID createTask( Entity.getEntityReferenceByName(Entity.USER, resolvedCreator, Include.NON_DELETED); List assignees = resolveAssignees(entity, assigneesConfig); + String aboutEntityLink = buildAboutEntityLink(entity, entityLinkStr, taskType); Task task = new Task() @@ -62,6 +64,7 @@ public static UUID createTask( .withStatus(TaskEntityStatus.Open) .withPriority(TaskPriority.Medium) .withAbout(aboutRef) + .withAboutEntityLink(aboutEntityLink) .withAssignees(assignees) .withCreatedBy(createdByRef) .withWorkflowInstanceId(workflowInstanceId) @@ -84,6 +87,27 @@ public static UUID createTask( return task.getId(); } + private static String buildAboutEntityLink( + EntityInterface entity, String entityLinkStr, TaskEntityType taskType) { + switch (taskType) { + case TestCaseResolution, IncidentResolution -> { + if (entity instanceof TestCase testCase && testCase.getIncidentId() != null) { + return new MessageParser.EntityLink( + "testCase", + testCase.getFullyQualifiedName(), + "incidents", + testCase.getIncidentId().toString(), + null) + .getLinkString(); + } + return entityLinkStr; + } + default -> { + return entityLinkStr; + } + } + } + @SuppressWarnings("unchecked") private static List resolveAssignees( EntityInterface entity, Map assigneesConfig) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResolutionStatusRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResolutionStatusRepository.java index 1a755c69f38b..3c822d85beff 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResolutionStatusRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResolutionStatusRepository.java @@ -247,6 +247,17 @@ private void openOrAssignTask(TestCaseResolutionStatus incidentStatus) { LOG.debug( "openOrAssignTask called with status: {}", incidentStatus.getTestCaseResolutionStatusType()); + + // Guard: skip if a workflow-managed Task already exists for this incident + Task existingWorkflowTask = getIncidentTask(incidentStatus); + if (existingWorkflowTask != null && existingWorkflowTask.getWorkflowInstanceId() != null) { + LOG.debug( + "[TCRS Guard] Workflow-managed Task {} exists for stateId={}, skipping.", + existingWorkflowTask.getId(), + incidentStatus.getStateId()); + return; + } + switch (incidentStatus.getTestCaseResolutionStatusType()) { case Ack -> { // If the incident has been acknowledged, the task will be assigned to the user diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java index 028715c5629f..310ad5913844 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java @@ -374,7 +374,8 @@ private Map> buildCycleCheckGraph( } validLoopSources.put( entry.getKey(), - nonTerminalReachable(entry.getKey(), (ManualTaskDefinition) entry.getValue(), edges)); + nonTerminalReachable( + entry.getKey(), (ManualTaskDefinition) entry.getValue(), edges, outgoingEdges)); } Map> graph = new java.util.HashMap<>(); @@ -397,7 +398,10 @@ private Map> buildCycleCheckGraph( * and multi-hop loops are validated uniformly. */ private Set nonTerminalReachable( - String mtId, ManualTaskDefinition manualTask, List edges) { + String mtId, + ManualTaskDefinition manualTask, + List edges, + Map> outgoingEdges) { Set terminal = Set.copyOf( @@ -419,12 +423,9 @@ private Set nonTerminalReachable( while (!queue.isEmpty()) { String current = queue.poll(); - for (EdgeDefinition edge : edges) { - if (edge.getFrom().equals(current)) { - String neighbor = edge.getTo(); - if (!neighbor.equals(mtId) && reachable.add(neighbor)) { - queue.add(neighbor); - } + for (String neighbor : outgoingEdges.getOrDefault(current, List.of())) { + if (!neighbor.equals(mtId) && reachable.add(neighbor)) { + queue.add(neighbor); } } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/events/lifecycle/handlers/IncidentTcrsSyncHandlerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/events/lifecycle/handlers/IncidentTcrsSyncHandlerTest.java new file mode 100644 index 000000000000..7c3f5557f5a6 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/events/lifecycle/handlers/IncidentTcrsSyncHandlerTest.java @@ -0,0 +1,100 @@ +package org.openmetadata.service.events.lifecycle.handlers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.openmetadata.schema.entity.tasks.Task; +import org.openmetadata.schema.type.TaskCategory; +import org.openmetadata.schema.type.TaskEntityType; +import org.openmetadata.service.Entity; +import org.openmetadata.service.resources.feeds.MessageParser; + +class IncidentTcrsSyncHandlerTest { + + private final IncidentTcrsSyncHandler handler = new IncidentTcrsSyncHandler(); + + @Test + void handlerName() { + assertEquals("IncidentTcrsSyncHandler", handler.getHandlerName()); + } + + @Test + void supportedEntityTypes_onlyTask() { + assertEquals(Set.of(Entity.TASK), handler.getSupportedEntityTypes()); + } + + @Test + void isAsync_true() { + assertTrue(handler.isAsync()); + } + + @Test + void parseAboutEntityLink_extractsTestCaseFqnAndStateId() { + UUID stateId = UUID.randomUUID(); + String link = "<#E::testCase::myService.myDb.mySchema.myTestCase::incidents::" + stateId + ">"; + + MessageParser.EntityLink parsed = MessageParser.EntityLink.parse(link); + + assertEquals("testCase", parsed.getEntityType()); + assertEquals("myService.myDb.mySchema.myTestCase", parsed.getEntityFQN()); + assertEquals("incidents", parsed.getFieldName()); + assertEquals(stateId.toString(), parsed.getArrayFieldName()); + } + + @Test + void isIncidentTask_trueForIncidentCategoryWithLink() { + Task task = + new Task() + .withCategory(TaskCategory.Incident) + .withType(TaskEntityType.TestCaseResolution) + .withAboutEntityLink("<#E::testCase::fqn::incidents::" + UUID.randomUUID() + ">"); + + assertTrue(handler.isIncidentTask(task)); + } + + @Test + void isIncidentTask_falseForNonIncidentCategory() { + Task task = + new Task() + .withCategory(TaskCategory.Approval) + .withAboutEntityLink("<#E::glossaryTerm::fqn>"); + + assertFalse(handler.isIncidentTask(task)); + } + + @Test + void isIncidentTask_falseForMissingLink() { + Task task = new Task().withCategory(TaskCategory.Incident); + + assertFalse(handler.isIncidentTask(task)); + } + + @Test + void isIncidentTask_falseForLinkWithoutIncidentsField() { + Task task = + new Task().withCategory(TaskCategory.Incident).withAboutEntityLink("<#E::testCase::fqn>"); + + assertFalse(handler.isIncidentTask(task)); + } + + @Test + void isIncidentTask_falseForBlankLink() { + Task task = new Task().withCategory(TaskCategory.Incident).withAboutEntityLink(""); + + assertFalse(handler.isIncidentTask(task)); + } + + @Test + void isIncidentTask_falseForIncidentsFieldWithoutStateId() { + Task task = + new Task() + .withCategory(TaskCategory.Incident) + .withAboutEntityLink("<#E::testCase::fqn::incidents>"); + + assertFalse(handler.isIncidentTask(task)); + } +} diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/tasks/task.json b/openmetadata-spec/src/main/resources/json/schema/entity/tasks/task.json index 7d792198af4e..997c18b375d0 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/tasks/task.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/tasks/task.json @@ -253,6 +253,10 @@ "description": "Reference to the entity this task is about.", "$ref": "../../type/entityReference.json" }, + "aboutEntityLink": { + "description": "EntityLink string providing hierarchical entity identity for this task. Format: <#E::entityType::fqn[::fieldName[::arrayFieldName]]>. For incident tasks, encodes testCase FQN and incident stateId. Used by lifecycle handlers for zero-resolution data extraction.", + "type": "string" + }, "aboutFqnHash": { "description": "Hash of the target entity's fully qualified name for efficient querying. Computed from about.fullyQualifiedName using FullyQualifiedName.buildHash().", "type": "string" diff --git a/openmetadata-spec/src/main/resources/json/schema/type/workflowTriggerFields.json b/openmetadata-spec/src/main/resources/json/schema/type/workflowTriggerFields.json index 5eafa3866112..9b6e127d2bf2 100644 --- a/openmetadata-spec/src/main/resources/json/schema/type/workflowTriggerFields.json +++ b/openmetadata-spec/src/main/resources/json/schema/type/workflowTriggerFields.json @@ -24,6 +24,7 @@ "glossary", "parent", "children", - "experts" + "experts", + "testCaseStatus" ] } \ No newline at end of file diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/tasks/task.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/tasks/task.ts index 8af7325891ed..3cf07d09debf 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/tasks/task.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/tasks/task.ts @@ -20,6 +20,13 @@ export interface Task { * Reference to the entity this task is about. */ about?: EntityReference; + /** + * EntityLink string providing hierarchical entity identity for this task. Format: + * <#E::entityType::fqn[::fieldName[::arrayFieldName]]>. For incident tasks, encodes + * testCase FQN and incident stateId. Used by lifecycle handlers for zero-resolution data + * extraction. + */ + aboutEntityLink?: string; /** * Hash of the target entity's fully qualified name for efficient querying. Computed from * about.fullyQualifiedName using FullyQualifiedName.buildHash(). diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/type/workflowTriggerFields.ts b/openmetadata-ui/src/main/resources/ui/src/generated/type/workflowTriggerFields.ts index f64e8b2cdb18..d0bd9821c8bd 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/type/workflowTriggerFields.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/type/workflowTriggerFields.ts @@ -33,4 +33,5 @@ export enum WorkflowTriggerFields { Reviewers = "reviewers", Synonyms = "synonyms", Tags = "tags", + TestCaseStatus = "testCaseStatus", }