Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion packages/daemon/src/lib/space/runtime/channel-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ import { evaluateGate, type GateEvalResult, type GateScriptExecutorFn } from './
import type { GateScriptContext } from './gate-script-executor';
import { executeGateScript } from './gate-script-executor';
import type { NotificationSink, SpaceNotificationEvent } from './notification-sink';
import { Logger } from '../../logger';

const log = new Logger('channel-router');

// ---------------------------------------------------------------------------
// Gate result types (formerly in channel-gate-evaluator.ts)
Expand Down Expand Up @@ -226,6 +229,17 @@ export interface ChannelRouterConfig {
* propagate into message delivery / activation paths.
*/
notificationSink?: NotificationSink;
/**
* Called when a gate is actively waiting for human approval (gate data was
* written, but the gate is still not open because `approved` has not been set).
* Allows the caller to transition the canonical task to `review` status so the
* task appears in the correct group in the UI.
*
* Only invoked for external-approval gates (gates with an `approved` field whose
* `writers` array is empty). Not invoked when the gate opens, when it's blocked
* by a script failure, or when it has no external-approval field.
*/
onGatePendingApproval?: (runId: string, gateId: string) => Promise<void>;
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -637,7 +651,25 @@ export class ChannelRouter {

// Evaluate the gate once (shared across all channels referencing it)
const gateResult = await this.evaluateGateById(runId, gateId, workflow);
if (!gateResult.open) return [];
if (!gateResult.open) {
// Notify caller when the gate is waiting for human approval. Only fires for
// external-approval gates — those with an `approved` field with no declared
// writers (i.e. only a human can set `approved: true`).
if (this.config.onGatePendingApproval && gateDef) {
const needsHuman = (gateDef.fields ?? []).some(
(f) => f.name === 'approved' && Array.isArray(f.writers) && f.writers.length === 0
);
if (needsHuman) {
void this.config.onGatePendingApproval(runId, gateId).catch((err) => {
log.warn(
`onGatePendingApproval failed for gate "${gateId}" in run "${runId}": ` +
(err instanceof Error ? err.message : String(err))
);
});
}
}
return [];
}

// Determine if any of these channels are cyclic — if so, enforce the per-channel cap
// before activating any node (mirrors the guard in deliverMessage).
Expand Down
39 changes: 39 additions & 0 deletions packages/daemon/src/lib/space/runtime/space-runtime-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,44 @@ export class SpaceRuntimeService {
// No-op: shared runtime handles all spaces; use stop() to stop entirely.
}

/**
* Called when a gate is waiting for human approval (gate data exists but
* `approved` hasn't been set yet). Transitions the canonical task to `review`
* so the task appears in the "Needs Attention" group in the UI.
*
* No-op when:
* - The run or its tasks cannot be found
* - No non-archived task is currently `in_progress` or `open`
*/
async handleGatePendingApproval(runId: string, _gateId: string): Promise<void> {
const run = this.config.workflowRunRepo.getRun(runId);
if (!run) return;

const tasks = this.config.taskRepo.listByWorkflowRun(runId);
if (tasks.length === 0) return;

// Find the canonical task that is actively running. Gate pending approval
// happens while the agent is working or has just finished writing gate data.
const canonical =
tasks.find((t) => t.status === 'in_progress') ?? tasks.find((t) => t.status === 'open');
if (!canonical) return;

const updated = this.config.taskRepo.updateTask(canonical.id, {
status: 'review',
pendingCheckpointType: 'gate',
});
if (!updated) return;

if (this.config.daemonHub) {
await this.config.daemonHub.emit('space.task.updated', {
sessionId: 'global',
spaceId: run.spaceId,
taskId: updated.id,
task: updated,
});
}
}

/**
* Notify that gate data has changed for a given run/gate pair.
*
Expand Down Expand Up @@ -698,6 +736,7 @@ export class SpaceRuntimeService {
// Forward the runtime's current sink so a gate-driven reopen still
// surfaces `workflow_run_reopened` to the Space Agent session.
notificationSink: this.runtime.getNotificationSink(),
onGatePendingApproval: (runId, gateId) => this.handleGatePendingApproval(runId, gateId),
});
return router.onGateDataChanged(runId, gateId);
}
Expand Down
6 changes: 5 additions & 1 deletion packages/daemon/src/lib/space/runtime/space-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1681,12 +1681,16 @@ export class SpaceRuntime {
);
}
}
if (canonicalTask.status === 'open') {
if (
canonicalTask.status === 'open' ||
(canonicalTask.status === 'review' && canonicalTask.pendingCheckpointType === 'gate')
) {
const nowTs = Date.now();
await this.updateTaskAndEmit(meta.spaceId, canonicalTask.id, {
status: 'in_progress',
startedAt: canonicalTask.startedAt ?? nowTs,
completedAt: null,
pendingCheckpointType: null,
});
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/daemon/src/lib/space/runtime/task-agent-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2649,6 +2649,8 @@ export class TaskAgentManager {
// that auto-reopens a terminal run still emits `workflow_run_reopened`
// into the Space Agent session.
notificationSink: this.config.spaceRuntimeService.getSharedRuntime().getNotificationSink(),
onGatePendingApproval: (runId, gateId) =>
this.config.spaceRuntimeService.handleGatePendingApproval(runId, gateId),
});
const agentMessageRouter = new AgentMessageRouter({
nodeExecutionRepo: this.config.nodeExecutionRepo,
Expand Down
207 changes: 207 additions & 0 deletions packages/daemon/tests/unit/5-space/other/channel-router.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2594,5 +2594,212 @@ describe('ChannelRouter', () => {
const result = await router.deliverMessage(run.id, 'coder', 'planner', 'approved');
expect(result.fromRole).toBe('coder');
});

// -----------------------------------------------------------------------
// onGatePendingApproval callback
// -----------------------------------------------------------------------

describe('onGatePendingApproval callback', () => {
test('fires when gate data is written but gate is still not open (external approval gate)', async () => {
const gate: Gate = {
id: 'external-approval-gate',
fields: [
{
name: 'approved',
type: 'boolean',
writers: [], // empty writers = external/human-only approval
check: { op: '==', value: true },
},
],
resetOnCycle: false,
};
const channels: WorkflowChannel[] = [
{ id: 'ch-1', from: 'coder', to: 'planner', gateId: 'external-approval-gate' },
];
const workflow = buildWorkflowWithGates(
SPACE_ID,
workflowManager,
[
{
id: NODE_A,
name: 'Coder Node',
agents: [{ agentId: AGENT_CODER, name: 'coder' }],
},
{
id: NODE_B,
name: 'Planner Node',
agents: [{ agentId: AGENT_PLANNER, name: 'planner' }],
},
],
channels,
[gate]
);

const run = workflowRunRepo.createRun({
spaceId: SPACE_ID,
workflowId: workflow.id,
title: 'External Approval Pending Run',
});
workflowRunRepo.transitionStatus(run.id, 'in_progress');

const pendingCalls: Array<{ runId: string; gateId: string }> = [];
const routerWithCallback = new ChannelRouter({
taskRepo,
workflowRunRepo,
workflowManager,
agentManager,
gateDataRepo,
channelCycleRepo,
db,
nodeExecutionRepo: new NodeExecutionRepository(db),
onGatePendingApproval: async (rId, gId) => {
pendingCalls.push({ runId: rId, gateId: gId });
},
});

// Write gate data that does NOT satisfy the condition (approved not set to true)
gateDataRepo.set(run.id, 'external-approval-gate', {});

const activated = await routerWithCallback.onGateDataChanged(
run.id,
'external-approval-gate'
);
expect(activated).toHaveLength(0);
expect(pendingCalls).toHaveLength(1);
expect(pendingCalls[0]).toEqual({ runId: run.id, gateId: 'external-approval-gate' });
});

test('does NOT fire when gate opens (auto-approved)', async () => {
const gate: Gate = {
id: 'auto-gate',
fields: [
{
name: 'approved',
type: 'boolean',
writers: [], // external approval gate
check: { op: '==', value: true },
},
],
resetOnCycle: false,
};
const channels: WorkflowChannel[] = [
{ id: 'ch-1', from: 'coder', to: 'planner', gateId: 'auto-gate' },
];
const workflow = buildWorkflowWithGates(
SPACE_ID,
workflowManager,
[
{
id: NODE_A,
name: 'Coder Node',
agents: [{ agentId: AGENT_CODER, name: 'coder' }],
},
{
id: NODE_B,
name: 'Planner Node',
agents: [{ agentId: AGENT_PLANNER, name: 'planner' }],
},
],
channels,
[gate]
);

const run = workflowRunRepo.createRun({
spaceId: SPACE_ID,
workflowId: workflow.id,
title: 'Auto-Approved Gate Run',
});
workflowRunRepo.transitionStatus(run.id, 'in_progress');

const pendingCalls: Array<{ runId: string; gateId: string }> = [];
const routerWithCallback = new ChannelRouter({
taskRepo,
workflowRunRepo,
workflowManager,
agentManager,
gateDataRepo,
channelCycleRepo,
db,
nodeExecutionRepo: new NodeExecutionRepository(db),
onGatePendingApproval: async (rId, gId) => {
pendingCalls.push({ runId: rId, gateId: gId });
},
});

// Write gate data that satisfies the condition — gate opens
gateDataRepo.set(run.id, 'auto-gate', { approved: true });

const activated = await routerWithCallback.onGateDataChanged(run.id, 'auto-gate');
// Gate opens → node activated, callback NOT fired
expect(activated.length).toBeGreaterThan(0);
expect(pendingCalls).toHaveLength(0);
});

test('does NOT fire for non-external-approval gates (writers not empty)', async () => {
const gate: Gate = {
id: 'agent-approval-gate',
fields: [
{
name: 'approved',
type: 'boolean',
writers: ['planner'], // non-empty writers — agents can write
check: { op: '==', value: true },
},
],
resetOnCycle: false,
};
const channels: WorkflowChannel[] = [
{ id: 'ch-1', from: 'coder', to: 'planner', gateId: 'agent-approval-gate' },
];
const workflow = buildWorkflowWithGates(
SPACE_ID,
workflowManager,
[
{
id: NODE_A,
name: 'Coder Node',
agents: [{ agentId: AGENT_CODER, name: 'coder' }],
},
{
id: NODE_B,
name: 'Planner Node',
agents: [{ agentId: AGENT_PLANNER, name: 'planner' }],
},
],
channels,
[gate]
);

const run = workflowRunRepo.createRun({
spaceId: SPACE_ID,
workflowId: workflow.id,
title: 'Agent Approval Gate Run',
});
workflowRunRepo.transitionStatus(run.id, 'in_progress');

const pendingCalls: Array<{ runId: string; gateId: string }> = [];
const routerWithCallback = new ChannelRouter({
taskRepo,
workflowRunRepo,
workflowManager,
agentManager,
gateDataRepo,
channelCycleRepo,
db,
nodeExecutionRepo: new NodeExecutionRepository(db),
onGatePendingApproval: async (rId, gId) => {
pendingCalls.push({ runId: rId, gateId: gId });
},
});

// Write data that does NOT satisfy the condition (approved = false)
gateDataRepo.set(run.id, 'agent-approval-gate', { approved: false });

const activated = await routerWithCallback.onGateDataChanged(run.id, 'agent-approval-gate');
// Gate is closed but this is NOT an external-approval gate → callback not fired
expect(activated).toHaveLength(0);
expect(pendingCalls).toHaveLength(0);
});
});
});
});
6 changes: 5 additions & 1 deletion packages/web/src/components/space/PendingGateBanner.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,11 @@ export function PendingGateBanner({ runId, spaceId, workflowId }: PendingGateBan

const pendingGates: PendingGate[] = [];
for (const gate of gates) {
const data = gateDataMap.get(gate.id) ?? {};
// Only evaluate gates that have been activated (data written to gate_data table).
// An external-approval gate with no data has never been triggered — it's merely
// configured, so the banner must not show.
if (!gateDataMap.has(gate.id)) continue;
const data = gateDataMap.get(gate.id)!;
const scriptResult = parseScriptResult(data);
if (evaluateGateStatus(gate, data, scriptResult.failed) === 'waiting_human') {
pendingGates.push({ gateId: gate.id, data, label: gate.label ?? gate.description });
Expand Down
16 changes: 10 additions & 6 deletions packages/web/src/components/space/TaskStatusActions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,17 @@ export function TaskStatusActions({
pendingCheckpointType,
}: TaskStatusActionsProps) {
const allActions = getTransitionActions(status);
// When a task is paused at a completion action or a submit_for_approval
// checkpoint, hide the generic Approve (review → done) and Cancel (review →
// cancelled) buttons. The dedicated banner owns those transitions so it can
// disclose what the approval will actually run / send. Non-checkpoint
// transitions (e.g. Reopen → in_progress, Archive) stay visible.
// When a task is paused at a completion action, a submit_for_approval
// checkpoint, or a channel gate awaiting human approval, hide the generic
// Approve (review → done) and Cancel (review → cancelled) buttons. The
// dedicated banner owns those transitions. For gate-pending tasks the
// PendingGateBanner provides the Approve/Reject UX; bypassing it via the
// generic button would mark the task done without opening the gate.
// Non-checkpoint transitions (e.g. Reopen → in_progress, Archive) stay visible.
const actions =
pendingCheckpointType === 'completion_action' || pendingCheckpointType === 'task_completion'
pendingCheckpointType === 'completion_action' ||
pendingCheckpointType === 'task_completion' ||
pendingCheckpointType === 'gate'
? allActions.filter(({ target }) => target !== 'done' && target !== 'cancelled')
: allActions;

Expand Down
Loading