diff --git a/Cargo.lock b/Cargo.lock index 9faa50d..fdb72f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -592,6 +592,7 @@ dependencies = [ name = "embers-core" version = "0.1.0" dependencies = [ + "serde", "thiserror 2.0.18", "tracing", "tracing-subscriber", @@ -616,6 +617,8 @@ dependencies = [ "embers-protocol", "portable-pty", "proptest", + "serde", + "serde_json", "tempfile", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index bd2010e..79f1637 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,8 @@ predicates = "3" proptest = "1" rhai = "1" rhai-autodocs = "0.11" +serde = { version = "1", features = ["derive"] } +serde_json = "1" tempfile = "3" thiserror = "2" tokio = { version = "1", features = ["fs", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "sync", "time"] } diff --git a/crates/embers-core/Cargo.toml b/crates/embers-core/Cargo.toml index 705813f..26b475d 100644 --- a/crates/embers-core/Cargo.toml +++ b/crates/embers-core/Cargo.toml @@ -6,6 +6,7 @@ rust-version.workspace = true version.workspace = true [dependencies] +serde.workspace = true thiserror.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/crates/embers-core/src/geometry.rs b/crates/embers-core/src/geometry.rs index 8efeb59..a1f8dde 100644 --- a/crates/embers-core/src/geometry.rs +++ b/crates/embers-core/src/geometry.rs @@ -1,4 +1,6 @@ -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct PtySize { pub cols: u16, pub rows: u16, @@ -17,19 +19,19 @@ impl PtySize { } } -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct Point { pub x: i32, pub y: i32, } -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct Size { pub width: u16, pub height: u16, } -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct Rect { pub origin: Point, pub size: Size, @@ -44,7 +46,7 @@ impl Rect { } } -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct FloatGeometry { pub x: u16, pub y: u16, @@ -63,7 +65,7 @@ impl FloatGeometry { } } -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub enum SplitDirection { #[default] Horizontal, diff --git a/crates/embers-protocol/schema/embers.fbs b/crates/embers-protocol/schema/embers.fbs index f902ef7..925a956 100644 --- a/crates/embers-protocol/schema/embers.fbs +++ b/crates/embers-protocol/schema/embers.fbs @@ -111,7 +111,8 @@ enum ActivityStateWire : ubyte { enum BufferStateWire : ubyte { Created = 0, Running = 1, - Exited = 2, + Interrupted = 2, + Exited = 3, } enum NodeRecordKindWire : ubyte { diff --git a/crates/embers-protocol/src/codec.rs b/crates/embers-protocol/src/codec.rs index 2aef7f9..263f404 100644 --- a/crates/embers-protocol/src/codec.rs +++ b/crates/embers-protocol/src/codec.rs @@ -1554,6 +1554,7 @@ fn encode_buffer_record<'a>( let state = match record.state { BufferRecordState::Created => fb::BufferStateWire::Created, BufferRecordState::Running => fb::BufferStateWire::Running, + BufferRecordState::Interrupted => fb::BufferStateWire::Interrupted, BufferRecordState::Exited => fb::BufferStateWire::Exited, }; @@ -2371,6 +2372,7 @@ fn decode_buffer_record(record: fb::BufferRecord) -> Result BufferRecordState::Created, fb::BufferStateWire::Running => BufferRecordState::Running, + fb::BufferStateWire::Interrupted => BufferRecordState::Interrupted, fb::BufferStateWire::Exited => BufferRecordState::Exited, _ => return Err(ProtocolError::InvalidMessage("unknown buffer state")), }; diff --git a/crates/embers-protocol/src/types.rs b/crates/embers-protocol/src/types.rs index aebc5fc..548efa2 100644 --- a/crates/embers-protocol/src/types.rs +++ b/crates/embers-protocol/src/types.rs @@ -335,6 +335,7 @@ impl ClientMessage { pub enum BufferRecordState { Created, Running, + Interrupted, Exited, } diff --git a/crates/embers-server/Cargo.toml b/crates/embers-server/Cargo.toml index c2a986e..68b6c63 100644 --- a/crates/embers-server/Cargo.toml +++ b/crates/embers-server/Cargo.toml @@ -10,6 +10,8 @@ alacritty_terminal = "0.25.1" embers-core = { path = "../embers-core" } embers-protocol = { path = "../embers-protocol" } portable-pty.workspace = true +serde.workspace = true +serde_json.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/crates/embers-server/src/config.rs b/crates/embers-server/src/config.rs index 674f2f4..782a70a 100644 --- a/crates/embers-server/src/config.rs +++ b/crates/embers-server/src/config.rs @@ -7,6 +7,7 @@ pub const SOCKET_ENV_VAR: &str = "EMBERS_SOCKET"; #[derive(Clone, Debug, PartialEq, Eq)] pub struct ServerConfig { pub socket_path: PathBuf, + pub workspace_path: PathBuf, pub buffer_env: BTreeMap, } @@ -17,8 +18,10 @@ impl ServerConfig { SOCKET_ENV_VAR.to_owned(), socket_path.as_os_str().to_owned(), ); + let workspace_path = socket_path.with_extension("workspace.json"); Self { socket_path, + workspace_path, buffer_env, } } diff --git a/crates/embers-server/src/lib.rs b/crates/embers-server/src/lib.rs index 85615b8..6d2867d 100644 --- a/crates/embers-server/src/lib.rs +++ b/crates/embers-server/src/lib.rs @@ -3,6 +3,7 @@ pub mod state; mod buffer_runtime; mod config; +mod persist; mod protocol; mod server; mod terminal_backend; @@ -11,7 +12,7 @@ pub use buffer_runtime::{BufferRuntimeCallbacks, BufferRuntimeHandle}; pub use config::{SOCKET_ENV_VAR, ServerConfig}; pub use model::{ Buffer, BufferAttachment, BufferState, BufferViewNode, BufferViewState, ExitedBuffer, - FloatingWindow, Node, RunningBuffer, Session, SplitNode, TabEntry, TabsNode, + FloatingWindow, InterruptedBuffer, Node, RunningBuffer, Session, SplitNode, TabEntry, TabsNode, }; pub use server::{Server, ServerHandle}; pub use state::ServerState; diff --git a/crates/embers-server/src/model.rs b/crates/embers-server/src/model.rs index c266daf..f8dc1de 100644 --- a/crates/embers-server/src/model.rs +++ b/crates/embers-server/src/model.rs @@ -37,6 +37,11 @@ pub struct RunningBuffer { pub pid: Option, } +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct InterruptedBuffer { + pub last_known_pid: Option, +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct ExitedBuffer { pub exit_code: Option, @@ -48,6 +53,7 @@ pub enum BufferState { #[default] Created, Running(RunningBuffer), + Interrupted(InterruptedBuffer), Exited(ExitedBuffer), } diff --git a/crates/embers-server/src/persist.rs b/crates/embers-server/src/persist.rs new file mode 100644 index 0000000..c4e41ab --- /dev/null +++ b/crates/embers-server/src/persist.rs @@ -0,0 +1,444 @@ +use std::collections::BTreeMap; +use std::fs; +use std::path::{Path, PathBuf}; +use std::time::{Duration, UNIX_EPOCH}; + +use embers_core::{ + ActivityState, BufferId, FloatGeometry, FloatingId, MuxError, NodeId, PtySize, Result, + SessionId, SplitDirection, Timestamp, +}; +use serde::{Deserialize, Serialize}; + +use crate::model::{ + Buffer, BufferAttachment, BufferState, BufferViewNode, BufferViewState, ExitedBuffer, + FloatingWindow, InterruptedBuffer, Node, RunningBuffer, Session, SplitNode, TabEntry, TabsNode, +}; +use crate::state::ServerState; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct PersistedWorkspace { + pub sessions: Vec, + pub buffers: Vec, + pub nodes: Vec, + pub floating: Vec, + pub next_session_id: u64, + pub next_buffer_id: u64, + pub next_node_id: u64, + pub next_floating_id: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct PersistedSession { + pub id: u64, + pub name: String, + pub root_node: u64, + pub floating: Vec, + pub focused_leaf: Option, + pub focused_floating: Option, + pub created_at_ms: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct PersistedBuffer { + pub id: u64, + pub title: String, + pub command: Vec, + pub cwd: Option, + pub env: BTreeMap, + pub state: PersistedBufferState, + pub attachment: PersistedBufferAttachment, + pub pty_size: PtySize, + pub activity: PersistedActivityState, + pub last_snapshot_seq: u64, + pub created_at_ms: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum PersistedBufferState { + Created, + Running { + pid: Option, + }, + Interrupted { + last_known_pid: Option, + }, + Exited { + exit_code: Option, + exited_at_ms: u64, + }, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum PersistedBufferAttachment { + Attached { node_id: u64 }, + Detached, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum PersistedNode { + BufferView { + id: u64, + session_id: u64, + parent: Option, + buffer_id: u64, + focused: bool, + zoomed: bool, + follow_output: bool, + last_render_size: PtySize, + }, + Split { + id: u64, + session_id: u64, + parent: Option, + direction: PersistedSplitDirection, + children: Vec, + sizes: Vec, + last_focused_descendant: Option, + }, + Tabs { + id: u64, + session_id: u64, + parent: Option, + tabs: Vec, + active: usize, + last_focused_descendant: Option, + }, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct PersistedTabEntry { + pub title: String, + pub child: u64, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum PersistedSplitDirection { + Horizontal, + Vertical, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum PersistedActivityState { + Idle, + Activity, + Bell, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct PersistedFloatingWindow { + pub id: u64, + pub session_id: u64, + pub root_node: u64, + pub title: Option, + pub geometry: FloatGeometry, + pub focused: bool, + pub visible: bool, + pub close_on_empty: bool, + pub last_focused_leaf: Option, +} + +pub fn load_workspace(path: &Path) -> Result> { + match fs::read(path) { + Ok(bytes) => { + let persisted: PersistedWorkspace = serde_json::from_slice(&bytes) + .map_err(|error| MuxError::internal(error.to_string()))?; + Ok(Some(ServerState::from_persisted(persisted)?)) + } + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(error) => Err(error.into()), + } +} + +pub fn save_workspace(path: &Path, state: &ServerState) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + let bytes = serde_json::to_vec_pretty(&state.to_persisted()) + .map_err(|error| MuxError::internal(error.to_string()))?; + fs::write(path, bytes)?; + Ok(()) +} + +pub fn persisted_session(session: &Session) -> PersistedSession { + PersistedSession { + id: session.id.0, + name: session.name.clone(), + root_node: session.root_node.0, + floating: session.floating.iter().map(|id| id.0).collect(), + focused_leaf: session.focused_leaf.map(|id| id.0), + focused_floating: session.focused_floating.map(|id| id.0), + created_at_ms: timestamp_to_millis(session.created_at), + } +} + +pub fn restored_session(session: PersistedSession) -> Session { + Session { + id: SessionId(session.id), + name: session.name, + root_node: NodeId(session.root_node), + floating: session.floating.into_iter().map(FloatingId).collect(), + focused_leaf: session.focused_leaf.map(NodeId), + focused_floating: session.focused_floating.map(FloatingId), + created_at: timestamp_from_millis(session.created_at_ms), + } +} + +pub fn persisted_buffer(buffer: &Buffer) -> PersistedBuffer { + PersistedBuffer { + id: buffer.id.0, + title: buffer.title.clone(), + command: buffer.command.clone(), + cwd: buffer.cwd.clone(), + env: buffer.env.clone(), + state: persisted_buffer_state(&buffer.state), + attachment: persisted_buffer_attachment(&buffer.attachment), + pty_size: buffer.pty_size, + activity: persisted_activity(buffer.activity), + last_snapshot_seq: buffer.last_snapshot_seq, + created_at_ms: timestamp_to_millis(buffer.created_at), + } +} + +pub fn restored_buffer(buffer: PersistedBuffer) -> Buffer { + Buffer { + id: BufferId(buffer.id), + title: buffer.title, + command: buffer.command, + cwd: buffer.cwd, + env: buffer.env, + state: restored_buffer_state(buffer.state), + attachment: restored_buffer_attachment(buffer.attachment), + pty_size: buffer.pty_size, + activity: restored_activity(buffer.activity), + last_snapshot_seq: buffer.last_snapshot_seq, + created_at: timestamp_from_millis(buffer.created_at_ms), + } +} + +pub fn persisted_node(node: &Node) -> PersistedNode { + match node { + Node::BufferView(node) => PersistedNode::BufferView { + id: node.id.0, + session_id: node.session_id.0, + parent: node.parent.map(|id| id.0), + buffer_id: node.buffer_id.0, + focused: node.view.focused, + zoomed: node.view.zoomed, + follow_output: node.view.follow_output, + last_render_size: node.view.last_render_size, + }, + Node::Split(node) => PersistedNode::Split { + id: node.id.0, + session_id: node.session_id.0, + parent: node.parent.map(|id| id.0), + direction: persisted_split_direction(node.direction), + children: node.children.iter().map(|id| id.0).collect(), + sizes: node.sizes.clone(), + last_focused_descendant: node.last_focused_descendant.map(|id| id.0), + }, + Node::Tabs(node) => PersistedNode::Tabs { + id: node.id.0, + session_id: node.session_id.0, + parent: node.parent.map(|id| id.0), + tabs: node + .tabs + .iter() + .map(|tab| PersistedTabEntry { + title: tab.title.clone(), + child: tab.child.0, + }) + .collect(), + active: node.active, + last_focused_descendant: node.last_focused_descendant.map(|id| id.0), + }, + } +} + +pub fn restored_node(node: PersistedNode) -> Node { + match node { + PersistedNode::BufferView { + id, + session_id, + parent, + buffer_id, + focused, + zoomed, + follow_output, + last_render_size, + } => Node::BufferView(BufferViewNode { + id: NodeId(id), + session_id: SessionId(session_id), + parent: parent.map(NodeId), + buffer_id: BufferId(buffer_id), + view: BufferViewState { + focused, + zoomed, + follow_output, + last_render_size, + }, + }), + PersistedNode::Split { + id, + session_id, + parent, + direction, + children, + sizes, + last_focused_descendant, + } => Node::Split(SplitNode { + id: NodeId(id), + session_id: SessionId(session_id), + parent: parent.map(NodeId), + direction: restored_split_direction(direction), + children: children.into_iter().map(NodeId).collect(), + sizes, + last_focused_descendant: last_focused_descendant.map(NodeId), + }), + PersistedNode::Tabs { + id, + session_id, + parent, + tabs, + active, + last_focused_descendant, + } => Node::Tabs(TabsNode { + id: NodeId(id), + session_id: SessionId(session_id), + parent: parent.map(NodeId), + tabs: tabs + .into_iter() + .map(|tab| TabEntry { + title: tab.title, + child: NodeId(tab.child), + }) + .collect(), + active, + last_focused_descendant: last_focused_descendant.map(NodeId), + }), + } +} + +pub fn persisted_floating(window: &FloatingWindow) -> PersistedFloatingWindow { + PersistedFloatingWindow { + id: window.id.0, + session_id: window.session_id.0, + root_node: window.root_node.0, + title: window.title.clone(), + geometry: window.geometry, + focused: window.focused, + visible: window.visible, + close_on_empty: window.close_on_empty, + last_focused_leaf: window.last_focused_leaf.map(|id| id.0), + } +} + +pub fn restored_floating(window: PersistedFloatingWindow) -> FloatingWindow { + FloatingWindow { + id: FloatingId(window.id), + session_id: SessionId(window.session_id), + root_node: NodeId(window.root_node), + title: window.title, + geometry: window.geometry, + focused: window.focused, + visible: window.visible, + close_on_empty: window.close_on_empty, + last_focused_leaf: window.last_focused_leaf.map(NodeId), + } +} + +fn persisted_buffer_state(state: &BufferState) -> PersistedBufferState { + match state { + BufferState::Created => PersistedBufferState::Created, + BufferState::Running(running) => PersistedBufferState::Running { pid: running.pid }, + BufferState::Interrupted(interrupted) => PersistedBufferState::Interrupted { + last_known_pid: interrupted.last_known_pid, + }, + BufferState::Exited(exited) => PersistedBufferState::Exited { + exit_code: exited.exit_code, + exited_at_ms: timestamp_to_millis(exited.exited_at), + }, + } +} + +fn restored_buffer_state(state: PersistedBufferState) -> BufferState { + match state { + PersistedBufferState::Created => BufferState::Created, + PersistedBufferState::Running { pid } => BufferState::Running(RunningBuffer { pid }), + PersistedBufferState::Interrupted { last_known_pid } => { + BufferState::Interrupted(InterruptedBuffer { last_known_pid }) + } + PersistedBufferState::Exited { + exit_code, + exited_at_ms, + } => BufferState::Exited(ExitedBuffer { + exit_code, + exited_at: timestamp_from_millis(exited_at_ms), + }), + } +} + +fn persisted_buffer_attachment(attachment: &BufferAttachment) -> PersistedBufferAttachment { + match attachment { + BufferAttachment::Attached(node_id) => { + PersistedBufferAttachment::Attached { node_id: node_id.0 } + } + BufferAttachment::Detached => PersistedBufferAttachment::Detached, + } +} + +fn restored_buffer_attachment(attachment: PersistedBufferAttachment) -> BufferAttachment { + match attachment { + PersistedBufferAttachment::Attached { node_id } => { + BufferAttachment::Attached(NodeId(node_id)) + } + PersistedBufferAttachment::Detached => BufferAttachment::Detached, + } +} + +fn persisted_split_direction(direction: SplitDirection) -> PersistedSplitDirection { + match direction { + SplitDirection::Horizontal => PersistedSplitDirection::Horizontal, + SplitDirection::Vertical => PersistedSplitDirection::Vertical, + } +} + +fn restored_split_direction(direction: PersistedSplitDirection) -> SplitDirection { + match direction { + PersistedSplitDirection::Horizontal => SplitDirection::Horizontal, + PersistedSplitDirection::Vertical => SplitDirection::Vertical, + } +} + +fn persisted_activity(activity: ActivityState) -> PersistedActivityState { + match activity { + ActivityState::Idle => PersistedActivityState::Idle, + ActivityState::Activity => PersistedActivityState::Activity, + ActivityState::Bell => PersistedActivityState::Bell, + } +} + +fn restored_activity(activity: PersistedActivityState) -> ActivityState { + match activity { + PersistedActivityState::Idle => ActivityState::Idle, + PersistedActivityState::Activity => ActivityState::Activity, + PersistedActivityState::Bell => ActivityState::Bell, + } +} + +fn timestamp_to_millis(timestamp: Timestamp) -> u64 { + timestamp + .0 + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .min(u128::from(u64::MAX)) as u64 +} + +fn timestamp_from_millis(millis: u64) -> Timestamp { + Timestamp(UNIX_EPOCH + Duration::from_millis(millis)) +} diff --git a/crates/embers-server/src/protocol.rs b/crates/embers-server/src/protocol.rs index 9f0ae37..e907b88 100644 --- a/crates/embers-server/src/protocol.rs +++ b/crates/embers-server/src/protocol.rs @@ -22,6 +22,11 @@ pub fn buffer_record(buffer: &Buffer) -> BufferRecord { let (state, pid, exit_code) = match &buffer.state { BufferState::Created => (BufferRecordState::Created, None, None), BufferState::Running(running) => (BufferRecordState::Running, running.pid, None), + BufferState::Interrupted(interrupted) => ( + BufferRecordState::Interrupted, + interrupted.last_known_pid, + None, + ), BufferState::Exited(exited) => (BufferRecordState::Exited, None, exited.exit_code), }; diff --git a/crates/embers-server/src/server.rs b/crates/embers-server/src/server.rs index 9504431..984bcc1 100644 --- a/crates/embers-server/src/server.rs +++ b/crates/embers-server/src/server.rs @@ -52,7 +52,12 @@ impl Server { let listener = UnixListener::bind(&self.config.socket_path)?; set_socket_permissions(&self.config.socket_path)?; let socket_path = self.config.socket_path.clone(); - let runtime = Arc::new(Runtime::new(self.config.buffer_env.clone())); + let restored_state = load_workspace(&self.config.workspace_path)?; + let runtime = Arc::new(Runtime::new( + restored_state.unwrap_or_default(), + self.config.workspace_path.clone(), + self.config.buffer_env.clone(), + )); let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); let join = tokio::spawn(async move { @@ -100,6 +105,9 @@ impl Server { } } + if let Err(error) = runtime.persist_workspace().await { + error!(%error, "failed to persist workspace during shutdown"); + } runtime.shutdown_runtimes().await; Ok(()) }); @@ -163,6 +171,7 @@ struct Runtime { state: Mutex, buffer_runtimes: Mutex>, buffer_surfaces: Mutex>, + workspace_path: PathBuf, buffer_env: BTreeMap, subscriptions: Mutex>, next_connection_id: AtomicU64, @@ -241,11 +250,16 @@ impl BufferSurface { } impl Runtime { - fn new(buffer_env: BTreeMap) -> Self { + fn new( + state: ServerState, + workspace_path: PathBuf, + buffer_env: BTreeMap, + ) -> Self { Self { - state: Mutex::new(ServerState::new()), + state: Mutex::new(state), buffer_runtimes: Mutex::new(BTreeMap::new()), buffer_surfaces: Mutex::new(BTreeMap::new()), + workspace_path, buffer_env, subscriptions: Mutex::new(BTreeMap::new()), next_connection_id: AtomicU64::new(1), @@ -255,6 +269,11 @@ impl Runtime { } impl Runtime { + async fn persist_workspace(&self) -> Result<()> { + let state = self.state.lock().await; + save_workspace(&self.workspace_path, &state) + } + async fn dispatch_request( self: &Arc, connection_id: u64, @@ -1230,6 +1249,9 @@ impl Runtime { BufferState::Running(_) => Err(MuxError::internal(format!( "buffer {buffer_id} is marked running without an active runtime" ))), + BufferState::Interrupted(_) => Err(MuxError::conflict(format!( + "buffer {buffer_id} was restored without a running runtime" + ))), BufferState::Exited(_) => Err(MuxError::conflict(format!( "buffer {buffer_id} has already exited" ))), diff --git a/crates/embers-server/src/state.rs b/crates/embers-server/src/state.rs index 96cf73f..4235b9d 100644 --- a/crates/embers-server/src/state.rs +++ b/crates/embers-server/src/state.rs @@ -8,7 +8,11 @@ use embers_core::{ use crate::model::{ Buffer, BufferAttachment, BufferState, BufferViewNode, BufferViewState, ExitedBuffer, - FloatingWindow, Node, RunningBuffer, Session, SplitNode, TabEntry, TabsNode, + FloatingWindow, InterruptedBuffer, Node, RunningBuffer, Session, SplitNode, TabEntry, TabsNode, +}; +use crate::persist::{ + PersistedWorkspace, persisted_buffer, persisted_floating, persisted_node, persisted_session, + restored_buffer, restored_floating, restored_node, restored_session, }; #[derive(Debug)] @@ -43,6 +47,63 @@ impl ServerState { } } + pub fn from_persisted(workspace: PersistedWorkspace) -> Result { + let mut state = Self { + sessions: workspace + .sessions + .into_iter() + .map(|session| { + let session = restored_session(session); + (session.id, session) + }) + .collect(), + buffers: workspace + .buffers + .into_iter() + .map(|buffer| { + let buffer = restored_buffer(buffer); + (buffer.id, buffer) + }) + .collect(), + nodes: workspace + .nodes + .into_iter() + .map(|node| { + let node = restored_node(node); + (node.id(), node) + }) + .collect(), + floating: workspace + .floating + .into_iter() + .map(|floating| { + let floating = restored_floating(floating); + (floating.id, floating) + }) + .collect(), + session_ids: IdAllocator::new(workspace.next_session_id), + buffer_ids: IdAllocator::new(workspace.next_buffer_id), + node_ids: IdAllocator::new(workspace.next_node_id), + floating_ids: IdAllocator::new(workspace.next_floating_id), + }; + state.interrupt_unrecoverable_buffers(); + state.validate()?; + Ok(state) + } + + pub fn to_persisted(&self) -> PersistedWorkspace { + PersistedWorkspace { + sessions: self.sessions.values().map(persisted_session).collect(), + buffers: self.buffers.values().map(persisted_buffer).collect(), + nodes: self.nodes.values().map(persisted_node).collect(), + floating: self.floating.values().map(persisted_floating).collect(), + next_session_id: next_id_after_max(self.sessions.keys().map(|id| id.0)), + next_buffer_id: next_id_after_max(self.buffers.keys().map(|id| id.0)), + next_node_id: next_id_after_max(self.nodes.keys().map(|id| id.0)), + next_floating_id: next_id_after_max(self.floating.keys().map(|id| id.0)), + } + } + pub fn session(&self, session_id: SessionId) -> Result<&Session> { self.sessions .get(&session_id) @@ -282,6 +343,23 @@ impl ServerState { Ok(()) } + pub fn interrupt_unrecoverable_buffers(&mut self) { + for buffer in self.buffers.values_mut() { + buffer.state = match &buffer.state { + BufferState::Exited(exited) => BufferState::Exited(exited.clone()), + BufferState::Running(running) => BufferState::Interrupted(InterruptedBuffer { + last_known_pid: running.pid, + }), + BufferState::Interrupted(interrupted) => { + BufferState::Interrupted(interrupted.clone()) + } + BufferState::Created => BufferState::Interrupted(InterruptedBuffer { + last_known_pid: None, + }), + }; + } + } + pub fn set_buffer_size(&mut self, buffer_id: BufferId, size: PtySize) -> Result<()> { self.buffer_mut(buffer_id)?.pty_size = size; Ok(()) @@ -1967,3 +2045,7 @@ impl ServerState { .ok_or_else(|| MuxError::not_found(format!("unknown floating window {floating_id}"))) } } + +fn next_id_after_max(ids: impl Iterator) -> u64 { + ids.max().unwrap_or(0).saturating_add(1) +} diff --git a/crates/embers-server/tests/persistence.rs b/crates/embers-server/tests/persistence.rs new file mode 100644 index 0000000..ca6be64 --- /dev/null +++ b/crates/embers-server/tests/persistence.rs @@ -0,0 +1,173 @@ +use embers_core::{BufferId, RequestId, init_test_tracing}; +use embers_protocol::{ + BufferRecordState, BufferRequest, BufferResponse, BuffersResponse, ClientMessage, + ProtocolClient, ServerResponse, SessionRequest, SessionSnapshotResponse, +}; +use embers_server::{Server, ServerConfig}; +use tempfile::tempdir; + +async fn request_session_snapshot( + client: &mut ProtocolClient, + request: SessionRequest, +) -> SessionSnapshotResponse { + match client + .request(&ClientMessage::Session(request)) + .await + .expect("session request succeeds") + { + ServerResponse::SessionSnapshot(response) => response, + other => panic!("expected session snapshot, got {other:?}"), + } +} + +async fn request_buffer(client: &mut ProtocolClient, request: BufferRequest) -> BufferResponse { + match client + .request(&ClientMessage::Buffer(request)) + .await + .expect("buffer request succeeds") + { + ServerResponse::Buffer(response) => response, + other => panic!("expected buffer response, got {other:?}"), + } +} + +async fn request_buffers(client: &mut ProtocolClient, request: BufferRequest) -> BuffersResponse { + match client + .request(&ClientMessage::Buffer(request)) + .await + .expect("buffer list succeeds") + { + ServerResponse::Buffers(response) => response, + other => panic!("expected buffers response, got {other:?}"), + } +} + +#[tokio::test] +async fn clean_restart_restores_workspace_and_marks_live_buffers_interrupted() { + init_test_tracing(); + + let tempdir = tempdir().expect("tempdir"); + let socket_path = tempdir.path().join("mux.sock"); + let config = ServerConfig::new(socket_path.clone()); + let workspace_path = config.workspace_path.clone(); + + let handle = Server::new(config.clone()) + .start() + .await + .expect("start server"); + let mut client = ProtocolClient::connect(&socket_path) + .await + .expect("connect client"); + + let session = request_session_snapshot( + &mut client, + SessionRequest::Create { + request_id: RequestId(1), + name: "main".to_owned(), + }, + ) + .await; + let session_id = session.snapshot.session.id; + + let attached = request_buffer( + &mut client, + BufferRequest::Create { + request_id: RequestId(2), + title: Some("attached".to_owned()), + command: vec!["/bin/sh".to_owned()], + cwd: None, + env: Default::default(), + }, + ) + .await + .buffer; + + let detached = request_buffer( + &mut client, + BufferRequest::Create { + request_id: RequestId(3), + title: Some("detached".to_owned()), + command: vec!["/bin/sh".to_owned()], + cwd: None, + env: Default::default(), + }, + ) + .await + .buffer; + + let attached_id = attached.id; + let detached_id = detached.id; + + let restored_layout = request_session_snapshot( + &mut client, + SessionRequest::AddRootTab { + request_id: RequestId(4), + session_id, + title: "shell".to_owned(), + buffer_id: Some(attached_id), + child_node_id: None, + }, + ) + .await; + assert_eq!(restored_layout.snapshot.session.id, session_id); + + handle.shutdown().await.expect("shutdown server"); + assert!(workspace_path.exists()); + + let handle = Server::new(config).start().await.expect("restart server"); + let mut client = ProtocolClient::connect(&socket_path) + .await + .expect("reconnect client"); + + let session = request_session_snapshot( + &mut client, + SessionRequest::Get { + request_id: RequestId(5), + session_id, + }, + ) + .await; + assert_eq!(session.snapshot.session.name, "main"); + let attached_buffer = session + .snapshot + .buffers + .iter() + .find(|buffer| buffer.id == attached_id) + .expect("attached buffer restored"); + assert_eq!(attached_buffer.state, BufferRecordState::Interrupted); + assert!(attached_buffer.attachment_node_id.is_some()); + + let buffers = request_buffers( + &mut client, + BufferRequest::List { + request_id: RequestId(6), + session_id: None, + attached_only: false, + detached_only: false, + }, + ) + .await; + let detached_buffer = buffers + .buffers + .iter() + .find(|buffer| buffer.id == detached_id) + .expect("detached buffer restored"); + assert_eq!(detached_buffer.state, BufferRecordState::Interrupted); + assert_eq!(detached_buffer.attachment_node_id, None); + + let send_err = client + .request(&ClientMessage::Buffer(BufferRequest::Get { + request_id: RequestId(7), + buffer_id: BufferId(detached_id.0), + })) + .await + .expect("buffer get succeeds"); + match send_err { + ServerResponse::Buffer(response) => { + assert_eq!(response.buffer.state, BufferRecordState::Interrupted); + } + other => panic!("expected restored interrupted buffer, got {other:?}"), + } + + handle.shutdown().await.expect("shutdown restarted server"); +}