Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/v2-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- WHEN multiple routing rules match the same agent for the same message THEN the system SHALL nudge it exactly once
- WHEN the nudge target is stopped AND has a session ID THEN the system SHALL resume the agent before delivering the message
- WHEN the nudge target is stopped AND has no session ID THEN the system SHALL spawn a new agent with the same configuration (kind, agent_type, channel, task_id, bound_thread_id) and deliver the nudge to it
- WHEN the nudge target is stopped AND a SpawnFailure cooldown is active for that agent THEN the system SHALL drop the nudge
- WHEN the nudge target is unknown THEN the system SHALL drop the nudge

---
Expand Down Expand Up @@ -104,6 +105,7 @@
- WHEN spawning fails THEN the system SHALL emit an AgentSpawnFailed event with the agent configuration and error reason
- WHEN a session is spawned THEN stdout/stderr SHALL be drained in a background task
- WHEN an agent produces assistant text on stdout THEN the system SHALL auto-post it to the agent's bound channel
- WHEN a session exits with an error (is_error: true) THEN the system SHALL suppress auto-posting the final output to the channel — errors are logged to daemon.log only
- WHEN an agent has no bound channel THEN stdout text SHALL be posted to the agent's DM channel
- WHEN multiple stdout events accumulate THEN the system SHALL flush and post at most every 2 seconds

Expand All @@ -122,6 +124,7 @@
- WHEN any agent (lead OR worker) dies within 60 seconds of starting THEN the system SHALL record a SpawnFailure cooldown
- WHEN a SpawnFailure cooldown is active for a worker's task THEN health checks and dispatch SHALL NOT re-spawn that worker until the cooldown expires (120 seconds)
- WHEN a SpawnFailure cooldown is active for a channel THEN lead spawning SHALL NOT occur until the cooldown expires
- WHEN a lead has failed 3 consecutive spawn attempts (died within 60s of start) THEN the system SHALL stop retrying AND post to the ops channel
- The cooldown key for leads is the channel name; the cooldown key for workers is the task ID

### 4.5 Garbage Collection
Expand Down
21 changes: 16 additions & 5 deletions src/daemon_v2/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ impl DaemonV2 {
channels_dir: self.config.channels_dir.clone(),
event_tx: self.event_tx.clone(),
command_tx: web_cmd_tx.clone(),
pending_auth_login: std::sync::Arc::new(tokio::sync::Mutex::new(None)),
repo_name: self.config.dir_key.clone(),
repo_full_name,
});
Expand Down Expand Up @@ -590,7 +589,8 @@ impl DaemonV2 {
.started_at
.is_some_and(|t| (chrono::Utc::now() - t).num_seconds() < 60);

let cooldown_key = match agent.kind {
let agent_kind = agent.kind.clone();
let cooldown_key = match agent_kind {
crate::daemon_v2::events::AgentKind::Lead => agent.channel.clone(),
crate::daemon_v2::events::AgentKind::Worker => agent.task_id.clone(),
_ => None,
Expand All @@ -599,7 +599,7 @@ impl DaemonV2 {
if died_quickly {
if let Some(ref key) = cooldown_key {
tracing::warn!(
%id, key = %key, kind = ?agent.kind,
%id, key = %key, kind = ?agent_kind,
"agent died within 60s of start — applying spawn cooldown"
);
proj.cooldowns.record(
Expand All @@ -613,8 +613,19 @@ impl DaemonV2 {
crate::daemon_v2::projections::cooldowns::CooldownCategory::SpawnFailure,
key,
);
if failures >= crate::daemon_v2::decisions::health::MAX_WORKER_RESTARTS {
let esc_key = format!("worker-escalation-{key}");
let max_restarts = match agent_kind {
crate::daemon_v2::events::AgentKind::Lead => {
crate::daemon_v2::decisions::health::MAX_LEAD_RESTARTS
}
_ => crate::daemon_v2::decisions::health::MAX_WORKER_RESTARTS,
};
if failures >= max_restarts {
let esc_key = match agent_kind {
crate::daemon_v2::events::AgentKind::Lead => {
format!("lead-escalation-{key}")
}
_ => format!("worker-escalation-{key}"),
};
proj.cooldowns.record(
crate::daemon_v2::projections::cooldowns::CooldownCategory::TaskNudge,
esc_key,
Expand Down
18 changes: 17 additions & 1 deletion src/daemon_v2/decisions/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,11 @@ fn route_refs(
}
} else {
// Topic channel @all: nudge channel lead + in-progress task agents
// in THIS channel only
// in THIS channel only.
//
// Workers live in dm-{name} channels (not the task channel), so
// by_channel alone misses them. We also scan all agents for those
// whose task belongs to this channel.
if let Some(agents) = proj.agents.by_channel.get(channel) {
for agent_id in agents {
if let Some(agent) = proj.agents.by_id.get(agent_id) {
Expand All @@ -195,6 +199,18 @@ fn route_refs(
}
}
}
// Also find workers whose task is in this channel but who are
// indexed under a different channel (dm-{name}).
for agent in proj.agents.by_id.values().filter(|a| !a.gc) {
if let Some(tid) = &agent.task_id {
let task_in_channel = proj.work.tasks.get(tid).is_some_and(|t| {
t.channel == channel && t.status == TaskStatus::InProgress
});
if task_in_channel {
nudge(agent, sender, nudge_msg, nudged, commands);
}
}
}
}
} else if target == "lead" {
nudge_channel_lead(proj, channel, sender, nudge_msg, nudged, commands);
Expand Down
62 changes: 62 additions & 0 deletions src/daemon_v2/decisions/chat_spec_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,3 +778,65 @@ fn stopped_fork_still_targeted_by_thread_reply() {
targets
);
}

/// Spec 1.2: @all in topic channel should find workers even when they are
/// indexed under their DM channel (dm-{name}), not the task's channel.
/// In production, dispatch assigns `channel: dm-{name}` to workers.
#[test]
fn at_all_in_topic_finds_workers_in_dm_channels() {
    let mut proj = Projections::default();
    // Two leads: the main lead is background noise; only the #web lead is
    // expected among the nudge targets below.
    let _main_lead_id = make_lead(&mut proj, "main-lead", "main");
    let web_lead_id = make_channel_lead(&mut proj, "web-lead", "web");

    // Create a worker with dm-channel (matching production behavior)
    let worker_id = "worker-park".to_string();
    proj.apply(&DomainEvent::AgentCreated {
        id: worker_id.clone(),
        name: "park".into(),
        kind: AgentKind::Worker,
        agent_type: "midtown-code-author".into(),
        provider: Provider::ClaudeCode,
        channel: Some("dm-park".into()), // Production uses dm-{name}
        task_id: Some("t1".into()),
        bound_thread_id: None,
        icon: None,
        color: None,
    });
    // Start the worker so it is a live nudge candidate.
    proj.apply(&DomainEvent::AgentStarted {
        id: worker_id.clone(),
        pid: 2000,
        session_id: Some("sess-w".into()),
    });
    proj.apply(&DomainEvent::TaskCreated {
        id: "t1".into(),
        subject: "Fix bug".into(),
        channel: "web".into(), // Task belongs to #web
        blocked_by: vec![],
        agent_type: None,
        agent_name: None,
        icon: None,
        color: None,
        parent: None,
        thread_id: None,
        message_id: None,
    });
    // NOTE(review): routing only matches tasks with status InProgress — this
    // assumes TaskAssigned transitions t1 to InProgress; confirm in the
    // work projection.
    proj.apply(&DomainEvent::TaskAssigned {
        task_id: "t1".into(),
        agent_id: worker_id.clone(),
    });

    // @all in #web should still find the worker despite it being in dm-park
    let cmds = route_message(&proj, "web", "user", "@all update please", None, None);
    let targets = nudge_targets(&cmds);

    assert!(
        targets.contains(&web_lead_id),
        "@all in topic should nudge topic lead"
    );
    assert!(
        targets.contains(&worker_id),
        "@all in topic should nudge worker whose task is in this channel, \
        even though worker.channel is dm-park, got: {:?}",
        targets
    );
}
21 changes: 21 additions & 0 deletions src/daemon_v2/decisions/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::daemon_v2::projections::Projections;
use crate::daemon_v2::projections::cooldowns::CooldownCategory;

pub(crate) const MAX_WORKER_RESTARTS: usize = 3;
pub(crate) const MAX_LEAD_RESTARTS: usize = 3;

/// Find workers that are stopped but have in-progress tasks.
/// Per spec 2.2: resume the worker (not reset the task).
Expand Down Expand Up @@ -201,6 +202,26 @@ fn ensure_lead_for_channel(proj: &Projections, channel: &str, agent_type: &str)

let lead = proj.agents.channel_lead(channel);

// Spec 4.4: stop retrying after MAX_LEAD_RESTARTS consecutive failures
let failures = proj
.cooldowns
.failure_count(CooldownCategory::SpawnFailure, channel);
if failures >= MAX_LEAD_RESTARTS {
let esc_key = format!("lead-escalation-{channel}");
if !proj
.cooldowns
.is_active(CooldownCategory::TaskNudge, &esc_key)
{
return Some(Command::PostSystem {
channel: "ops".into(),
content: format!(
"Lead for #{channel} failed {failures} consecutive spawns. Manual intervention needed (check auth, run /login).",
),
});
}
return None;
}

match lead {
// Running lead — nothing to do
Some(agent) if proj.agents.running.contains(&agent.id) => None,
Expand Down
93 changes: 93 additions & 0 deletions src/daemon_v2/decisions/health_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,3 +897,96 @@ fn cleared_session_id_causes_fresh_spawn() {
commands,
);
}

/// Spec 4.4: WHEN a lead has failed 3 consecutive spawn attempts THEN stop
/// retrying AND post to ops channel (mirrors worker behavior from Spec 2.2).
#[test]
fn lead_gives_up_after_max_spawn_failures() {
    let events = vec![
        // Main channel lead (running — not the subject of this test)
        DomainEvent::AgentCreated {
            id: "lead-main".into(),
            name: "main".into(),
            kind: AgentKind::Lead,
            agent_type: "midtown-project-lead".into(),
            provider: Provider::ClaudeCode,
            channel: Some("main".into()),
            task_id: None,
            bound_thread_id: None,
            icon: None,
            color: None,
        },
        DomainEvent::AgentStarted {
            id: "lead-main".into(),
            pid: 50,
            session_id: Some("session-main".into()),
        },
        // daemon-core lead (stopped — subject of this test)
        DomainEvent::AgentCreated {
            id: "lead-1".into(),
            name: "daemon-core".into(),
            kind: AgentKind::Lead,
            agent_type: "midtown-channel-lead".into(),
            provider: Provider::ClaudeCode,
            channel: Some("daemon-core".into()),
            task_id: None,
            bound_thread_id: None,
            icon: None,
            color: None,
        },
        DomainEvent::AgentStarted {
            id: "lead-1".into(),
            pid: 100,
            session_id: Some("session-1".into()),
        },
        // Stop the daemon-core lead so the health check considers respawning it.
        DomainEvent::AgentStopped {
            id: "lead-1".into(),
            reason: "process died".into(),
        },
    ];

    let mut proj = make_projections(&events);

    // Ensure the channel exists in the projection
    proj.apply(&DomainEvent::MessagePosted {
        id: "msg-1".into(),
        channel: "daemon-core".into(),
        sender: "user".into(),
        content: "hello".into(),
        thread_id: None,
        tool_data: None,
        auto_output: false,
    });

    // Simulate 3 consecutive spawn failures
    // (the lead cooldown key is the channel name — see Spec 4.4)
    for _ in 0..super::MAX_LEAD_RESTARTS {
        proj.cooldowns
            .record(CooldownCategory::SpawnFailure, "daemon-core".to_string());
    }

    // Expire the cooldown timer so ensure_channel_leads_alive doesn't skip
    proj.cooldowns
        .expire_for_test(CooldownCategory::SpawnFailure, "daemon-core");

    let commands = ensure_channel_leads_alive(&proj, "main");

    // Should NOT try to resume/spawn — max failures reached
    assert!(
        !commands
            .iter()
            .any(|c| matches!(c, Command::ResumeAgent { .. } | Command::SpawnAgent(_))),
        "should not respawn lead after {} failures, got: {:?}",
        super::MAX_LEAD_RESTARTS,
        commands,
    );

    // Should post to ops channel about the failure
    assert!(
        commands.iter().any(|c| matches!(c, Command::PostSystem {
            channel,
            ..
        } if channel == "ops")),
        "should escalate to ops after max lead failures, got: {:?}",
        commands,
    );
}
93 changes: 93 additions & 0 deletions src/daemon_v2/executor/auto_output_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use super::*;
use crate::channel::Channel;
use crate::headless::StreamEvent;
use tempfile::TempDir;

/// Normal (non-error) assistant text should be posted to the bound channel.
#[test]
fn flush_auto_output_posts_normal_results() {
    let tmp = TempDir::new().unwrap();
    let (event_tx, _keepalive_rx) = tokio::sync::broadcast::channel(16);

    // One assistant text event followed by a successful Result marker.
    let assistant = StreamEvent::Assistant {
        message: serde_json::json!({
            "content": [{"type": "text", "text": "Here is the status report."}]
        }),
        session_id: None,
        extra: serde_json::Value::Null,
    };
    let result = StreamEvent::Result {
        subtype: "success".into(),
        is_error: false,
        result: None,
        duration_ms: None,
        total_cost_usd: None,
        session_id: None,
        usage: None,
        extra: serde_json::json!({}),
    };
    let mut events = vec![assistant, result];

    flush_auto_output(
        "daemon-core",
        &Some("daemon-core".into()),
        None,
        tmp.path(),
        &mut events,
        &event_tx,
    );

    // The assistant text must land in the bound channel as exactly one message.
    let posted = Channel::new(tmp.path(), "daemon-core")
        .unwrap()
        .read_all()
        .unwrap();
    assert_eq!(posted.len(), 1, "normal output should be posted");
}

/// Error suppression is handled by the drain loop (session_errored flag),
/// not by flush_auto_output. Verify flush_auto_output itself is a pure
/// "extract and post" function — it posts even when is_error is true,
/// because the drain loop is expected to skip the call entirely.
#[test]
fn flush_auto_output_does_not_filter_errors_itself() {
    let tmp = TempDir::new().unwrap();
    let (event_tx, _keepalive_rx) = tokio::sync::broadcast::channel(16);

    // An assistant error message followed by an is_error Result marker.
    let mut stream = Vec::new();
    stream.push(StreamEvent::Assistant {
        message: serde_json::json!({
            "content": [{"type": "text", "text": "Failed to authenticate. API Error: 401"}]
        }),
        session_id: None,
        extra: serde_json::Value::Null,
    });
    stream.push(StreamEvent::Result {
        subtype: "error".into(),
        is_error: true,
        result: None,
        duration_ms: None,
        total_cost_usd: None,
        session_id: None,
        usage: None,
        extra: serde_json::json!({}),
    });

    flush_auto_output(
        "daemon-core",
        &Some("daemon-core".into()),
        None,
        tmp.path(),
        &mut stream,
        &event_tx,
    );

    // flush_auto_output is now a pure post function — the drain loop
    // is responsible for NOT calling it when session_errored is true.
    let channel = Channel::new(tmp.path(), "daemon-core").unwrap();
    let posted = channel.read_all().unwrap();
    assert_eq!(
        posted.len(),
        1,
        "flush_auto_output posts unconditionally — drain loop gates the call"
    );
}
Loading
Loading