Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 269 additions & 19 deletions crates/goose/src/agents/agent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;

Expand Down Expand Up @@ -62,6 +63,181 @@ use tracing::{debug, error, info, instrument, warn};
const DEFAULT_MAX_TURNS: u32 = 1000;
const COMPACTION_THINKING_TEXT: &str = "goose is compacting the conversation...";

/// Produce a short, human-readable explanation of why the LLM is being
/// called, derived from the turn counter and the conversation contents.
///
/// Turn 1 quotes (a prefix of) the latest user text message; later turns
/// report the tool calls whose results are being processed, or a generic
/// continuation note when no trailing tool requests are found.
fn derive_call_reason(turns_taken: u32, conversation: &Conversation) -> String {
    if turns_taken != 1 {
        // Walk backwards over the trailing run of assistant messages and
        // gather the names of any successfully-parsed tool requests.
        let mut tool_names: Vec<String> = Vec::new();
        for msg in conversation.messages().iter().rev() {
            if msg.role != rmcp::model::Role::Assistant {
                break;
            }
            for content in &msg.content {
                if let MessageContent::ToolRequest(req) = content {
                    if let Ok(tc) = req.tool_call.as_ref() {
                        tool_names.push(tc.name.to_string());
                    }
                }
            }
        }
        return if tool_names.is_empty() {
            "Continuing after context compaction or recovery".to_string()
        } else {
            format!(
                "Processing results from tool call(s): {}",
                tool_names.join(", ")
            )
        };
    }

    // First turn: locate the most recent user message and pull its first
    // text content (empty string if none exists).
    let mut user_text = String::new();
    let last_user = conversation
        .messages()
        .iter()
        .rev()
        .find(|m| m.role == rmcp::model::Role::User);
    if let Some(msg) = last_user {
        for content in &msg.content {
            if let MessageContent::Text(t) = content {
                user_text = t.text.clone();
                break;
            }
        }
    }

    // Keep at most 100 characters (not bytes — safe on multi-byte UTF-8),
    // marking any cut with an ellipsis.
    let mut truncated: String = user_text.chars().take(100).collect();
    if user_text.chars().nth(100).is_some() {
        truncated.push_str("...");
    }
    format!(
        "Responding to user message: \"{}\"",
        truncated.replace('\n', " ")
    )
}

/// Location of the append-only LLM call log inside the goose data directory.
fn llm_log_path() -> std::path::PathBuf {
    let data_dir = crate::config::paths::Paths::data_dir();
    data_dir.join("llm_calls.log")
}

/// Open the LLM call log for appending, creating the file (and its parent
/// directory) on first use.
///
/// Returns `None` when the file cannot be opened — logging is best-effort
/// and must never interrupt the agent, so all errors are swallowed.
fn open_log_file() -> Option<std::fs::File> {
    let path = llm_log_path();
    if let Some(dir) = path.parent() {
        // Ignore failure here; the open below will surface a real problem.
        let _ = std::fs::create_dir_all(dir);
    }
    std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(&path)
        .ok()
}

/// Format a slice of messages into human-readable text for logging.
///
/// Each content item becomes one `[tag] body` line. Successful tool results
/// are truncated to 2000 characters so the log stays readable.
fn format_messages_for_log(messages: &[Message]) -> String {
    let mut out = String::new();
    for msg in messages {
        let role = match msg.role {
            rmcp::model::Role::User => "user",
            rmcp::model::Role::Assistant => "assistant",
        };
        for content in &msg.content {
            match content {
                MessageContent::Text(t) => {
                    out.push_str(&format!("[{role}] {}\n", t.text));
                }
                MessageContent::ToolRequest(r) => {
                    out.push_str(&format!("[{role}→tool] {}\n", r.to_readable_string()));
                }
                MessageContent::ToolResponse(r) => {
                    let body = match &r.tool_result {
                        Ok(res) => {
                            let text = res
                                .content
                                .iter()
                                .map(|c| format!("{c:?}"))
                                .collect::<Vec<_>>()
                                .join("\n");
                            // Truncate in characters (never splits a multi-byte
                            // UTF-8 sequence), and report the truncated amount
                            // in the same unit. Previously the count was
                            // `text.len() - prefix.len()` — a BYTE difference —
                            // while the label said "chars truncated", which was
                            // wrong for any non-ASCII output.
                            let char_count = text.chars().count();
                            if char_count > 2000 {
                                let prefix: String = text.chars().take(2000).collect();
                                format!(
                                    "{}...[{} chars truncated]",
                                    prefix,
                                    char_count - 2000
                                )
                            } else {
                                text
                            }
                        }
                        Err(e) => format!("error: {e}"),
                    };
                    out.push_str(&format!("[tool_result] {body}\n"));
                }
                other => {
                    // Fallback for content kinds we don't specially format.
                    out.push_str(&format!("[{role}:other] {other}\n"));
                }
            }
        }
    }
    out
}

/// Write the request half of an LLM call to the log file, then open the
/// response section with an `[assistant] ` prefix.
///
/// NOTE: the file handle opened here is dropped when this function returns —
/// it does NOT stay open across the stream. Subsequent streaming tokens are
/// appended by [`log_response_token`] / [`log_response_tool_request`], each
/// of which re-opens the log in append mode, so the output still reads as
/// one continuous response section.
fn log_llm_call_request(
    session_id: &str,
    turn: u32,
    reason: &str,
    system_prompt: &str,
    messages: &[Message],
) {
    // Best-effort logging: silently skip if the log can't be opened.
    let Some(mut file) = open_log_file() else {
        return;
    };
    let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ");
    let sep = "=".repeat(72);
    let _ = writeln!(file, "\n{sep}");
    let _ = writeln!(file, "TIME: {now}");
    let _ = writeln!(file, "SESSION: {session_id} TURN: {turn}");
    let _ = writeln!(file, "REASON: {reason}");
    let _ = writeln!(file, "\n--- SYSTEM PROMPT ---");
    let _ = writeln!(file, "{system_prompt}");
    let _ = writeln!(file, "\n--- MESSAGES ({}) ---", messages.len());
    let _ = write!(file, "{}", format_messages_for_log(messages));
    // Start the response section; tokens will be appended inline after this
    // prefix by later append-mode writes.
    let _ = writeln!(file, "\n--- RESPONSE (streaming) ---");
    let _ = write!(file, "[assistant] ");
}

/// Append one streaming text token to the log file. Tokens are written with
/// no trailing newline so they flow together as a single response line.
fn log_response_token(text: &str) {
    if text.is_empty() {
        return;
    }
    if let Some(mut file) = open_log_file() {
        let _ = write!(file, "{text}");
    }
}

/// Append a tool request that arrived mid-stream to the log file, on its
/// own line so it stands out from the flowing token text.
fn log_response_tool_request(r: &str) {
    if let Some(mut file) = open_log_file() {
        let _ = writeln!(file, "\n[assistant→tool] {r}");
    }
}

/// Append the timing footer once the response stream has finished.
fn log_response_end(duration: std::time::Duration) {
    if let Some(mut file) = open_log_file() {
        let separator = "=".repeat(72);
        let _ = writeln!(file, "\n--- COMPLETE ({}ms) ---", duration.as_millis());
        let _ = writeln!(file, "{separator}");
    }
}

/// Context needed for the reply function
pub struct ReplyContext {
pub conversation: Conversation,
Expand Down Expand Up @@ -1161,6 +1337,16 @@ impl Agent {
&working_dir,
).await;

let call_reason = derive_call_reason(turns_taken, &conversation);
log_llm_call_request(
&session_config.id,
turns_taken,
&call_reason,
&system_prompt,
conversation_with_moim.messages(),
);
let call_start = std::time::Instant::now();

let mut stream = Self::stream_response_from_provider(
self.provider().await?,
&session_config.id,
Expand Down Expand Up @@ -1210,6 +1396,14 @@ impl Agent {
}

if let Some(response) = response {
// Stream tokens and tool requests to the log file as they arrive.
for content in &response.content {
match content {
MessageContent::Text(t) => log_response_token(&t.text),
MessageContent::ToolRequest(r) => log_response_tool_request(&r.to_readable_string()),
_ => {}
}
}
let ToolCategorizeResult {
frontend_requests,
remaining_requests,
Expand Down Expand Up @@ -1482,12 +1676,12 @@ impl Agent {

if compaction_attempts >= 2 {
error!("Context limit exceeded after compaction - prompt too large");
yield AgentEvent::Message(
Message::assistant().with_system_notification(
SystemNotificationType::InlineMessage,
"Unable to continue: Context limit still exceeded after compaction. Try using a shorter message, a model with a larger context window, or start a new session."
)
let msg = Message::assistant().with_system_notification(
SystemNotificationType::InlineMessage,
"Unable to continue: Context limit still exceeded after compaction. Try using a shorter message, a model with a larger context window, or start a new session."
);
messages_to_add.push(msg.clone());
yield AgentEvent::Message(msg);
break;
}

Expand Down Expand Up @@ -1541,37 +1735,38 @@ impl Agent {
"top_up_url": top_up_url,
});

yield AgentEvent::Message(
Message::assistant().with_system_notification_with_data(
SystemNotificationType::CreditsExhausted,
user_msg,
notification_data,
)
let msg = Message::assistant().with_system_notification_with_data(
SystemNotificationType::CreditsExhausted,
user_msg,
notification_data,
);
messages_to_add.push(msg.clone());
yield AgentEvent::Message(msg);
break;
}
Err(ref provider_err @ ProviderError::NetworkError(_)) => {
crate::posthog::emit_error(provider_err.telemetry_type(), &provider_err.to_string());
error!("Error: {}", provider_err);
yield AgentEvent::Message(
Message::assistant().with_text(
format!("{provider_err}\n\nPlease resend your message to try again.")
)
let msg = Message::assistant().with_text(
format!("{provider_err}\n\nPlease resend your message to try again.")
);
messages_to_add.push(msg.clone());
yield AgentEvent::Message(msg);
break;
}
Err(ref provider_err) => {
crate::posthog::emit_error(provider_err.telemetry_type(), &provider_err.to_string());
error!("Error: {}", provider_err);
yield AgentEvent::Message(
Message::assistant().with_text(
format!("Ran into this error: {provider_err}.\n\nPlease retry if you think this is a transient or recoverable error.")
)
let msg = Message::assistant().with_text(
format!("Ran into this error: {provider_err}.\n\nPlease retry if you think this is a transient or recoverable error.")
);
messages_to_add.push(msg.clone());
yield AgentEvent::Message(msg);
break;
}
}
}
log_response_end(call_start.elapsed());
if tools_updated {
(tools, toolshim_tools, system_prompt) =
self.prepare_tools_and_prompt(&session_config.id, &session.working_dir).await?;
Expand Down Expand Up @@ -2154,6 +2349,61 @@ mod tests {
assert_eq!(id, "any");
}

// --- derive_call_reason / format_messages_for_log UTF-8 safety ---

#[test]
fn test_derive_call_reason_ascii_truncation() {
    // A 200-char ASCII message exceeds the 100-char limit, so the reason
    // must carry the ellipsis marker (and the call must not panic).
    let long_ascii: String = std::iter::repeat('a').take(200).collect();
    let conv = Conversation::new_unvalidated([Message::user().with_text(&long_ascii)]);
    let reason = derive_call_reason(1, &conv);
    assert!(reason.contains("..."));
}

#[test]
fn test_derive_call_reason_multibyte_boundary() {
    // Use MORE than 100 multi-byte characters so the char-based truncation
    // actually triggers. The previous input of 26 emoji was 104 *bytes* but
    // only 26 *chars* — below the 100-char threshold — so no "..." was ever
    // produced and the assertion below could not pass. 120 four-byte emoji
    // crosses the threshold while still exercising multi-byte boundaries.
    let emoji_text = "🎉".repeat(120);
    let conv = Conversation::new_unvalidated([Message::user().with_text(&emoji_text)]);
    // Must not panic on a multi-byte boundary, and must truncate.
    let result = derive_call_reason(1, &conv);
    assert!(result.contains("..."));
}

#[test]
fn test_derive_call_reason_short_message() {
    // Short messages are quoted verbatim: content present, no ellipsis.
    let conv = Conversation::new_unvalidated([Message::user().with_text("hello")]);
    let reason = derive_call_reason(1, &conv);
    assert!(reason.contains("hello"));
    assert!(!reason.contains("..."));
}

#[test]
fn test_format_messages_for_log_multibyte_boundary() {
    use crate::conversation::message::MessageContent;
    use rmcp::model::CallToolResult;

    // Build a tool-response body guaranteed to exceed the 2000-CHAR
    // truncation threshold regardless of how Debug renders the text.
    // The previous input of 700 "日" was 2100 *bytes* but only ~700 *chars*
    // (plus Debug wrapper text), which does not reliably cross the
    // char-based boundary; 2100 characters always does.
    let long_body = "日".repeat(2100);
    let tool_result: CallToolResult = serde_json::from_value(serde_json::json!({
        "content": [{"type": "text", "text": long_body}],
        "isError": false
    }))
    .expect("valid CallToolResult JSON");
    let response_content =
        MessageContent::ToolResponse(crate::conversation::message::ToolResponse {
            id: "test-id".to_string(),
            tool_result: Ok(tool_result),
            metadata: None,
        });
    let msg = Message::new(rmcp::model::Role::User, 0, vec![response_content]);
    // Must not panic while truncating across multi-byte boundaries.
    let log = format_messages_for_log(&[msg]);
    assert!(log.contains("truncated"));
}

#[tokio::test]
async fn test_add_final_output_tool() -> Result<()> {
let agent = Agent::new();
Expand Down
15 changes: 12 additions & 3 deletions crates/goose/src/conversation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,18 @@ pub fn merge_consecutive_messages(messages: Vec<Message>) -> (Vec<Message>, Vec<
if let Some(last) = merged_messages.last_mut() {
let effective = effective_role(&message);
if effective_role(last) == effective {
last.content.extend(message.content);
issues.push(format!("Merged consecutive {} messages", effective));
continue;
// Don't merge if either message contains tool responses — they must stay as
// separate messages so providers can correlate them with the tool requests.
let has_tool_response = |msg: &Message| {
msg.content
.iter()
.any(|c| matches!(c, MessageContent::ToolResponse(_)))
};
if !has_tool_response(last) && !has_tool_response(&message) {
last.content.extend(message.content);
issues.push(format!("Merged consecutive {} messages", effective));
continue;
}
}
}
merged_messages.push(message);
Expand Down
Loading