From 22004fa6df101e9ae263aca4e82292704ec9ed11 Mon Sep 17 00:00:00 2001 From: James Dumay Date: Wed, 15 Apr 2026 17:42:39 +1000 Subject: [PATCH] Rename mesh protobuf module and split config schema --- mesh-llm/build.rs | 4 +- mesh-llm/docs/DESIGN.md | 4 +- mesh-llm/docs/TESTING.md | 2 +- mesh-llm/docs/message_protocol.md | 13 ++- mesh-llm/proto/config.proto | 90 ++++++++++++++++++ mesh-llm/proto/node.proto | 88 ------------------ mesh-llm/src/lib.rs | 6 +- mesh-llm/src/mesh/heartbeat.rs | 4 +- mesh-llm/src/mesh/mod.rs | 68 +++++++------- mesh-llm/src/mesh/tests.rs | 76 +++++++-------- mesh-llm/src/models/inventory.rs | 16 ++-- mesh-llm/src/protocol/convert.rs | 148 +++++++++++++++--------------- mesh-llm/src/protocol/mod.rs | 84 ++++++++--------- mesh-llm/src/protocol/v0.rs | 6 +- 14 files changed, 312 insertions(+), 297 deletions(-) create mode 100644 mesh-llm/proto/config.proto diff --git a/mesh-llm/build.rs b/mesh-llm/build.rs index 756d4716..7bcd556c 100644 --- a/mesh-llm/build.rs +++ b/mesh-llm/build.rs @@ -27,6 +27,6 @@ fn compile_node_proto() { std::env::set_var("PROTOC", protoc); prost_build::Config::new() - .compile_protos(&["proto/node.proto"], &["proto"]) - .expect("compile node proto"); + .compile_protos(&["proto/node.proto", "proto/config.proto"], &["proto"]) + .expect("compile mesh and config proto"); } diff --git a/mesh-llm/docs/DESIGN.md b/mesh-llm/docs/DESIGN.md index b121fd4e..7d8640f4 100644 --- a/mesh-llm/docs/DESIGN.md +++ b/mesh-llm/docs/DESIGN.md @@ -49,13 +49,13 @@ enum NodeRole { } ``` -Roles are exchanged via gossip. Preferred peers use `meshllm.node.v1` protobuf on QUIC ALPN `mesh-llm/1`; legacy peers may still negotiate `mesh-llm/0` and use the older JSON gossip payloads. A node transitions Worker → Host when elected. +Roles are exchanged via gossip. Preferred peers use protobuf on QUIC ALPN `mesh-llm/1`, with `meshllm.node.v1` for mesh state and `meshllm.config.v1` for owner-gated config sync; legacy peers may still negotiate `mesh-llm/0` and use the older JSON gossip payloads. A node transitions Worker → Host when elected. A newly connected peer is quarantined until it sends a valid `GossipFrame` with `gen = 1` (quarantine-until-gossip admission model). Only streams 0x01 (GOSSIP) and 0x05 (ROUTE_REQUEST) are accepted before admission. All other streams are rejected until the peer is admitted. ## Control-Plane Protocol -The control plane prefers QUIC ALPN `mesh-llm/1` using the `meshllm.node.v1` protobuf schema. Scoped control-plane streams on `/1` use 4-byte LE framing followed by protobuf bytes. For backward compatibility, peers may also negotiate `mesh-llm/0`, which preserves the legacy JSON/raw payloads on those same streams. +The control plane prefers QUIC ALPN `mesh-llm/1` using split protobuf schemas: `meshllm.node.v1` for mesh state and `meshllm.config.v1` for config sync. Scoped control-plane streams on `/1` use 4-byte LE framing followed by protobuf bytes. For backward compatibility, peers may also negotiate `mesh-llm/0`, which preserves the legacy JSON/raw payloads on those same streams. Mixed meshes containing both `/0` and `/1` nodes are supported. `/0` links are compatibility mode only, so they do not carry protobuf-only fields. diff --git a/mesh-llm/docs/TESTING.md b/mesh-llm/docs/TESTING.md index 6b1d6f21..d246e476 100644 --- a/mesh-llm/docs/TESTING.md +++ b/mesh-llm/docs/TESTING.md @@ -479,7 +479,7 @@ mesh-llm --model Qwen3-Coder-Next-Q4_K_M --auto --no-self-update --split --join ## Control-Plane Protocol (Protobuf v1) -The control plane prefers QUIC ALPN `mesh-llm/1` using the `meshllm.node.v1` protobuf schema. On `/1`, all five scoped control-plane streams use 4-byte LE framing followed by protobuf bytes. For backward compatibility, nodes may also negotiate `mesh-llm/0`, which keeps the legacy JSON/raw payloads on those same streams. +The control plane prefers QUIC ALPN `mesh-llm/1` using split protobuf schemas: `meshllm.node.v1` for mesh state and `meshllm.config.v1` for config sync. On `/1`, all scoped protobuf control-plane streams use 4-byte LE framing followed by protobuf bytes. For backward compatibility, nodes may also negotiate `mesh-llm/0`, which keeps the legacy JSON/raw payloads on those same streams. | Stream | Type | Format | |--------|------|--------| diff --git a/mesh-llm/docs/message_protocol.md b/mesh-llm/docs/message_protocol.md index b513cef7..7ad19c72 100644 --- a/mesh-llm/docs/message_protocol.md +++ b/mesh-llm/docs/message_protocol.md @@ -1,6 +1,6 @@ # mesh-llm Message Protocol -This document describes the wire protocol for control-plane communication between mesh-llm nodes. Control-plane traffic prefers the `meshllm.node.v1` protobuf schema on QUIC ALPN `mesh-llm/1`, with backward-compatible support for the legacy `mesh-llm/0` JSON/raw payloads. +This document describes the wire protocol for control-plane communication between mesh-llm nodes. Control-plane traffic on QUIC ALPN `mesh-llm/1` uses protobuf, split between `meshllm.node.v1` for mesh state and `meshllm.config.v1` for owner-gated config sync, with backward-compatible support for the legacy `mesh-llm/0` JSON/raw payloads. ## ALPN @@ -24,12 +24,14 @@ Each QUIC connection carries multiple logical streams, distinguished by a 1-byte | 0x08 | BLACKBOARD | bidirectional | admission-gated auxiliary channel | | 0x09 | PLUGIN_CHANNEL | bidirectional | plugin protocol (see Out-of-Scope) | | 0x0a | PLUGIN_BULK_TRANSFER | send | plugin protocol bulk data (see Out-of-Scope) | +| 0x0b | CONFIG_SUBSCRIBE | bidirectional | protobuf `ConfigSubscribe` / `ConfigSnapshotResponse` / `ConfigUpdateNotification` | +| 0x0c | CONFIG_PUSH | bidirectional | protobuf `ConfigPush` / `ConfigPushResponse` | Streams 0x02 and 0x04 are raw TCP relay tunnels. They carry llama.cpp RPC and HTTP traffic respectively and are not subject to protobuf framing or generation validation. ## Framing -All protobuf control-plane streams (0x01, 0x03, 0x05, 0x06, 0x07) use the same framing: +All protobuf control-plane streams (0x01, 0x03, 0x05, 0x06, 0x07, 0x0b, 0x0c) use the same framing: ``` [1 byte stream type][4 bytes LE length][N bytes protobuf body] @@ -48,13 +50,18 @@ Every protobuf message that carries a `gen` field must have `gen == 1`. Frames w - `RouteTable.gen` - `PeerDown.gen` - `PeerLeaving.gen` +- `ConfigSubscribe.gen` +- `ConfigSnapshotResponse.gen` +- `ConfigUpdateNotification.gen` +- `ConfigPush.gen` +- `ConfigPushResponse.gen` ## Admission (Quarantine-Until-Gossip) A newly connected peer is quarantined until it sends a valid `GossipFrame` with `gen = 1`. Until admission: - Only stream 0x01 (GOSSIP) and 0x05 (ROUTE_REQUEST) are accepted. -- All other streams (0x02, 0x03, 0x04, 0x06, 0x07, 0x08, 0x09, 0x0a) are rejected and the stream is closed. +- All other streams (0x02, 0x03, 0x04, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c) are rejected and the stream is closed. - The QUIC connection itself stays open so gossip can complete. A peer is admitted when its negotiated gossip payload decodes successfully and passes validation checks. On `/1` this is a protobuf `GossipFrame`; on `/0` this is the legacy JSON gossip payload. diff --git a/mesh-llm/proto/config.proto b/mesh-llm/proto/config.proto new file mode 100644 index 00000000..9f03a20b --- /dev/null +++ b/mesh-llm/proto/config.proto @@ -0,0 +1,90 @@ +syntax = "proto3"; +package meshllm.config.v1; + +message NodeConfigSnapshot { + uint32 version = 1; // config schema version (currently 1) + NodeGpuConfig gpu = 2; + repeated NodeModelEntry models = 3; + repeated NodePluginEntry plugins = 4; +} + +enum GpuAssignment { + GPU_ASSIGNMENT_UNSPECIFIED = 0; + GPU_ASSIGNMENT_AUTO = 1; + GPU_ASSIGNMENT_PINNED = 2; +} + +message NodeGpuConfig { + GpuAssignment assignment = 1; +} + +message ConfiguredModelRef { + string declared_ref = 1; + optional string source_kind = 2; + optional string revision = 3; +} + +message NodeModelEntry { + string model = 1; + optional string mmproj = 2; + optional uint32 ctx_size = 3; + optional string gpu_id = 4; + ConfiguredModelRef model_ref = 5; + ConfiguredModelRef mmproj_ref = 6; +} + +message NodePluginEntry { + string name = 1; + optional bool enabled = 2; + optional string command = 3; + repeated string args = 4; +} + +message ConfigSubscribe { + uint32 gen = 1; // must equal NODE_PROTOCOL_GENERATION + bytes subscriber_id = 2; // 32 bytes — subscribing node's endpoint id +} + +message ConfigSnapshotResponse { + uint32 gen = 1; // must equal NODE_PROTOCOL_GENERATION + bytes node_id = 2; // 32 bytes — the node this config belongs to + uint64 revision = 4; + bytes config_hash = 5; // SHA-256 of canonical proto bytes (32 bytes) + NodeConfigSnapshot config = 6; + optional string hostname = 7; // convenience: node hostname for display + optional string error = 8; // set when an error occurred; config/hash/node_id may be empty +} + +message ConfigUpdateNotification { + uint32 gen = 1; + bytes node_id = 2; + uint64 revision = 4; + bytes config_hash = 5; + NodeConfigSnapshot config = 6; +} + +message ConfigPush { + uint32 gen = 1; + bytes requester_id = 2; + bytes target_node_id = 3; + uint64 expected_revision = 5; + NodeConfigSnapshot config = 6; + bytes owner_signing_public_key = 7; + bytes signature = 8; +} + +enum ConfigApplyMode { + CONFIG_APPLY_MODE_UNSPECIFIED = 0; + CONFIG_APPLY_MODE_STAGED = 1; + CONFIG_APPLY_MODE_LIVE = 2; + CONFIG_APPLY_MODE_NOOP = 3; +} + +message ConfigPushResponse { + uint32 gen = 1; + bool success = 2; + uint64 current_revision = 3; + bytes config_hash = 4; + optional string error = 5; + ConfigApplyMode apply_mode = 8; +} diff --git a/mesh-llm/proto/node.proto b/mesh-llm/proto/node.proto index ded15f0d..a7e5e3f7 100644 --- a/mesh-llm/proto/node.proto +++ b/mesh-llm/proto/node.proto @@ -225,91 +225,3 @@ enum NodeRole { HOST = 2; CLIENT = 3; } - -message NodeConfigSnapshot { - uint32 version = 1; // config schema version (currently 1) - NodeGpuConfig gpu = 2; - repeated NodeModelEntry models = 3; - repeated NodePluginEntry plugins = 4; -} - -enum GpuAssignment { - GPU_ASSIGNMENT_UNSPECIFIED = 0; - GPU_ASSIGNMENT_AUTO = 1; - GPU_ASSIGNMENT_PINNED = 2; -} - -message NodeGpuConfig { - GpuAssignment assignment = 1; -} - -message ConfiguredModelRef { - string declared_ref = 1; - optional string source_kind = 2; - optional string revision = 3; -} - -message NodeModelEntry { - string model = 1; - optional string mmproj = 2; - optional uint32 ctx_size = 3; - optional string gpu_id = 4; - ConfiguredModelRef model_ref = 5; - ConfiguredModelRef mmproj_ref = 6; -} - -message NodePluginEntry { - string name = 1; - optional bool enabled = 2; - optional string command = 3; - repeated string args = 4; -} - -message ConfigSubscribe { - uint32 gen = 1; // must equal NODE_PROTOCOL_GENERATION - bytes subscriber_id = 2; // 32 bytes — subscribing node's endpoint id -} - -message ConfigSnapshotResponse { - uint32 gen = 1; // must equal NODE_PROTOCOL_GENERATION - bytes node_id = 2; // 32 bytes — the node this config belongs to - uint64 revision = 4; - bytes config_hash = 5; // SHA-256 of canonical proto bytes (32 bytes) - NodeConfigSnapshot config = 6; - optional string hostname = 7; // convenience: node hostname for display - optional string error = 8; // set when an error occurred; config/hash/node_id may be empty -} - -message ConfigUpdateNotification { - uint32 gen = 1; - bytes node_id = 2; - uint64 revision = 4; - bytes config_hash = 5; - NodeConfigSnapshot config = 6; -} - -message ConfigPush { - uint32 gen = 1; - bytes requester_id = 2; - bytes target_node_id = 3; - uint64 expected_revision = 5; - NodeConfigSnapshot config = 6; - bytes owner_signing_public_key = 7; - bytes signature = 8; -} - -enum ConfigApplyMode { - CONFIG_APPLY_MODE_UNSPECIFIED = 0; - CONFIG_APPLY_MODE_STAGED = 1; - CONFIG_APPLY_MODE_LIVE = 2; - CONFIG_APPLY_MODE_NOOP = 3; -} - -message ConfigPushResponse { - uint32 gen = 1; - bool success = 2; - uint64 current_revision = 3; - bytes config_hash = 4; - optional string error = 5; - ConfigApplyMode apply_mode = 8; -} diff --git a/mesh-llm/src/lib.rs b/mesh-llm/src/lib.rs index 2c7aab5e..ba523d11 100644 --- a/mesh-llm/src/lib.rs +++ b/mesh-llm/src/lib.rs @@ -12,7 +12,11 @@ mod runtime; mod system; pub mod proto { - pub mod node { + pub mod config { + include!(concat!(env!("OUT_DIR"), "/meshllm.config.v1.rs")); + } + + pub mod mesh { include!(concat!(env!("OUT_DIR"), "/meshllm.node.v1.rs")); } } diff --git a/mesh-llm/src/mesh/heartbeat.rs b/mesh-llm/src/mesh/heartbeat.rs index bf5e11e6..1f4567ac 100644 --- a/mesh-llm/src/mesh/heartbeat.rs +++ b/mesh-llm/src/mesh/heartbeat.rs @@ -671,7 +671,7 @@ impl Node { send.write_all(&[STREAM_PEER_DOWN]).await?; match protocol { ControlProtocol::ProtoV1 => { - let proto_msg = crate::proto::node::PeerDown { + let proto_msg = crate::proto::mesh::PeerDown { peer_id: bytes, gen: NODE_PROTOCOL_GENERATION, }; @@ -715,7 +715,7 @@ impl Node { send.write_all(&[STREAM_PEER_LEAVING]).await?; match protocol { ControlProtocol::ProtoV1 => { - let proto_msg = crate::proto::node::PeerLeaving { + let proto_msg = crate::proto::mesh::PeerLeaving { peer_id: bytes, gen: NODE_PROTOCOL_GENERATION, }; diff --git a/mesh-llm/src/mesh/mod.rs b/mesh-llm/src/mesh/mod.rs index fb207c3b..34164a21 100644 --- a/mesh-llm/src/mesh/mod.rs +++ b/mesh-llm/src/mesh/mod.rs @@ -855,8 +855,8 @@ pub(crate) struct PeerAnnouncement { pub(crate) gpu_mem_bandwidth_gbps: Option, pub(crate) gpu_compute_tflops_fp32: Option, pub(crate) gpu_compute_tflops_fp16: Option, - pub(crate) available_model_metadata: Vec, - pub(crate) experts_summary: Option, + pub(crate) available_model_metadata: Vec, + pub(crate) experts_summary: Option, pub(crate) available_model_sizes: HashMap, pub(crate) served_model_descriptors: Vec, pub(crate) served_model_runtime: Vec, @@ -908,8 +908,8 @@ pub struct PeerInfo { pub gpu_mem_bandwidth_gbps: Option, pub gpu_compute_tflops_fp32: Option, pub gpu_compute_tflops_fp16: Option, - pub available_model_metadata: Vec, - pub experts_summary: Option, + pub available_model_metadata: Vec, + pub experts_summary: Option, pub available_model_sizes: HashMap, pub served_model_descriptors: Vec, pub served_model_runtime: Vec, @@ -1234,7 +1234,7 @@ pub(crate) fn stream_allowed_before_admission(stream_type: u8) -> bool { pub(crate) fn ingest_tunnel_map( remote: EndpointId, - frame: &crate::proto::node::TunnelMap, + frame: &crate::proto::mesh::TunnelMap, remote_tunnel_maps: &mut HashMap>, ) -> Result<()> { if frame.owner_peer_id.as_slice() != remote.as_bytes() { @@ -1277,7 +1277,7 @@ pub(crate) fn ingest_tunnel_map( /// Returns `Err(ForgedSender)` if `frame.peer_id != remote` — no peer should be removed. pub(crate) fn resolve_peer_leaving( remote: EndpointId, - frame: &crate::proto::node::PeerLeaving, + frame: &crate::proto::mesh::PeerLeaving, ) -> Result { if frame.peer_id.as_slice() != remote.as_bytes() { return Err(ControlFrameError::ForgedSender); @@ -2797,7 +2797,7 @@ impl Node { let (mut send, mut recv) = conn.open_bi().await?; send.write_all(&[STREAM_ROUTE_REQUEST]).await?; if protocol == ControlProtocol::ProtoV1 { - let req = crate::proto::node::RouteTableRequest { + let req = crate::proto::mesh::RouteTableRequest { requester_id: self.endpoint.id().as_bytes().to_vec(), gen: NODE_PROTOCOL_GENERATION, }; @@ -2807,7 +2807,7 @@ impl Node { match protocol { ControlProtocol::ProtoV1 => { let buf = read_len_prefixed(&mut recv).await?; - let proto_table = crate::proto::node::RouteTable::decode(buf.as_slice()) + let proto_table = crate::proto::mesh::RouteTable::decode(buf.as_slice()) .map_err(|e| anyhow::anyhow!("route table decode failed: {e}"))?; proto_table .validate_frame() @@ -2906,16 +2906,16 @@ impl Node { .collect(); let legacy_bytes = serde_json::to_vec(&legacy_json)?; let owner_peer_id = self.endpoint.id().as_bytes().to_vec(); - let entries: Vec = my_tunnel_map + let entries: Vec = my_tunnel_map .iter() - .map(|(id, &port)| crate::proto::node::TunnelEntry { + .map(|(id, &port)| crate::proto::mesh::TunnelEntry { target_peer_id: id.as_bytes().to_vec(), tunnel_port: port as u32, relay_peer_id: None, }) .collect(); - let proto_msg = crate::proto::node::TunnelMap { + let proto_msg = crate::proto::mesh::TunnelMap { owner_peer_id, entries, }; @@ -3241,7 +3241,7 @@ impl Node { return; } }; - let req = match crate::proto::node::RouteTableRequest::decode( + let req = match crate::proto::mesh::RouteTableRequest::decode( proto_buf.as_slice(), ) { Ok(r) => r, @@ -3291,7 +3291,7 @@ impl Node { return; } }; - let frame = match crate::proto::node::PeerDown::decode( + let frame = match crate::proto::mesh::PeerDown::decode( proto_buf.as_slice(), ) { Ok(f) => f, @@ -3451,7 +3451,7 @@ impl Node { return; } }; - let frame = match crate::proto::node::PeerLeaving::decode( + let frame = match crate::proto::mesh::PeerLeaving::decode( proto_buf.as_slice(), ) { Ok(f) => f, @@ -3575,12 +3575,12 @@ impl Node { mut send: iroh::endpoint::SendStream, mut recv: iroh::endpoint::RecvStream, ) -> anyhow::Result<()> { - use crate::proto::node::{ConfigSnapshotResponse, ConfigUpdateNotification}; + use crate::proto::config::{ConfigSnapshotResponse, ConfigUpdateNotification}; use crate::protocol::convert::mesh_config_to_proto; use prost::Message as _; let buf = read_len_prefixed(&mut recv).await?; - let frame = crate::proto::node::ConfigSubscribe::decode(buf.as_slice()) + let frame = crate::proto::config::ConfigSubscribe::decode(buf.as_slice()) .map_err(|e| anyhow::anyhow!("ConfigSubscribe decode error: {e}"))?; frame .validate_frame() @@ -3589,7 +3589,7 @@ impl Node { let local_owner_id = match self.local_verified_owner_id().await { Some(id) => id, None => { - let error_snapshot = crate::proto::node::ConfigSnapshotResponse { + let error_snapshot = crate::proto::config::ConfigSnapshotResponse { gen: NODE_PROTOCOL_GENERATION, node_id: vec![], revision: 0, @@ -3608,7 +3608,7 @@ impl Node { "config subscribe from {}: subscriber_id does not match connection identity", remote.fmt_short() ); - let error_snapshot = crate::proto::node::ConfigSnapshotResponse { + let error_snapshot = crate::proto::config::ConfigSnapshotResponse { gen: NODE_PROTOCOL_GENERATION, node_id: vec![], revision: 0, @@ -3624,7 +3624,7 @@ impl Node { let (subscriber_owner_id, _) = match self.peer_verified_owner(remote).await { Some(owner) => owner, None => { - let error_snapshot = crate::proto::node::ConfigSnapshotResponse { + let error_snapshot = crate::proto::config::ConfigSnapshotResponse { gen: NODE_PROTOCOL_GENERATION, node_id: vec![], revision: 0, @@ -3645,7 +3645,7 @@ impl Node { local_owner_id, subscriber_owner_id ); - let error_snapshot = crate::proto::node::ConfigSnapshotResponse { + let error_snapshot = crate::proto::config::ConfigSnapshotResponse { gen: NODE_PROTOCOL_GENERATION, node_id: vec![], revision: 0, @@ -3671,7 +3671,7 @@ impl Node { if config_uses_pinned_gpu(state.config()) && !peer_supports_pinned_gpu_config(subscriber_version.as_deref()) { - let error_snapshot = crate::proto::node::ConfigSnapshotResponse { + let error_snapshot = crate::proto::config::ConfigSnapshotResponse { gen: NODE_PROTOCOL_GENERATION, node_id: vec![], revision: 0, @@ -3803,7 +3803,7 @@ impl Node { // 1. Read + decode + validate ConfigPush let buf = read_len_prefixed(&mut recv).await?; - let push = crate::proto::node::ConfigPush::decode(buf.as_slice())?; + let push = crate::proto::config::ConfigPush::decode(buf.as_slice())?; push.validate_frame() .map_err(|e| anyhow::anyhow!("invalid push frame: {e}"))?; @@ -3910,7 +3910,7 @@ impl Node { }; // 7. Build + send response - use crate::proto::node::ConfigApplyMode as ProtoApplyMode; + use crate::proto::config::ConfigApplyMode as ProtoApplyMode; use crate::runtime::config_state::{ApplyResult, ConfigApplyMode}; let response = match result { ApplyResult::Applied { @@ -3921,7 +3921,7 @@ impl Node { if apply_mode == ConfigApplyMode::Staged { let _ = self.config_revision_tx.send(revision); } - crate::proto::node::ConfigPushResponse { + crate::proto::config::ConfigPushResponse { gen: NODE_PROTOCOL_GENERATION, success: true, current_revision: revision, @@ -3934,7 +3934,7 @@ impl Node { } } ApplyResult::RevisionConflict { current_revision } => { - crate::proto::node::ConfigPushResponse { + crate::proto::config::ConfigPushResponse { gen: NODE_PROTOCOL_GENERATION, success: false, current_revision, @@ -3951,7 +3951,7 @@ impl Node { error, } => { let _ = self.config_revision_tx.send(revision); - crate::proto::node::ConfigPushResponse { + crate::proto::config::ConfigPushResponse { gen: NODE_PROTOCOL_GENERATION, success: false, current_revision: revision, @@ -3961,7 +3961,7 @@ impl Node { } } ApplyResult::ValidationError(msg) | ApplyResult::PersistError(msg) => { - crate::proto::node::ConfigPushResponse { + crate::proto::config::ConfigPushResponse { gen: NODE_PROTOCOL_GENERATION, success: false, current_revision, @@ -3986,10 +3986,10 @@ impl Node { &self, conn: &iroh::endpoint::Connection, ) -> anyhow::Result<( - crate::proto::node::ConfigSnapshotResponse, - tokio::sync::watch::Receiver, + crate::proto::config::ConfigSnapshotResponse, + tokio::sync::watch::Receiver, )> { - use crate::proto::node::{ + use crate::proto::config::{ ConfigSnapshotResponse, ConfigSubscribe, ConfigUpdateNotification, }; @@ -4148,7 +4148,7 @@ impl Node { let buf = read_len_prefixed(&mut recv).await?; let frame = match protocol { - ControlProtocol::ProtoV1 => crate::proto::node::TunnelMap::decode(buf.as_slice()) + ControlProtocol::ProtoV1 => crate::proto::mesh::TunnelMap::decode(buf.as_slice()) .map_err(|e| anyhow::anyhow!("TunnelMap decode error: {e}"))?, ControlProtocol::JsonV0 => { let mut frame = decode_legacy_tunnel_map_frame(&buf)?; @@ -4177,7 +4177,7 @@ impl Node { } } -pub(crate) fn config_push_signature_payload(push: &crate::proto::node::ConfigPush) -> Vec { +pub(crate) fn config_push_signature_payload(push: &crate::proto::config::ConfigPush) -> Vec { use prost::Message as _; let mut unsigned = push.clone(); unsigned.signature.clear(); @@ -4187,13 +4187,13 @@ pub(crate) fn config_push_signature_payload(push: &crate::proto::node::ConfigPus async fn send_push_error(send: &mut iroh::endpoint::SendStream, msg: &str) -> anyhow::Result<()> { use crate::protocol::write_len_prefixed; use prost::Message as _; - let response = crate::proto::node::ConfigPushResponse { + let response = crate::proto::config::ConfigPushResponse { gen: NODE_PROTOCOL_GENERATION, success: false, current_revision: 0, config_hash: vec![], error: Some(msg.to_string()), - apply_mode: crate::proto::node::ConfigApplyMode::Unspecified as i32, + apply_mode: crate::proto::config::ConfigApplyMode::Unspecified as i32, }; write_len_prefixed(send, &response.encode_to_vec()).await?; Ok(()) diff --git a/mesh-llm/src/mesh/tests.rs b/mesh-llm/src/mesh/tests.rs index 7a62aaf1..dded7a5d 100644 --- a/mesh-llm/src/mesh/tests.rs +++ b/mesh-llm/src/mesh/tests.rs @@ -4,7 +4,7 @@ use super::heartbeat::{ RELAY_ONLY_RECONNECT_SECS, RELAY_RECONNECT_COOLDOWN_SECS, }; use super::*; -use crate::proto::node::{GossipFrame, NodeRole, PeerAnnouncement, RouteTableRequest}; +use crate::proto::mesh::{GossipFrame, NodeRole, PeerAnnouncement, RouteTableRequest}; use std::collections::HashSet; use tokio::sync::watch; @@ -1614,7 +1614,7 @@ fn control_frame_rejects_oversize_or_bad_generation() { #[test] fn gossip_frame_roundtrip_preserves_scanned_model_metadata() { - use crate::proto::node::{CompactModelMetadata, ExpertsSummary}; + use crate::proto::mesh::{CompactModelMetadata, ExpertsSummary}; let peer_id = EndpointId::from(SecretKey::from_bytes(&[0x01; 32]).public()); let peer_id_bytes = peer_id.as_bytes().to_vec(); @@ -1855,7 +1855,7 @@ fn gossip_rejects_sender_id_mismatch_or_invalid_endpoint_len() { #[test] fn transitive_peer_update_refreshes_metadata_fields() { - use crate::proto::node::CompactModelMetadata; + use crate::proto::mesh::CompactModelMetadata; let peer_id = EndpointId::from(SecretKey::from_bytes(&[0x10; 32]).public()); let mut existing = make_test_peer_info(peer_id); @@ -2051,7 +2051,7 @@ fn transitive_peer_merge_preserves_richer_direct_address() { #[test] fn tunnel_map_roundtrip_updates_remote_map() { - use crate::proto::node::{TunnelEntry, TunnelMap}; + use crate::proto::mesh::{TunnelEntry, TunnelMap}; let owner_key = SecretKey::from_bytes(&[0x10; 32]); let owner_id = EndpointId::from(owner_key.public()); @@ -2096,7 +2096,7 @@ fn tunnel_map_roundtrip_updates_remote_map() { #[test] fn tunnel_map_rejects_owner_mismatch_or_bad_target_id() { - use crate::proto::node::{TunnelEntry, TunnelMap}; + use crate::proto::mesh::{TunnelEntry, TunnelMap}; let owner_key = SecretKey::from_bytes(&[0x30; 32]); let owner_id = EndpointId::from(owner_key.public()); @@ -2178,7 +2178,7 @@ fn tunnel_map_rejects_owner_mismatch_or_bad_target_id() { #[test] fn route_table_request_roundtrip() { - use crate::proto::node::{RouteEntry as ProtoRouteEntry, RouteTable}; + use crate::proto::mesh::{RouteEntry as ProtoRouteEntry, RouteTable}; let peer_key = SecretKey::from_bytes(&[0x60; 32]); let peer_id = EndpointId::from(peer_key.public()); @@ -2234,7 +2234,7 @@ fn route_table_request_roundtrip() { /// Verifies that remote passive inventory metadata is ignored on ingest. #[test] fn proto_v1_route_table_rejects_bad_generation_or_legacy_payload() { - use crate::proto::node::RouteTable; + use crate::proto::mesh::RouteTable; let zero_gen_req = RouteTableRequest { requester_id: vec![0u8; 32], @@ -2305,7 +2305,7 @@ fn proto_v1_route_table_rejects_bad_generation_or_legacy_payload() { #[test] fn peer_lifecycle_messages_roundtrip() { - use crate::proto::node::{PeerDown, PeerLeaving}; + use crate::proto::mesh::{PeerDown, PeerLeaving}; let leaving_id = EndpointId::from(SecretKey::from_bytes(&[0x55; 32]).public()); @@ -2379,7 +2379,7 @@ fn peer_lifecycle_messages_roundtrip() { #[test] fn peer_lifecycle_rejects_forged_sender_or_unverified_down() { - use crate::proto::node::{PeerDown, PeerLeaving}; + use crate::proto::mesh::{PeerDown, PeerLeaving}; let valid_peer_bytes = EndpointId::from(SecretKey::from_bytes(&[0x77; 32]).public()) .as_bytes() @@ -2669,7 +2669,7 @@ fn direct_peer_survives_with_stale_last_mentioned() { /// gen=0 / wrong-gen frames. Legacy JSON/raw compatibility is only carried on `/0`. #[test] fn proto_v1_control_frames_reject_legacy_json_and_wrong_gen() { - use crate::proto::node::{PeerDown, PeerLeaving}; + use crate::proto::mesh::{PeerDown, PeerLeaving}; // JSON bytes that look plausible for the old wire format on each stream let json_gossip = b"[{\"addr\":{\"id\":\"aabbcc\",\"addrs\":[]}}]"; @@ -2780,7 +2780,7 @@ fn proto_v1_control_frames_reject_legacy_json_and_wrong_gen() { /// this is the unit-level proof of what `/api/status` exposes for remote `model_scans`. #[test] fn remote_model_scans_are_ignored_after_gossip() { - use crate::proto::node::{CompactModelMetadata, GossipFrame, PeerAnnouncement as ProtoPA}; + use crate::proto::mesh::{CompactModelMetadata, GossipFrame, PeerAnnouncement as ProtoPA}; let peer_key = SecretKey::from_bytes(&[0xC0; 32]); let peer_id = EndpointId::from(peer_key.public()); @@ -2868,7 +2868,7 @@ fn remote_model_scans_are_ignored_after_gossip() { /// correctly from protobuf RouteTable entries, and that mesh_id propagates through. #[test] fn passive_client_route_table_models_and_mesh_id_populated() { - use crate::proto::node::{RouteEntry as ProtoRouteEntry, RouteTable}; + use crate::proto::mesh::{RouteEntry as ProtoRouteEntry, RouteTable}; let host_key = SecretKey::from_bytes(&[0xD0; 32]); let host_id = EndpointId::from(host_key.public()); @@ -2994,7 +2994,7 @@ fn worker_only_legacy_models_are_excluded_from_http_routes() { /// no longer in peers set). #[test] fn dead_peer_cleanup_prevents_readmission() { - use crate::proto::node::PeerLeaving; + use crate::proto::mesh::PeerLeaving; let peer_key = SecretKey::from_bytes(&[0xE0; 32]); let peer_id = EndpointId::from(peer_key.public()); @@ -4053,7 +4053,7 @@ async fn test_connect_to_peer_skips_known_peer_without_connection() -> Result<() #[test] fn config_sync_subscribe_snapshot_encode_decode() { - use crate::proto::node::{ConfigSnapshotResponse, NodeConfigSnapshot, NodeGpuConfig}; + use crate::proto::config::{ConfigSnapshotResponse, NodeConfigSnapshot, NodeGpuConfig}; let snapshot = ConfigSnapshotResponse { gen: NODE_PROTOCOL_GENERATION, @@ -4063,7 +4063,7 @@ fn config_sync_subscribe_snapshot_encode_decode() { config: Some(NodeConfigSnapshot { version: 1, gpu: Some(NodeGpuConfig { - assignment: crate::proto::node::GpuAssignment::Auto as i32, + assignment: crate::proto::config::GpuAssignment::Auto as i32, }), models: vec![], plugins: vec![], @@ -4086,7 +4086,7 @@ fn config_sync_subscribe_snapshot_encode_decode() { let gpu = cfg.gpu.expect("gpu must be present"); assert_eq!( gpu.assignment, - crate::proto::node::GpuAssignment::Auto as i32 + crate::proto::config::GpuAssignment::Auto as i32 ); } @@ -4107,7 +4107,7 @@ fn test_signing_key() -> (ed25519_dalek::SigningKey, String) { #[test] fn config_sync_push_signature_payload_deterministic() { - use crate::proto::node::{ConfigPush, NodeConfigSnapshot}; + use crate::proto::config::{ConfigPush, NodeConfigSnapshot}; let push = ConfigPush { gen: NODE_PROTOCOL_GENERATION, @@ -4150,7 +4150,7 @@ fn config_sync_push_bad_signature_bytes_length() { #[test] fn config_sync_push_roundtrip_encode_decode() { - use crate::proto::node::{ConfigApplyMode, ConfigPushResponse}; + use crate::proto::config::{ConfigApplyMode, ConfigPushResponse}; use prost::Message as _; let response = ConfigPushResponse { @@ -4176,7 +4176,7 @@ fn config_sync_push_roundtrip_encode_decode() { #[test] fn config_sync_sign_and_verify_roundtrip() { - use crate::proto::node::{ConfigPush, NodeConfigSnapshot}; + use crate::proto::config::{ConfigPush, NodeConfigSnapshot}; use ed25519_dalek::Signer as _; let (signing_key, owner_id) = test_signing_key(); @@ -4217,7 +4217,7 @@ fn config_sync_sign_and_verify_roundtrip() { #[test] fn config_sync_signature_payload_excludes_signature_field() { - use crate::proto::node::{ConfigPush, NodeConfigSnapshot}; + use crate::proto::config::{ConfigPush, NodeConfigSnapshot}; let mut push = ConfigPush { gen: NODE_PROTOCOL_GENERATION, @@ -4380,13 +4380,13 @@ fn build_signed_config_push( requester_id: &EndpointId, target_node_id: &EndpointId, expected_revision: u64, - config: crate::proto::node::NodeConfigSnapshot, -) -> crate::proto::node::ConfigPush { + config: crate::proto::config::NodeConfigSnapshot, +) -> crate::proto::config::ConfigPush { use ed25519_dalek::Signer as _; let vk = owner_keypair.signing.verifying_key(); - let mut push = crate::proto::node::ConfigPush { + let mut push = crate::proto::config::ConfigPush { gen: NODE_PROTOCOL_GENERATION, requester_id: requester_id.as_bytes().to_vec(), target_node_id: target_node_id.as_bytes().to_vec(), @@ -5240,7 +5240,7 @@ async fn config_subscribe_keeps_stream_open_when_revision_becomes_pinned_for_sam #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn config_push_valid_signature_accepted() -> Result<()> { - use crate::proto::node::{NodeConfigSnapshot, NodeGpuConfig}; + use crate::proto::config::{NodeConfigSnapshot, NodeGpuConfig}; use crate::protocol::write_len_prefixed; use prost::Message as _; @@ -5287,7 +5287,7 @@ async fn config_push_valid_signature_accepted() -> Result<()> { let new_config = NodeConfigSnapshot { version: 1, gpu: Some(NodeGpuConfig { - assignment: crate::proto::node::GpuAssignment::Auto as i32, + assignment: crate::proto::config::GpuAssignment::Auto as i32, }), models: vec![], plugins: vec![], @@ -5301,7 +5301,7 @@ async fn config_push_valid_signature_accepted() -> Result<()> { send.finish()?; let buf = crate::protocol::read_len_prefixed(&mut recv).await?; - let response = crate::proto::node::ConfigPushResponse::decode(buf.as_slice())?; + let response = crate::proto::config::ConfigPushResponse::decode(buf.as_slice())?; assert!( response.success, @@ -5319,7 +5319,7 @@ async fn config_push_valid_signature_accepted() -> Result<()> { ); assert_eq!( response.apply_mode, - crate::proto::node::ConfigApplyMode::Staged as i32, + crate::proto::config::ConfigApplyMode::Staged as i32, "config push should report staged apply mode" ); @@ -5329,7 +5329,7 @@ async fn config_push_valid_signature_accepted() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn config_push_revision_conflict_rejected() -> Result<()> { - use crate::proto::node::{NodeConfigSnapshot, NodeGpuConfig}; + use crate::proto::config::{NodeConfigSnapshot, NodeGpuConfig}; use crate::protocol::write_len_prefixed; use prost::Message as _; @@ -5376,7 +5376,7 @@ async fn config_push_revision_conflict_rejected() -> Result<()> { let good_config = NodeConfigSnapshot { version: 1, gpu: Some(NodeGpuConfig { - assignment: crate::proto::node::GpuAssignment::Auto as i32, + assignment: crate::proto::config::GpuAssignment::Auto as i32, }), models: vec![], plugins: vec![], @@ -5395,7 +5395,7 @@ async fn config_push_revision_conflict_rejected() -> Result<()> { write_len_prefixed(&mut send1, &push1.encode_to_vec()).await?; send1.finish()?; let buf1 = crate::protocol::read_len_prefixed(&mut recv1).await?; - let resp1 = crate::proto::node::ConfigPushResponse::decode(buf1.as_slice())?; + let resp1 = crate::proto::config::ConfigPushResponse::decode(buf1.as_slice())?; assert!(resp1.success, "first push must succeed: {:?}", resp1.error); // Second push with stale expected_revision=0 — must be rejected @@ -5405,7 +5405,7 @@ async fn config_push_revision_conflict_rejected() -> Result<()> { write_len_prefixed(&mut send2, &push2.encode_to_vec()).await?; send2.finish()?; let buf2 = crate::protocol::read_len_prefixed(&mut recv2).await?; - let resp2 = crate::proto::node::ConfigPushResponse::decode(buf2.as_slice())?; + let resp2 = crate::proto::config::ConfigPushResponse::decode(buf2.as_slice())?; assert!(!resp2.success, "push with stale revision must be rejected"); assert_eq!( @@ -5424,7 +5424,7 @@ async fn config_push_revision_conflict_rejected() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn config_push_bad_signature_rejected() -> Result<()> { - use crate::proto::node::{NodeConfigSnapshot, NodeGpuConfig}; + use crate::proto::config::{NodeConfigSnapshot, NodeGpuConfig}; use crate::protocol::write_len_prefixed; use prost::Message as _; @@ -5471,7 +5471,7 @@ async fn config_push_bad_signature_rejected() -> Result<()> { let config = NodeConfigSnapshot { version: 1, gpu: Some(NodeGpuConfig { - assignment: crate::proto::node::GpuAssignment::Auto as i32, + assignment: crate::proto::config::GpuAssignment::Auto as i32, }), models: vec![], plugins: vec![], @@ -5487,7 +5487,7 @@ async fn config_push_bad_signature_rejected() -> Result<()> { send.finish()?; let buf = crate::protocol::read_len_prefixed(&mut recv).await?; - let response = crate::proto::node::ConfigPushResponse::decode(buf.as_slice())?; + let response = crate::proto::config::ConfigPushResponse::decode(buf.as_slice())?; assert!( !response.success, @@ -5505,7 +5505,7 @@ async fn config_push_bad_signature_rejected() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn config_subscribe_delivers_update_notification_after_push() -> Result<()> { - use crate::proto::node::{NodeConfigSnapshot, NodeGpuConfig}; + use crate::proto::config::{NodeConfigSnapshot, NodeGpuConfig}; use crate::protocol::write_len_prefixed; use prost::Message as _; @@ -5557,9 +5557,9 @@ async fn config_subscribe_delivers_update_notification_after_push() -> Result<() let new_config = NodeConfigSnapshot { version: 1, gpu: Some(NodeGpuConfig { - assignment: crate::proto::node::GpuAssignment::Auto as i32, + assignment: crate::proto::config::GpuAssignment::Auto as i32, }), - models: vec![crate::proto::node::NodeModelEntry { + models: vec![crate::proto::config::NodeModelEntry { model: "test-model.gguf".to_string(), mmproj: None, ctx_size: None, @@ -5581,7 +5581,7 @@ async fn config_subscribe_delivers_update_notification_after_push() -> Result<() write_len_prefixed(&mut send, &push.encode_to_vec()).await?; send.finish()?; let buf = crate::protocol::read_len_prefixed(&mut recv).await?; - let push_resp = crate::proto::node::ConfigPushResponse::decode(buf.as_slice())?; + let push_resp = crate::proto::config::ConfigPushResponse::decode(buf.as_slice())?; assert!( push_resp.success, "push must be accepted for notification test: {:?}", diff --git a/mesh-llm/src/models/inventory.rs b/mesh-llm/src/models/inventory.rs index 822cab89..a2b6d461 100644 --- a/mesh-llm/src/models/inventory.rs +++ b/mesh-llm/src/models/inventory.rs @@ -11,7 +11,7 @@ use super::local::{ pub struct LocalModelInventorySnapshot { pub model_names: HashSet, pub size_by_name: HashMap, - pub metadata_by_name: HashMap, + pub metadata_by_name: HashMap, } #[derive(Clone, Copy, Debug, Default, Serialize)] @@ -52,8 +52,8 @@ struct InventoryScanEntry { } impl CachedCompactModelMetadata { - fn into_proto(self) -> crate::proto::node::CompactModelMetadata { - crate::proto::node::CompactModelMetadata { + fn into_proto(self) -> crate::proto::mesh::CompactModelMetadata { + crate::proto::mesh::CompactModelMetadata { model_key: self.model_key, context_length: self.context_length, vocab_size: self.vocab_size, @@ -75,7 +75,7 @@ impl CachedCompactModelMetadata { } } - fn from_proto(meta: &crate::proto::node::CompactModelMetadata) -> Self { + fn from_proto(meta: &crate::proto::mesh::CompactModelMetadata) -> Self { Self { model_key: meta.model_key.clone(), context_length: meta.context_length, @@ -181,9 +181,9 @@ fn compact_metadata_from_gguf( path: &Path, model_key: String, quantization_type: String, -) -> crate::proto::node::CompactModelMetadata { +) -> crate::proto::mesh::CompactModelMetadata { if let Some(m) = crate::models::gguf::scan_gguf_compact_meta(path) { - crate::proto::node::CompactModelMetadata { + crate::proto::mesh::CompactModelMetadata { model_key: model_key.clone(), context_length: m.context_length, vocab_size: m.vocab_size, @@ -204,7 +204,7 @@ fn compact_metadata_from_gguf( quantization_type, } } else { - crate::proto::node::CompactModelMetadata { + crate::proto::mesh::CompactModelMetadata { model_key, quantization_type, ..Default::default() @@ -216,7 +216,7 @@ fn cached_compact_metadata_for_path( path: &Path, model_key: String, quantization_type: String, -) -> crate::proto::node::CompactModelMetadata { +) -> crate::proto::mesh::CompactModelMetadata { let computed = || compact_metadata_from_gguf(path, model_key.clone(), quantization_type.clone()); let Some(cache_path) = gguf_metadata_cache_path(path) else { diff --git a/mesh-llm/src/protocol/convert.rs b/mesh-llm/src/protocol/convert.rs index 1fe036eb..7bedd0ed 100644 --- a/mesh-llm/src/protocol/convert.rs +++ b/mesh-llm/src/protocol/convert.rs @@ -45,7 +45,7 @@ fn join_optional_csv(values: &[Option]) -> Option { fn local_owner_attestation_to_proto( attestation: &crate::crypto::SignedNodeOwnership, -) -> Option { +) -> Option { let owner_sign_public_key = match hex::decode(&attestation.claim.owner_sign_public_key) { Ok(bytes) => bytes, Err(err) => { @@ -73,7 +73,7 @@ fn local_owner_attestation_to_proto( return None; } }; - Some(crate::proto::node::SignedNodeOwnership { + Some(crate::proto::mesh::SignedNodeOwnership { version: attestation.claim.version, cert_id: attestation.claim.cert_id.clone(), owner_id: attestation.claim.owner_id.clone(), @@ -88,7 +88,7 @@ fn local_owner_attestation_to_proto( } fn proto_owner_attestation_to_local( - attestation: &crate::proto::node::SignedNodeOwnership, + attestation: &crate::proto::mesh::SignedNodeOwnership, ) -> crate::crypto::SignedNodeOwnership { crate::crypto::SignedNodeOwnership { claim: crate::crypto::NodeOwnershipClaim { @@ -109,65 +109,65 @@ fn proto_owner_attestation_to_local( fn local_source_kind_to_proto(kind: crate::mesh::ModelSourceKind) -> i32 { match kind { crate::mesh::ModelSourceKind::Catalog => { - crate::proto::node::ModelSourceKind::Catalog as i32 + crate::proto::mesh::ModelSourceKind::Catalog as i32 } crate::mesh::ModelSourceKind::HuggingFace => { - crate::proto::node::ModelSourceKind::HuggingFace as i32 + crate::proto::mesh::ModelSourceKind::HuggingFace as i32 } crate::mesh::ModelSourceKind::LocalGguf => { - crate::proto::node::ModelSourceKind::LocalGguf as i32 + crate::proto::mesh::ModelSourceKind::LocalGguf as i32 } crate::mesh::ModelSourceKind::DirectUrl => { - crate::proto::node::ModelSourceKind::DirectUrl as i32 + crate::proto::mesh::ModelSourceKind::DirectUrl as i32 } crate::mesh::ModelSourceKind::Unknown => { - crate::proto::node::ModelSourceKind::Unknown as i32 + crate::proto::mesh::ModelSourceKind::Unknown as i32 } } } fn proto_source_kind_to_local(kind: i32) -> crate::mesh::ModelSourceKind { - match crate::proto::node::ModelSourceKind::try_from(kind) - .unwrap_or(crate::proto::node::ModelSourceKind::Unknown) + match crate::proto::mesh::ModelSourceKind::try_from(kind) + .unwrap_or(crate::proto::mesh::ModelSourceKind::Unknown) { - crate::proto::node::ModelSourceKind::Catalog => crate::mesh::ModelSourceKind::Catalog, - crate::proto::node::ModelSourceKind::HuggingFace => { + crate::proto::mesh::ModelSourceKind::Catalog => crate::mesh::ModelSourceKind::Catalog, + crate::proto::mesh::ModelSourceKind::HuggingFace => { crate::mesh::ModelSourceKind::HuggingFace } - crate::proto::node::ModelSourceKind::LocalGguf => crate::mesh::ModelSourceKind::LocalGguf, - crate::proto::node::ModelSourceKind::DirectUrl => crate::mesh::ModelSourceKind::DirectUrl, - crate::proto::node::ModelSourceKind::Unknown - | crate::proto::node::ModelSourceKind::Unspecified => crate::mesh::ModelSourceKind::Unknown, + crate::proto::mesh::ModelSourceKind::LocalGguf => crate::mesh::ModelSourceKind::LocalGguf, + crate::proto::mesh::ModelSourceKind::DirectUrl => crate::mesh::ModelSourceKind::DirectUrl, + crate::proto::mesh::ModelSourceKind::Unknown + | crate::proto::mesh::ModelSourceKind::Unspecified => crate::mesh::ModelSourceKind::Unknown, } } fn local_capability_level_to_proto(level: crate::models::CapabilityLevel) -> i32 { match level { - crate::models::CapabilityLevel::None => crate::proto::node::CapabilityLevel::None as i32, + crate::models::CapabilityLevel::None => crate::proto::mesh::CapabilityLevel::None as i32, crate::models::CapabilityLevel::Likely => { - crate::proto::node::CapabilityLevel::Likely as i32 + crate::proto::mesh::CapabilityLevel::Likely as i32 } crate::models::CapabilityLevel::Supported => { - crate::proto::node::CapabilityLevel::Supported as i32 + crate::proto::mesh::CapabilityLevel::Supported as i32 } } } fn proto_capability_level_to_local(level: i32) -> crate::models::CapabilityLevel { - match crate::proto::node::CapabilityLevel::try_from(level) - .unwrap_or(crate::proto::node::CapabilityLevel::None) + match crate::proto::mesh::CapabilityLevel::try_from(level) + .unwrap_or(crate::proto::mesh::CapabilityLevel::None) { - crate::proto::node::CapabilityLevel::Likely => crate::models::CapabilityLevel::Likely, - crate::proto::node::CapabilityLevel::Supported => crate::models::CapabilityLevel::Supported, - crate::proto::node::CapabilityLevel::None - | crate::proto::node::CapabilityLevel::Unspecified => crate::models::CapabilityLevel::None, + crate::proto::mesh::CapabilityLevel::Likely => crate::models::CapabilityLevel::Likely, + crate::proto::mesh::CapabilityLevel::Supported => crate::models::CapabilityLevel::Supported, + crate::proto::mesh::CapabilityLevel::None + | crate::proto::mesh::CapabilityLevel::Unspecified => crate::models::CapabilityLevel::None, } } fn descriptor_identity_to_proto( identity: &crate::mesh::ServedModelIdentity, -) -> crate::proto::node::ServedModelIdentity { - crate::proto::node::ServedModelIdentity { +) -> crate::proto::mesh::ServedModelIdentity { + crate::proto::mesh::ServedModelIdentity { model_name: identity.model_name.clone(), is_primary: identity.is_primary, source_kind: local_source_kind_to_proto(identity.source_kind), @@ -181,7 +181,7 @@ fn descriptor_identity_to_proto( } fn proto_identity_to_local( - identity: &crate::proto::node::ServedModelIdentity, + identity: &crate::proto::mesh::ServedModelIdentity, ) -> crate::mesh::ServedModelIdentity { crate::mesh::ServedModelIdentity { model_name: identity.model_name.clone(), @@ -197,7 +197,7 @@ fn proto_identity_to_local( } fn legacy_descriptor_from_identity( - identity: &crate::proto::node::ServedModelIdentity, + identity: &crate::proto::mesh::ServedModelIdentity, ) -> crate::mesh::ServedModelDescriptor { crate::mesh::ServedModelDescriptor { identity: proto_identity_to_local(identity), @@ -208,8 +208,8 @@ fn legacy_descriptor_from_identity( fn runtime_descriptor_to_proto( descriptor: &crate::mesh::ModelRuntimeDescriptor, -) -> crate::proto::node::ModelRuntimeDescriptor { - crate::proto::node::ModelRuntimeDescriptor { +) -> crate::proto::mesh::ModelRuntimeDescriptor { + crate::proto::mesh::ModelRuntimeDescriptor { model_name: descriptor.model_name.clone(), identity_hash: descriptor.identity_hash.clone(), context_length: descriptor.context_length, @@ -218,7 +218,7 @@ fn runtime_descriptor_to_proto( } fn proto_runtime_descriptor_to_local( - descriptor: &crate::proto::node::ModelRuntimeDescriptor, + descriptor: &crate::proto::mesh::ModelRuntimeDescriptor, ) -> crate::mesh::ModelRuntimeDescriptor { crate::mesh::ModelRuntimeDescriptor { model_name: descriptor.model_name.clone(), @@ -228,7 +228,7 @@ fn proto_runtime_descriptor_to_local( } } -fn local_gpu_info_to_proto(ann: &PeerAnnouncement) -> Vec { +fn local_gpu_info_to_proto(ann: &PeerAnnouncement) -> Vec { let legacy_field_count = [ split_optional_csv(ann.gpu_vram.as_deref()).len(), split_optional_csv(ann.gpu_reserved_bytes.as_deref()).len(), @@ -260,7 +260,7 @@ fn local_gpu_info_to_proto(ann: &PeerAnnouncement) -> Vec Vec Option { +) -> Option { let gpus = local_gpu_info_to_proto(ann); if ann.hostname.is_none() && ann.is_soc.is_none() && gpus.is_empty() { None } else { - Some(crate::proto::node::HardwareInfo { + Some(crate::proto::mesh::HardwareInfo { is_soc: ann.is_soc, hostname: ann.hostname.clone(), gpus, @@ -287,7 +287,7 @@ fn local_hardware_info_to_proto( } fn proto_gpu_info_to_legacy_fields( - gpus: &[crate::proto::node::GpuInfo], + gpus: &[crate::proto::mesh::GpuInfo], ) -> ( Option, Option, @@ -343,7 +343,7 @@ fn proto_gpu_info_to_legacy_fields( /// Descriptors without a valid identity are discarded so a partial list /// cannot suppress the legacy-identity backfill fallback. fn proto_descriptor_has_valid_identity( - descriptor: &crate::proto::node::ServedModelDescriptor, + descriptor: &crate::proto::mesh::ServedModelDescriptor, ) -> bool { descriptor .identity @@ -362,36 +362,36 @@ pub(crate) fn sanitize_gossip_announcement_for_wire(ann: &PeerAnnouncement) -> P pub(crate) fn local_role_to_proto(role: &NodeRole) -> (i32, Option) { match role { - NodeRole::Worker => (crate::proto::node::NodeRole::Worker as i32, None), + NodeRole::Worker => (crate::proto::mesh::NodeRole::Worker as i32, None), NodeRole::Host { http_port } => ( - crate::proto::node::NodeRole::Host as i32, + crate::proto::mesh::NodeRole::Host as i32, Some(*http_port as u32), ), - NodeRole::Client => (crate::proto::node::NodeRole::Client as i32, None), + NodeRole::Client => (crate::proto::mesh::NodeRole::Client as i32, None), } } pub(crate) fn proto_role_to_local(role_int: i32, http_port: Option) -> NodeRole { - match crate::proto::node::NodeRole::try_from(role_int).unwrap_or_default() { - crate::proto::node::NodeRole::Host => NodeRole::Host { + match crate::proto::mesh::NodeRole::try_from(role_int).unwrap_or_default() { + crate::proto::mesh::NodeRole::Host => NodeRole::Host { http_port: http_port.unwrap_or(0) as u16, }, - crate::proto::node::NodeRole::Client => NodeRole::Client, + crate::proto::mesh::NodeRole::Client => NodeRole::Client, _ => NodeRole::Worker, } } pub(crate) fn local_ann_to_proto_ann( ann: &PeerAnnouncement, -) -> crate::proto::node::PeerAnnouncement { +) -> crate::proto::mesh::PeerAnnouncement { let ann = sanitize_gossip_announcement_for_wire(ann); let (role_int, http_port) = local_role_to_proto(&ann.role); let serialized_addr = serde_json::to_vec(&ann.addr).unwrap_or_default(); - let demand: Vec = ann + let demand: Vec = ann .model_demand .iter() .map( - |(name, d): (&String, &ModelDemand)| crate::proto::node::ModelDemandEntry { + |(name, d): (&String, &ModelDemand)| crate::proto::mesh::ModelDemandEntry { model_name: name.clone(), last_active: d.last_active, request_count: d.request_count, @@ -406,9 +406,9 @@ pub(crate) fn local_ann_to_proto_ann( let served_model_descriptors = ann .served_model_descriptors .iter() - .map(|descriptor| crate::proto::node::ServedModelDescriptor { + .map(|descriptor| crate::proto::mesh::ServedModelDescriptor { identity: Some(descriptor_identity_to_proto(&descriptor.identity)), - capabilities: Some(crate::proto::node::ModelCapabilities { + capabilities: Some(crate::proto::mesh::ModelCapabilities { vision: local_capability_level_to_proto(descriptor.capabilities.vision), reasoning: local_capability_level_to_proto(descriptor.capabilities.reasoning), tool_use: local_capability_level_to_proto(descriptor.capabilities.tool_use), @@ -417,11 +417,11 @@ pub(crate) fn local_ann_to_proto_ann( audio: local_capability_level_to_proto(descriptor.capabilities.audio), }), topology: descriptor.topology.as_ref().map(|topology| { - crate::proto::node::ModelTopology { + crate::proto::mesh::ModelTopology { moe: topology .moe .as_ref() - .map(|moe| crate::proto::node::ModelMoeInfo { + .map(|moe| crate::proto::mesh::ModelMoeInfo { expert_count: moe.expert_count, used_expert_count: moe.used_expert_count, min_experts_per_node: moe.min_experts_per_node, @@ -443,7 +443,7 @@ pub(crate) fn local_ann_to_proto_ann( .map(runtime_descriptor_to_proto) .collect(); let hardware = local_hardware_info_to_proto(&ann); - crate::proto::node::PeerAnnouncement { + crate::proto::mesh::PeerAnnouncement { endpoint_id: ann.addr.id.as_bytes().to_vec(), role: role_int, http_port, @@ -489,10 +489,10 @@ pub(crate) fn local_ann_to_proto_ann( pub(crate) fn build_gossip_frame( anns: &[PeerAnnouncement], sender_id: EndpointId, -) -> crate::proto::node::GossipFrame { - let peers: Vec = +) -> crate::proto::mesh::GossipFrame { + let peers: Vec = anns.iter().map(local_ann_to_proto_ann).collect(); - crate::proto::node::GossipFrame { + crate::proto::mesh::GossipFrame { gen: NODE_PROTOCOL_GENERATION, sender_id: sender_id.as_bytes().to_vec(), peers, @@ -500,7 +500,7 @@ pub(crate) fn build_gossip_frame( } pub(crate) fn proto_ann_to_local( - pa: &crate::proto::node::PeerAnnouncement, + pa: &crate::proto::mesh::PeerAnnouncement, ) -> Option<(EndpointAddr, PeerAnnouncement)> { let id_arr: [u8; 32] = pa.endpoint_id.as_slice().try_into().ok()?; let pk = iroh::PublicKey::from_bytes(&id_arr).ok()?; @@ -647,16 +647,16 @@ pub(crate) fn proto_ann_to_local( Some((addr, ann)) } -pub(crate) fn routing_table_to_proto(table: &RoutingTable) -> crate::proto::node::RouteTable { +pub(crate) fn routing_table_to_proto(table: &RoutingTable) -> crate::proto::mesh::RouteTable { let entries = table .hosts .iter() - .map(|e| crate::proto::node::RouteEntry { + .map(|e| crate::proto::mesh::RouteEntry { endpoint_id: e.endpoint_id.as_bytes().to_vec(), model: e.model.clone(), }) .collect(); - crate::proto::node::RouteTable { + crate::proto::mesh::RouteTable { entries, mesh_id: table.mesh_id.clone(), gen: NODE_PROTOCOL_GENERATION, @@ -665,10 +665,10 @@ pub(crate) fn routing_table_to_proto(table: &RoutingTable) -> crate::proto::node pub(crate) fn mesh_config_to_proto( config: &crate::plugin::MeshConfig, -) -> crate::proto::node::NodeConfigSnapshot { +) -> crate::proto::config::NodeConfigSnapshot { use crate::plugin::GpuAssignment; - fn configured_model_ref(declared_ref: &str) -> crate::proto::node::ConfiguredModelRef { - crate::proto::node::ConfiguredModelRef { + fn configured_model_ref(declared_ref: &str) -> crate::proto::config::ConfiguredModelRef { + crate::proto::config::ConfiguredModelRef { declared_ref: declared_ref.to_string(), source_kind: None, revision: None, @@ -676,13 +676,13 @@ pub(crate) fn mesh_config_to_proto( } let assignment = match config.gpu.assignment { - GpuAssignment::Auto => crate::proto::node::GpuAssignment::Auto as i32, - GpuAssignment::Pinned => crate::proto::node::GpuAssignment::Pinned as i32, + GpuAssignment::Auto => crate::proto::config::GpuAssignment::Auto as i32, + GpuAssignment::Pinned => crate::proto::config::GpuAssignment::Pinned as i32, }; let models = config .models .iter() - .map(|m| crate::proto::node::NodeModelEntry { + .map(|m| crate::proto::config::NodeModelEntry { model: m.model.clone(), mmproj: m.mmproj.clone(), ctx_size: m.ctx_size, @@ -694,29 +694,29 @@ pub(crate) fn mesh_config_to_proto( let plugins = config .plugins .iter() - .map(|p| crate::proto::node::NodePluginEntry { + .map(|p| crate::proto::config::NodePluginEntry { name: p.name.clone(), enabled: p.enabled, command: p.command.clone(), args: p.args.clone(), }) .collect(); - crate::proto::node::NodeConfigSnapshot { + crate::proto::config::NodeConfigSnapshot { version: config.version.unwrap_or(1), - gpu: Some(crate::proto::node::NodeGpuConfig { assignment }), + gpu: Some(crate::proto::config::NodeGpuConfig { assignment }), models, plugins, } } pub(crate) fn proto_config_to_mesh( - snapshot: &crate::proto::node::NodeConfigSnapshot, + snapshot: &crate::proto::config::NodeConfigSnapshot, ) -> crate::plugin::MeshConfig { use crate::plugin::{ GpuAssignment, GpuConfig, MeshConfig, ModelConfigEntry, PluginConfigEntry, }; fn declared_ref_or_none( - configured: Option<&crate::proto::node::ConfiguredModelRef>, + configured: Option<&crate::proto::config::ConfiguredModelRef>, ) -> Option { configured.and_then(|configured| { let declared_ref = configured.declared_ref.trim(); @@ -729,7 +729,7 @@ pub(crate) fn proto_config_to_mesh( } let assignment = match snapshot.gpu.as_ref().map(|g| g.assignment) { - Some(v) if v == crate::proto::node::GpuAssignment::Pinned as i32 => GpuAssignment::Pinned, + Some(v) if v == crate::proto::config::GpuAssignment::Pinned as i32 => GpuAssignment::Pinned, _ => GpuAssignment::Auto, }; let models = snapshot @@ -760,7 +760,9 @@ pub(crate) fn proto_config_to_mesh( } } -pub(crate) fn canonical_config_hash(snapshot: &crate::proto::node::NodeConfigSnapshot) -> [u8; 32] { +pub(crate) fn canonical_config_hash( + snapshot: &crate::proto::config::NodeConfigSnapshot, +) -> [u8; 32] { use prost::Message as _; use sha2::{Digest, Sha256}; let bytes = snapshot.encode_to_vec(); @@ -769,7 +771,7 @@ pub(crate) fn canonical_config_hash(snapshot: &crate::proto::node::NodeConfigSna } #[cfg(test)] -pub(crate) fn proto_route_table_to_local(table: &crate::proto::node::RouteTable) -> RoutingTable { +pub(crate) fn proto_route_table_to_local(table: &crate::proto::mesh::RouteTable) -> RoutingTable { let hosts = table .entries .iter() diff --git a/mesh-llm/src/protocol/mod.rs b/mesh-llm/src/protocol/mod.rs index 75ca246a..42332545 100644 --- a/mesh-llm/src/protocol/mod.rs +++ b/mesh-llm/src/protocol/mod.rs @@ -149,7 +149,7 @@ pub(crate) trait ValidateControlFrame: prost::Message + Default + Sized { } } -impl ValidateControlFrame for crate::proto::node::GossipFrame { +impl ValidateControlFrame for crate::proto::mesh::GossipFrame { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -166,7 +166,7 @@ impl ValidateControlFrame for crate::proto::node::GossipFrame { } } -impl ValidateControlFrame for crate::proto::node::TunnelMap { +impl ValidateControlFrame for crate::proto::mesh::TunnelMap { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.owner_peer_id.len() != 32 { return Err(ControlFrameError::InvalidEndpointId { @@ -183,7 +183,7 @@ impl ValidateControlFrame for crate::proto::node::TunnelMap { Ok(()) } } -impl ValidateControlFrame for crate::proto::node::RouteTableRequest { +impl ValidateControlFrame for crate::proto::mesh::RouteTableRequest { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -196,7 +196,7 @@ impl ValidateControlFrame for crate::proto::node::RouteTableRequest { Ok(()) } } -impl ValidateControlFrame for crate::proto::node::RouteTable { +impl ValidateControlFrame for crate::proto::mesh::RouteTable { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -211,7 +211,7 @@ impl ValidateControlFrame for crate::proto::node::RouteTable { Ok(()) } } -impl ValidateControlFrame for crate::proto::node::PeerDown { +impl ValidateControlFrame for crate::proto::mesh::PeerDown { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -224,7 +224,7 @@ impl ValidateControlFrame for crate::proto::node::PeerDown { Ok(()) } } -impl ValidateControlFrame for crate::proto::node::PeerLeaving { +impl ValidateControlFrame for crate::proto::mesh::PeerLeaving { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -238,7 +238,7 @@ impl ValidateControlFrame for crate::proto::node::PeerLeaving { } } -impl ValidateControlFrame for crate::proto::node::ConfigSubscribe { +impl ValidateControlFrame for crate::proto::config::ConfigSubscribe { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -248,7 +248,7 @@ impl ValidateControlFrame for crate::proto::node::ConfigSubscribe { } } -impl ValidateControlFrame for crate::proto::node::ConfigSnapshotResponse { +impl ValidateControlFrame for crate::proto::config::ConfigSnapshotResponse { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -265,7 +265,7 @@ impl ValidateControlFrame for crate::proto::node::ConfigSnapshotResponse { } } -impl ValidateControlFrame for crate::proto::node::ConfigUpdateNotification { +impl ValidateControlFrame for crate::proto::config::ConfigUpdateNotification { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -279,7 +279,7 @@ impl ValidateControlFrame for crate::proto::node::ConfigUpdateNotification { } } -impl ValidateControlFrame for crate::proto::node::ConfigPush { +impl ValidateControlFrame for crate::proto::config::ConfigPush { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -302,7 +302,7 @@ impl ValidateControlFrame for crate::proto::node::ConfigPush { } } -impl ValidateControlFrame for crate::proto::node::ConfigPushResponse { +impl ValidateControlFrame for crate::proto::config::ConfigPushResponse { fn validate_frame(&self) -> Result<(), ControlFrameError> { if self.gen != NODE_PROTOCOL_GENERATION { return Err(ControlFrameError::BadGeneration { got: self.gen }); @@ -315,14 +315,14 @@ impl ValidateControlFrame for crate::proto::node::ConfigPushResponse { } pub(crate) fn validate_peer_announcement( - pa: &crate::proto::node::PeerAnnouncement, + pa: &crate::proto::mesh::PeerAnnouncement, ) -> Result<(), ControlFrameError> { if pa.endpoint_id.len() != 32 { return Err(ControlFrameError::InvalidEndpointId { got: pa.endpoint_id.len(), }); } - if pa.role == crate::proto::node::NodeRole::Host as i32 && pa.http_port.is_none() { + if pa.role == crate::proto::mesh::NodeRole::Host as i32 && pa.http_port.is_none() { return Err(ControlFrameError::MissingHttpPort); } Ok(()) @@ -420,7 +420,7 @@ pub(crate) fn decode_gossip_payload( ) -> Result> { match protocol { ControlProtocol::ProtoV1 => { - let frame = crate::proto::node::GossipFrame::decode(buf) + let frame = crate::proto::mesh::GossipFrame::decode(buf) .map_err(|e| anyhow::anyhow!("gossip decode from {}: {e}", remote.fmt_short()))?; frame.validate_frame().map_err(|e| { anyhow::anyhow!("invalid gossip frame from {}: {e}", remote.fmt_short()) @@ -506,12 +506,12 @@ mod tests { use super::*; use crate::crypto::OwnershipSummary; use crate::mesh::{resolve_peer_down, resolve_peer_leaving, ModelDemand, PeerInfo}; - use crate::proto::node::{ + use crate::proto::config::{ ConfigPush, ConfigPushResponse, ConfigSnapshotResponse, ConfigSubscribe, - ConfigUpdateNotification, ConfiguredModelRef, GossipFrame, NodeConfigSnapshot, - NodeGpuConfig, NodeModelEntry, NodePluginEntry, NodeRole, PeerAnnouncement, - RouteTableRequest, + ConfigUpdateNotification, ConfiguredModelRef, NodeConfigSnapshot, NodeGpuConfig, + NodeModelEntry, NodePluginEntry, }; + use crate::proto::mesh::{GossipFrame, NodeRole, PeerAnnouncement, RouteTableRequest}; use iroh::{EndpointAddr, EndpointId, SecretKey}; use std::collections::{HashMap, HashSet}; @@ -531,7 +531,7 @@ mod tests { NodeConfigSnapshot { version: 1, gpu: Some(NodeGpuConfig { - assignment: crate::proto::node::GpuAssignment::Auto as i32, + assignment: crate::proto::config::GpuAssignment::Auto as i32, }), models: vec![NodeModelEntry { model: "Qwen3-8B".to_string(), @@ -889,7 +889,7 @@ mod tests { #[test] fn config_push_response_error_shape_passes_validation() { // Error responses from send_push_error have an empty config_hash. - let error_response = crate::proto::node::ConfigPushResponse { + let error_response = crate::proto::config::ConfigPushResponse { gen: NODE_PROTOCOL_GENERATION, success: false, current_revision: 0, @@ -898,7 +898,7 @@ mod tests { apply_mode: 0, }; let encoded = encode_control_frame(STREAM_CONFIG_PUSH, &error_response); - decode_control_frame::( + decode_control_frame::( STREAM_CONFIG_PUSH, &encoded, ) @@ -962,7 +962,7 @@ mod tests { #[test] fn proto_v1_route_table_rejects_bad_generation_or_legacy_payload() { - use crate::proto::node::RouteTable; + use crate::proto::mesh::RouteTable; let zero_gen_req = RouteTableRequest { requester_id: vec![0u8; 32], @@ -1033,7 +1033,7 @@ mod tests { #[test] fn peer_lifecycle_messages_roundtrip() { - use crate::proto::node::{PeerDown, PeerLeaving}; + use crate::proto::mesh::{PeerDown, PeerLeaving}; let leaving_id = EndpointId::from(SecretKey::from_bytes(&[0x55; 32]).public()); @@ -1107,7 +1107,7 @@ mod tests { #[test] fn peer_lifecycle_rejects_forged_sender_or_unverified_down() { - use crate::proto::node::{PeerDown, PeerLeaving}; + use crate::proto::mesh::{PeerDown, PeerLeaving}; let valid_peer_bytes = EndpointId::from(SecretKey::from_bytes(&[0x77; 32]).public()) .as_bytes() @@ -1186,7 +1186,7 @@ mod tests { #[test] fn proto_v1_control_frames_reject_legacy_json_and_wrong_gen() { - use crate::proto::node::{PeerDown, PeerLeaving}; + use crate::proto::mesh::{PeerDown, PeerLeaving}; // JSON bytes that look plausible for the old wire format on each stream let json_gossip = b"[{\"addr\":{\"id\":\"aabbcc\",\"addrs\":[]}}]"; @@ -1537,15 +1537,15 @@ mod tests { #[test] fn test_proto_backward_compat_missing_tflops() { let peer_id = EndpointId::from(SecretKey::from_bytes(&[0xCD; 32]).public()); - let proto_pa = crate::proto::node::PeerAnnouncement { + let proto_pa = crate::proto::mesh::PeerAnnouncement { endpoint_id: peer_id.as_bytes().to_vec(), role: NodeRole::Worker as i32, gpu_name: Some("NVIDIA A100".to_string()), gpu_vram: Some("51539607552".to_string()), - hardware: Some(crate::proto::node::HardwareInfo { + hardware: Some(crate::proto::mesh::HardwareInfo { is_soc: Some(false), hostname: None, - gpus: vec![crate::proto::node::GpuInfo { + gpus: vec![crate::proto::mesh::GpuInfo { name: Some("NVIDIA A100".to_string()), vram_bytes: Some("51539607552".to_string()), reserved_bytes: None, @@ -1571,14 +1571,14 @@ mod tests { #[test] fn test_proto_gpu_info_preserves_legacy_fields_for_old_consumers() { let peer_id = EndpointId::from(SecretKey::from_bytes(&[0xCE; 32]).public()); - let proto_pa = crate::proto::node::PeerAnnouncement { + let proto_pa = crate::proto::mesh::PeerAnnouncement { endpoint_id: peer_id.as_bytes().to_vec(), role: NodeRole::Worker as i32, - hardware: Some(crate::proto::node::HardwareInfo { + hardware: Some(crate::proto::mesh::HardwareInfo { is_soc: Some(false), hostname: Some("worker-01".to_string()), gpus: vec![ - crate::proto::node::GpuInfo { + crate::proto::mesh::GpuInfo { name: Some("NVIDIA A100".to_string()), vram_bytes: Some("51539607552".to_string()), reserved_bytes: Some("1073741824".to_string()), @@ -1586,7 +1586,7 @@ mod tests { compute_tflops_fp32: Some("19.50".to_string()), compute_tflops_fp16: Some("312.00".to_string()), }, - crate::proto::node::GpuInfo { + crate::proto::mesh::GpuInfo { name: Some("NVIDIA A100".to_string()), vram_bytes: Some("51539607552".to_string()), reserved_bytes: None, @@ -1644,7 +1644,7 @@ mod tests { assert_eq!(roundtripped.version, snapshot.version); assert_eq!( roundtripped.gpu.as_ref().map(|g| g.assignment), - Some(crate::proto::node::GpuAssignment::Auto as i32) + Some(crate::proto::config::GpuAssignment::Auto as i32) ); assert_eq!(roundtripped.models.len(), snapshot.models.len()); assert_eq!(roundtripped.models[0].model, snapshot.models[0].model); @@ -1668,7 +1668,7 @@ mod tests { let snapshot = NodeConfigSnapshot { version: 1, gpu: Some(NodeGpuConfig { - assignment: crate::proto::node::GpuAssignment::Auto as i32, + assignment: crate::proto::config::GpuAssignment::Auto as i32, }), models: vec![NodeModelEntry { model: "legacy.gguf".to_string(), @@ -1703,7 +1703,7 @@ mod tests { let snapshot = NodeConfigSnapshot { version: 1, gpu: Some(NodeGpuConfig { - assignment: crate::proto::node::GpuAssignment::Auto as i32, + assignment: crate::proto::config::GpuAssignment::Auto as i32, }), models: vec![NodeModelEntry { model: "legacy.gguf".to_string(), @@ -1999,7 +1999,7 @@ mod tests { let snapshot = mesh_config_to_proto(&config); assert_eq!( snapshot.gpu.as_ref().map(|gpu| gpu.assignment), - Some(crate::proto::node::GpuAssignment::Pinned as i32), + Some(crate::proto::config::GpuAssignment::Pinned as i32), "pinned snapshots must not be downgraded to auto" ); assert_eq!( @@ -2018,7 +2018,7 @@ mod tests { let roundtripped = mesh_config_to_proto(&restored); assert_eq!( roundtripped.gpu.as_ref().map(|gpu| gpu.assignment), - Some(crate::proto::node::GpuAssignment::Pinned as i32), + Some(crate::proto::config::GpuAssignment::Pinned as i32), "re-encoded snapshot must keep pinned assignment" ); assert_eq!( @@ -2048,7 +2048,7 @@ mod tests { let snapshot = NodeConfigSnapshot { version: 1, gpu: Some(NodeGpuConfig { - assignment: crate::proto::node::GpuAssignment::Pinned as i32, + assignment: crate::proto::config::GpuAssignment::Pinned as i32, }), models: vec![NodeModelEntry { model: "Qwen3-8B-Q4_K_M".to_string(), @@ -2282,7 +2282,7 @@ mod tests { #[test] fn config_sync_push_validates_signature_length_too_short() { - use crate::proto::node::ConfigPush; + use crate::proto::config::ConfigPush; let push = ConfigPush { gen: NODE_PROTOCOL_GENERATION, requester_id: vec![0x01; 32], @@ -2304,7 +2304,7 @@ mod tests { #[test] fn config_sync_push_validates_signature_length_empty() { - use crate::proto::node::ConfigPush; + use crate::proto::config::ConfigPush; let push = ConfigPush { gen: NODE_PROTOCOL_GENERATION, requester_id: vec![0x01; 32], @@ -2326,7 +2326,7 @@ mod tests { #[test] fn config_sync_snapshot_response_validates_config_present() { - use crate::proto::node::ConfigSnapshotResponse; + use crate::proto::config::ConfigSnapshotResponse; let response = ConfigSnapshotResponse { gen: NODE_PROTOCOL_GENERATION, node_id: vec![0x01; 32], @@ -2348,7 +2348,7 @@ mod tests { #[test] fn config_sync_update_notification_validates_config_present() { - use crate::proto::node::ConfigUpdateNotification; + use crate::proto::config::ConfigUpdateNotification; let notification = ConfigUpdateNotification { gen: NODE_PROTOCOL_GENERATION, node_id: vec![0x01; 32], diff --git a/mesh-llm/src/protocol/v0.rs b/mesh-llm/src/protocol/v0.rs index bd8df868..da865fbd 100644 --- a/mesh-llm/src/protocol/v0.rs +++ b/mesh-llm/src/protocol/v0.rs @@ -5,21 +5,21 @@ use std::collections::HashMap; pub const ALPN_V0: &[u8] = b"mesh-llm/0"; -pub(crate) fn decode_legacy_tunnel_map_frame(buf: &[u8]) -> Result { +pub(crate) fn decode_legacy_tunnel_map_frame(buf: &[u8]) -> Result { let serialized: HashMap = serde_json::from_slice(buf)?; let entries = serialized .into_iter() .filter_map(|(hex_id, port)| { let bytes = hex::decode(&hex_id).ok()?; let arr: [u8; 32] = bytes.try_into().ok()?; - Some(crate::proto::node::TunnelEntry { + Some(crate::proto::mesh::TunnelEntry { target_peer_id: arr.to_vec(), relay_peer_id: None, tunnel_port: port as u32, }) }) .collect(); - Ok(crate::proto::node::TunnelMap { + Ok(crate::proto::mesh::TunnelMap { owner_peer_id: Vec::new(), entries, })