Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions pkg/agent/agent_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ func (al *AgentLoop) ProcessDirect(
func (al *AgentLoop) ProcessDirectWithChannel(
ctx context.Context,
content, sessionKey, channel, chatID string,
) (string, error) {
return al.processDirectWithChannel(ctx, content, sessionKey, channel, chatID, false)
}

func (al *AgentLoop) ProcessScheduledWithChannel(
ctx context.Context,
content, sessionKey, channel, chatID string,
) (string, error) {
return al.processDirectWithChannel(ctx, content, sessionKey, channel, chatID, true)
}

func (al *AgentLoop) processDirectWithChannel(
ctx context.Context,
content, sessionKey, channel, chatID string,
scheduled bool,
) (string, error) {
if err := al.ensureHooksInitialized(ctx); err != nil {
return "", err
Expand All @@ -61,10 +76,48 @@ func (al *AgentLoop) ProcessDirectWithChannel(
Content: content,
SessionKey: sessionKey,
}
if scheduled {
return al.processScheduledMessage(ctx, msg)
}

return al.processMessage(ctx, msg)
}

func (al *AgentLoop) processScheduledMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
msg = bus.NormalizeInboundMessage(msg)
route, agent, routeErr := al.resolveMessageRoute(msg)
if routeErr != nil {
return "", routeErr
}
allocation := al.allocateRouteSession(route, msg)
sessionKey := resolveScopeKey(allocation.SessionKey, msg.SessionKey)

if tool, ok := agent.Tools.Get("message"); ok {
if resetter, ok := tool.(interface{ ResetSentInRound(sessionKey string) }); ok {
resetter.ResetSentInRound(sessionKey)
}
}

return al.runAgentLoop(ctx, agent, processOptions{
Dispatch: DispatchRequest{
SessionKey: sessionKey,
SessionAliases: buildSessionAliases(sessionKey, append(allocation.SessionAliases, msg.SessionKey)...),
InboundContext: cloneInboundContext(&msg.Context),
RouteResult: cloneResolvedRoute(&route),
SessionScope: session.CloneScope(&allocation.Scope),
UserMessage: msg.Content,
Media: append([]string(nil), msg.Media...),
},
SenderID: msg.SenderID,
SenderDisplayName: msg.Sender.DisplayName,
DefaultResponse: defaultResponse,
EnableSummary: false,
SendResponse: false,
SuppressToolFeedback: true,
NoHistory: true,
})
}

func (al *AgentLoop) ProcessHeartbeat(
ctx context.Context,
content, channel, chatID string,
Expand Down
52 changes: 52 additions & 0 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4002,6 +4002,58 @@ func TestProcessHeartbeat_DoesNotPublishToolFeedback(t *testing.T) {
}
}

func TestProcessScheduledWithChannel_DoesNotPublishToolFeedback(t *testing.T) {
tmpDir := t.TempDir()
heartbeatFile := filepath.Join(tmpDir, "scheduled-task.txt")
if err := os.WriteFile(heartbeatFile, []byte("scheduled task"), 0o644); err != nil {
t.Fatalf("WriteFile() error = %v", err)
}

cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
ModelName: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
ToolFeedback: config.ToolFeedbackConfig{
Enabled: true,
MaxArgsLength: 300,
},
},
},
Tools: config.ToolsConfig{
ReadFile: config.ReadFileToolConfig{
Enabled: true,
},
},
}

msgBus := bus.NewMessageBus()
provider := &toolFeedbackProvider{filePath: heartbeatFile}
al := NewAgentLoop(cfg, msgBus, provider)

response, err := al.ProcessScheduledWithChannel(
context.Background(),
"run scheduled task",
"agent:cron-test",
"telegram",
"chat-1",
)
if err != nil {
t.Fatalf("ProcessScheduledWithChannel() error = %v", err)
}
if response != "HEARTBEAT_OK" {
t.Fatalf("ProcessScheduledWithChannel() response = %q, want %q", response, "HEARTBEAT_OK")
}

select {
case outbound := <-msgBus.OutboundChan():
t.Fatalf("expected no outbound tool feedback during scheduled turn, got %+v", outbound)
case <-time.After(200 * time.Millisecond):
}
}

func TestProcessMessage_PublishesToolFeedbackWhenEnabled(t *testing.T) {
tmpDir := t.TempDir()
heartbeatFile := filepath.Join(tmpDir, "tool-feedback.txt")
Expand Down
34 changes: 26 additions & 8 deletions pkg/tools/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type JobExecutor interface {
PublishResponseIfNeeded(ctx context.Context, channel, chatID, sessionKey, response string)
}

type scheduledJobExecutor interface {
ProcessScheduledWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error)
}

// CronTool provides scheduling capabilities for the agent
type CronTool struct {
cronService *cron.CronService
Expand Down Expand Up @@ -344,14 +348,28 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {

sessionKey := fmt.Sprintf("agent:cron-%s-%s", job.ID, uuid.New().String())

// Call agent with the job message
response, err := t.executor.ProcessDirectWithChannel(
ctx,
job.Payload.Message,
sessionKey,
channel,
chatID,
)
// Call agent with the job message. Scheduled agent turns should not emit
// interactive progress/tool-feedback messages; they should only publish a
// final response when the job has something actionable to say.
var response string
var err error
if scheduledExecutor, ok := t.executor.(scheduledJobExecutor); ok {
response, err = scheduledExecutor.ProcessScheduledWithChannel(
ctx,
job.Payload.Message,
sessionKey,
channel,
chatID,
)
} else {
response, err = t.executor.ProcessDirectWithChannel(
ctx,
job.Payload.Message,
sessionKey,
channel,
chatID,
)
}
if err != nil {
return fmt.Sprintf("Error: %v", err)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/tools/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type stubJobExecutor struct {
lastKey string
lastChan string
lastChatID string
scheduledUsed bool
publishedResp string
publishedChan string
publishedChatID string
Expand All @@ -38,6 +39,18 @@ func (s *stubJobExecutor) ProcessDirectWithChannel(
return s.response, s.err
}

func (s *stubJobExecutor) ProcessScheduledWithChannel(
_ context.Context,
content, sessionKey, channel, chatID string,
) (string, error) {
s.scheduledUsed = true
s.lastPrompt = content
s.lastKey = sessionKey
s.lastChan = channel
s.lastChatID = chatID
return s.response, s.err
}

func (s *stubJobExecutor) PublishResponseIfNeeded(
_ context.Context,
channel, chatID, sessionKey, response string,
Expand Down Expand Up @@ -282,6 +295,9 @@ func TestCronTool_ExecuteJobPublishesAgentResponse(t *testing.T) {
if executor.lastPrompt != "send me a poem" {
t.Fatalf("prompt = %q, want original message", executor.lastPrompt)
}
if !executor.scheduledUsed {
t.Fatal("expected cron agent job to use scheduled executor path")
}
if executor.publishedResp != "generated reply" {
t.Fatalf("published response = %q, want generated reply", executor.publishedResp)
}
Expand Down