Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions code-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions code-rs/app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ code-app-server-protocol = { workspace = true }
code-protocol = { workspace = true }
code-utils-absolute-path = { workspace = true }
code-utils-json-to-toml = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
# We should only be using mcp-types for JSON-RPC types: it would be nice to
# split this out into a separate crate at some point.
Expand Down
272 changes: 272 additions & 0 deletions code-rs/app-server/src/code_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use code_app_server_protocol::GetAccountResponse;
use code_app_server_protocol::LoginAccountParams;
use code_app_server_protocol::LoginAccountResponse;
use code_app_server_protocol::LogoutAccountResponse;
use code_app_server_protocol::Thread;
use code_app_server_protocol::ThreadResumeParams;
use code_app_server_protocol::ThreadResumeResponse;
use code_app_server_protocol::ToolRequestUserInputOption;
use code_app_server_protocol::ToolRequestUserInputParams;
use code_app_server_protocol::ToolRequestUserInputQuestion;
Expand All @@ -21,6 +24,7 @@ use code_core::CodexConversation;
use code_core::ConversationManager;
use code_core::NewConversation;
use code_core::RolloutRecorder;
use code_core::SessionCatalog;
use code_core::Cursor;
use code_core::config::Config;
use code_core::config::ConfigOverrides;
Expand Down Expand Up @@ -49,6 +53,7 @@ use tokio::time::Duration;
use tokio::time::timeout;
use tracing::error;
use uuid::Uuid;
use chrono::DateTime;

use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
Expand Down Expand Up @@ -145,6 +150,7 @@ pub struct CodexMessageProcessor {
active_login: Arc<Mutex<Option<ActiveLogin>>>,
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: Arc<Mutex<HashMap<Uuid, Vec<RequestId>>>>,
resumed_conversation_aliases: Arc<Mutex<HashMap<ConversationId, ConversationId>>>,
#[allow(dead_code)]
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
}
Expand All @@ -166,10 +172,23 @@ impl CodexMessageProcessor {
conversation_listeners: HashMap::new(),
active_login: Arc::new(Mutex::new(None)),
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
resumed_conversation_aliases: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
}
}

async fn resolve_conversation_id_alias(
&self,
conversation_id: ConversationId,
) -> ConversationId {
self.resumed_conversation_aliases
.lock()
.await
.get(&conversation_id)
.copied()
.unwrap_or(conversation_id)
}

pub async fn process_request(&mut self, request: ClientRequest) {
self.process_request_for_connection(ConnectionId(0), request)
.await;
Expand Down Expand Up @@ -589,6 +608,7 @@ impl CodexMessageProcessor {
conversation_id,
items,
} = params;
let conversation_id = self.resolve_conversation_id_alias(conversation_id).await;
let Ok(conversation) = self
.conversation_manager
.get_conversation(conversation_id)
Expand Down Expand Up @@ -682,6 +702,7 @@ impl CodexMessageProcessor {
params: InterruptConversationParams,
) {
let InterruptConversationParams { conversation_id } = params;
let conversation_id = self.resolve_conversation_id_alias(conversation_id).await;
let Ok(conversation) = self
.conversation_manager
.get_conversation(conversation_id)
Expand Down Expand Up @@ -709,6 +730,7 @@ impl CodexMessageProcessor {
params: AddConversationListenerParams,
) {
let AddConversationListenerParams { conversation_id } = params;
let conversation_id = self.resolve_conversation_id_alias(conversation_id).await;
let Ok(conversation) = self
.conversation_manager
.get_conversation(conversation_id)
Expand Down Expand Up @@ -954,6 +976,133 @@ impl CodexMessageProcessor {
}
}

pub(crate) async fn thread_resume_v2(
&self,
request_id: RequestId,
params: ThreadResumeParams,
) {
let unsupported_history = params.history.is_some();
let thread_id = params.thread_id.clone();
let catalog = SessionCatalog::new(self.config.code_home.clone());

let catalog_entry = match catalog.find_by_id(&thread_id).await {
Ok(entry) => entry,
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to resolve thread: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

let rollout_path = match catalog_entry
.as_ref()
.map(|entry| catalog.entry_rollout_path(entry))
.or_else(|| params.path.clone())
Comment thread
cbusillo marked this conversation as resolved.
Outdated
{
Some(path) => path,
None => {
let message = if unsupported_history {
"thread/resume.history is not supported by the Every Code app-server without a rollout path"
} else {
"thread not found"
};
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: message.to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

let overrides = NewConversationParams {
model: params.model.clone(),
profile: None,
cwd: params.cwd.clone(),
Comment thread
cbusillo marked this conversation as resolved.
approval_policy: params
.approval_policy
.clone()
.map(|approval_policy| approval_policy.to_core()),
sandbox: params.sandbox.map(|sandbox| sandbox.to_core()),
config: params.config.clone(),
base_instructions: params.base_instructions.clone(),
include_plan_tool: None,
dynamic_tools: None,
include_apply_patch_tool: None,
};
let config = match derive_config_from_params(overrides, self.code_linux_sandbox_exe.clone()) {
Ok(config) => config,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("error deriving config: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

match self
.conversation_manager
.resume_conversation_from_rollout(
config.clone(),
rollout_path.clone(),
Arc::clone(&self.auth_manager),
)
.await
{
Ok(NewConversation {
conversation_id,
session_configured: _,
..
}) => {
let thread = thread_resume_response_thread(
&thread_id,
catalog_entry.as_ref(),
&config,
rollout_path,
);
self.outgoing
.send_response(
request_id,
ThreadResumeResponse {
thread,
model: config.model.clone(),
model_provider: config.model_provider_id.clone(),
cwd: config.cwd.clone(),
approval_policy: map_ask_for_approval_to_wire(config.approval_policy).into(),
sandbox: map_sandbox_policy_to_wire(config.sandbox_policy.clone()).into(),
reasoning_effort: Some(config.model_reasoning_effort.into()),
},
)
.await;

if let Ok(requested_conversation_id) = ConversationId::from_string(&thread_id)
&& requested_conversation_id != conversation_id
{
self.resumed_conversation_aliases
.lock()
.await
.insert(requested_conversation_id, conversation_id);
}
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error resuming thread: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}

async fn archive_conversation(
&self,
request_id: RequestId,
Expand All @@ -963,6 +1112,7 @@ impl CodexMessageProcessor {
conversation_id,
rollout_path,
} = params;
let conversation_id = self.resolve_conversation_id_alias(conversation_id).await;

if self
.conversation_manager
Expand Down Expand Up @@ -1413,6 +1563,52 @@ impl CodexMessageProcessor {
}
}

fn parse_rfc3339_timestamp_seconds(value: &str) -> i64 {
DateTime::parse_from_rfc3339(value)
.map(|timestamp| timestamp.timestamp())
.unwrap_or_default()
}

fn thread_resume_response_thread(
thread_id: &str,
entry: Option<&code_core::SessionIndexEntry>,
config: &Config,
rollout_path: PathBuf,
) -> Thread {
let created_at = entry
.map(|item| parse_rfc3339_timestamp_seconds(&item.created_at))
.unwrap_or_default();
let updated_at = entry
.map(|item| parse_rfc3339_timestamp_seconds(&item.last_event_at))
.unwrap_or(created_at);

Thread {
id: thread_id.to_string(),
Comment thread
cbusillo marked this conversation as resolved.
Outdated
preview: entry
.and_then(|item| item.last_user_snippet.clone())
.unwrap_or_default(),
model_provider: entry
.and_then(|item| item.model_provider.clone())
.unwrap_or_else(|| config.model_provider_id.clone()),
created_at,
updated_at,
path: Some(rollout_path),
cwd: entry
.map(|item| item.cwd_real.clone())
.unwrap_or_else(|| config.cwd.clone()),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
source: entry
.map(|item| item.session_source.clone().into())
.unwrap_or(code_app_server_protocol::SessionSource::AppServer),
git_info: entry.map(|item| code_app_server_protocol::GitInfo {
sha: None,
branch: item.git_branch.clone(),
origin_url: None,
}),
turns: Vec::new(),
}
}

impl CodexMessageProcessor {
// Minimal compatibility layer: translate SendUserTurn into our current
// flow by submitting only the user items. We intentionally do not attempt
Expand All @@ -1429,6 +1625,7 @@ impl CodexMessageProcessor {
items,
..
} = params;
let conversation_id = self.resolve_conversation_id_alias(conversation_id).await;

let Ok(conversation) = self
.conversation_manager
Expand Down Expand Up @@ -1923,6 +2120,7 @@ mod tests {
use code_core::auth::CodexAuth;
use code_core::auth::RefreshTokenError;
use code_core::config::ConfigOverrides;
use code_core::SessionIndexEntry;
use code_protocol::mcp_protocol::RemoveConversationListenerParams;
use code_protocol::protocol::SessionSource;
use mcp_types::RequestId;
Expand Down Expand Up @@ -2179,6 +2377,51 @@ mod tests {
vec!["selected".to_string()]
);
}

#[test]
fn thread_resume_response_thread_uses_catalog_metadata() {
let config =
Config::load_with_cli_overrides(Vec::new(), ConfigOverrides::default())
.expect("load default config");
let entry = SessionIndexEntry {
session_id: Uuid::new_v4(),
rollout_path: std::path::PathBuf::from("sessions/test.jsonl"),
snapshot_path: None,
created_at: "2026-04-03T10:00:00.000Z".to_string(),
last_event_at: "2026-04-03T10:05:00.000Z".to_string(),
cwd_real: std::path::PathBuf::from("/tmp/test-thread"),
cwd_display: "/tmp/test-thread".to_string(),
git_project_root: None,
git_branch: Some("main".to_string()),
model_provider: Some("openai".to_string()),
session_source: SessionSource::Mcp,
message_count: 3,
user_message_count: 1,
last_user_snippet: Some("resume me".to_string()),
nickname: None,
sync_origin_device: None,
sync_version: 0,
archived: false,
deleted: false,
};

let thread = thread_resume_response_thread(
&entry.session_id.to_string(),
Some(&entry),
&config,
std::path::PathBuf::from("/tmp/test.jsonl"),
);

assert_eq!(thread.id, entry.session_id.to_string());
assert_eq!(thread.preview, "resume me");
assert_eq!(thread.model_provider, "openai");
assert_eq!(thread.cwd, std::path::PathBuf::from("/tmp/test-thread"));
assert_eq!(thread.path, Some(std::path::PathBuf::from("/tmp/test.jsonl")));
assert_eq!(thread.source, code_app_server_protocol::SessionSource::AppServer);
assert_eq!(thread.git_info.and_then(|info| info.branch), Some("main".to_string()));
assert_eq!(thread.created_at, 1_775_210_400);
assert_eq!(thread.updated_at, 1_775_210_700);
}
}

impl IntoWireAuthMode for code_protocol::mcp_protocol::AuthMode {
Expand Down Expand Up @@ -2227,6 +2470,35 @@ fn map_ask_for_approval_to_wire(a: core_protocol::AskForApproval) -> code_protoc
}
}

fn map_sandbox_policy_to_wire(
policy: core_protocol::SandboxPolicy,
) -> code_protocol::protocol::SandboxPolicy {
match policy {
core_protocol::SandboxPolicy::DangerFullAccess => {
code_protocol::protocol::SandboxPolicy::DangerFullAccess
}
core_protocol::SandboxPolicy::ReadOnly => code_protocol::protocol::SandboxPolicy::ReadOnly,
core_protocol::SandboxPolicy::WorkspaceWrite {
writable_roots,
network_access,
exclude_tmpdir_env_var,
exclude_slash_tmp,
allow_git_writes,
} => code_protocol::protocol::SandboxPolicy::WorkspaceWrite {
writable_roots: writable_roots
.into_iter()
.filter_map(|path| {
code_utils_absolute_path::AbsolutePathBuf::from_absolute_path(path).ok()
})
.collect(),
network_access,
exclude_tmpdir_env_var,
exclude_slash_tmp,
allow_git_writes,
},
}
}

fn map_reasoning_effort_to_wire(
effort: code_core::config_types::ReasoningEffort,
) -> code_protocol::config_types::ReasoningEffort {
Expand Down
Loading