Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ dds = [
]
macos = ["osascript"]

[project.scripts]
om1-mcp-server = "expose.server:main"

[dependency-groups]
dev = [
"black==26.3.1",
Expand Down
Empty file added src/expose/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions src/expose/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Entry point for `python -m expose`."""

from expose.server import main

if __name__ == "__main__":
main()
24 changes: 24 additions & 0 deletions src/expose/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Runtime configuration for the OM1 MCP server."""

from __future__ import annotations

import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ServerConfig:
"""Immutable runtime configuration for the OM1 MCP server."""

websim_host: str = "127.0.0.1"
websim_port: int = 8000
log_level: str = "WARNING"

@classmethod
def from_env(cls) -> "ServerConfig":
"""Build a ServerConfig from OM1_WEBSIM_HOST / OM1_WEBSIM_PORT / OM1_LOG_LEVEL env vars."""
return cls(
websim_host=os.getenv("OM1_WEBSIM_HOST", "127.0.0.1"),
websim_port=int(os.getenv("OM1_WEBSIM_PORT", "8000")),
log_level=os.getenv("OM1_LOG_LEVEL", "WARNING"),
)
111 changes: 111 additions & 0 deletions src/expose/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""OM1 MCP server: expose OM1 tools to external MCP clients."""

from __future__ import annotations

import asyncio
import logging
from importlib.metadata import PackageNotFoundError, version
from typing import Any

import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions

from expose.config import ServerConfig
from expose.tools import Emotion, MoveAction, build_tool_definitions
from expose.websim_adapter import WebSimAdapter

logger = logging.getLogger(__name__)


def _get_version() -> str:
try:
return version("om1")
except PackageNotFoundError:
return "0.0.0+unknown"


def _err(msg: str) -> list[types.TextContent]:
return [types.TextContent(type="text", text=f"Error: {msg}")]


def _ok(msg: str) -> list[types.TextContent]:
return [types.TextContent(type="text", text=msg)]


async def handle_tool_call(
name: str, arguments: dict[str, Any] | None, adapter: WebSimAdapter
) -> list[types.TextContent]:
"""Dispatch an MCP tool call to the adapter and return the tool result as TextContent.

Invalid inputs return an error TextContent instead of raising, so the MCP
client observes a structured failure rather than a transport-level crash.
"""
args = arguments or {}
try:
if name == "om1_move":
action = MoveAction(args["action"])
adapter.move(action.value)
return _ok(f"Executed move: {action.value}")

if name == "om1_speak":
text = args.get("text", "").strip()
if not text:
raise ValueError("text must be non-empty")
adapter.speak(text)
return _ok(f"Spoke: {text}")

if name == "om1_face":
emotion = Emotion(args["emotion"])
adapter.face(emotion.value)
return _ok(f"Changed emotion to: {emotion.value}")

return _err(f"Unknown tool: {name}")
except (KeyError, ValueError) as e:
return _err(str(e))


def build_server(adapter: WebSimAdapter) -> Server:
"""Wire an mcp.server.Server with list_tools/call_tool handlers backed by ``adapter``."""
server = Server("om1_mcp_server")

@server.list_tools()
async def _list_tools() -> list[types.Tool]:
return build_tool_definitions()

@server.call_tool()
async def _call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
return await handle_tool_call(name, arguments, adapter)

return server


async def run(config: ServerConfig) -> None:
"""Initialise logging/WebSim and serve the MCP protocol over stdio until stdin closes."""
logging.basicConfig(level=config.log_level.upper())
adapter = WebSimAdapter.create(config.websim_host, config.websim_port)
server = build_server(adapter)

async with mcp.server.stdio.stdio_server() as (read, write):
await server.run(
read,
write,
InitializationOptions(
server_name="om1_mcp_server",
server_version=_get_version(),
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)


def main() -> None:
"""CLI entry point (``om1-mcp-server``): read env config and run the server."""
asyncio.run(run(ServerConfig.from_env()))


if __name__ == "__main__":
main()
79 changes: 79 additions & 0 deletions src/expose/tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""MCP tool definitions exposed by the OM1 server."""

from __future__ import annotations

from enum import Enum

import mcp.types as types


class MoveAction(str, Enum):
"""Supported values for the ``om1_move`` tool's ``action`` parameter."""

FORWARD = "forward"
BACKWARD = "backward"
TURN_LEFT = "turn_left"
TURN_RIGHT = "turn_right"
SPIN = "spin"
SIT = "sit"
STAND = "stand"
IDLE = "idle"


class Emotion(str, Enum):
"""Supported values for the ``om1_face`` tool's ``emotion`` parameter."""

JOY = "joy"
SMILE = "smile"
PONDER = "ponder"
ALERT = "alert"
SAD = "sad"


def build_tool_definitions() -> list[types.Tool]:
"""Return the list of MCP Tool schemas exposed by the OM1 server."""
return [
types.Tool(
name="om1_move",
description="Move the OM1 agent.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": [a.value for a in MoveAction],
}
},
"required": ["action"],
},
),
types.Tool(
name="om1_speak",
description="Make the OM1 agent speak text aloud.",
inputSchema={
"type": "object",
"properties": {
"text": {
"type": "string",
"minLength": 1,
"maxLength": 500,
}
},
"required": ["text"],
},
),
types.Tool(
name="om1_face",
description="Change the OM1 agent's facial emotion.",
inputSchema={
"type": "object",
"properties": {
"emotion": {
"type": "string",
"enum": [e.value for e in Emotion],
}
},
"required": ["emotion"],
},
),
]
49 changes: 49 additions & 0 deletions src/expose/websim_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Adapter that wraps WebSim behind a minimal, testable interface."""

from __future__ import annotations

import socket
from contextlib import closing
from typing import Any

from llm.output_model import Action


class WebSimAdapter:
"""Thin wrapper exposing only the move/speak/face operations the MCP server needs."""

def __init__(self, websim: Any):
"""Wrap an already-constructed WebSim-like object (used directly in tests with mocks)."""
self._websim = websim

@classmethod
def create(cls, host: str, port: int) -> "WebSimAdapter":
"""Factory that also starts a real WebSim on (host, port)."""
cls.ensure_port_free(host, port)
# Imported here so unit tests for the adapter don't need WebSim.
from simulators.base import SimulatorConfig
from simulators.plugins.WebSim import WebSim

return cls(WebSim(SimulatorConfig(host=host, port=port)))

@staticmethod
def ensure_port_free(host: str, port: int) -> None:
"""Raise RuntimeError if ``(host, port)`` is already accepting connections."""
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
if s.connect_ex((host, port)) == 0:
raise RuntimeError(
f"Port {port} on {host} is already in use. "
f"Set OM1_WEBSIM_PORT to a free port or kill the conflict."
)

def move(self, action: str) -> None:
"""Dispatch a ``move`` Action to the underlying WebSim."""
self._websim.sim([Action(type="move", value=action)])

def speak(self, text: str) -> None:
"""Dispatch a ``speak`` Action to the underlying WebSim."""
self._websim.sim([Action(type="speak", value=text)])

def face(self, emotion: str) -> None:
"""Dispatch an ``emotion`` Action to the underlying WebSim."""
self._websim.sim([Action(type="emotion", value=emotion)])
Empty file added tests/expose/__init__.py
Empty file.
36 changes: 36 additions & 0 deletions tests/expose/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Tests for expose.config — server configuration from env vars."""

import pytest

from expose.config import ServerConfig


class TestDefaults:
def test_defaults_when_no_env_vars(self, monkeypatch):
for var in ("OM1_WEBSIM_HOST", "OM1_WEBSIM_PORT", "OM1_LOG_LEVEL"):
monkeypatch.delenv(var, raising=False)
cfg = ServerConfig.from_env()
assert cfg.websim_host == "127.0.0.1"
assert cfg.websim_port == 8000
assert cfg.log_level == "WARNING"


class TestEnvOverrides:
def test_host_override(self, monkeypatch):
monkeypatch.setenv("OM1_WEBSIM_HOST", "0.0.0.0")
assert ServerConfig.from_env().websim_host == "0.0.0.0"

def test_port_override_parses_int(self, monkeypatch):
monkeypatch.setenv("OM1_WEBSIM_PORT", "9000")
assert ServerConfig.from_env().websim_port == 9000

def test_log_level_override(self, monkeypatch):
monkeypatch.setenv("OM1_LOG_LEVEL", "DEBUG")
assert ServerConfig.from_env().log_level == "DEBUG"


class TestValidation:
def test_invalid_port_raises(self, monkeypatch):
monkeypatch.setenv("OM1_WEBSIM_PORT", "not-a-number")
with pytest.raises(ValueError):
ServerConfig.from_env()
69 changes: 69 additions & 0 deletions tests/expose/test_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Tests for expose.server — tool dispatch and MCP wiring."""

from unittest.mock import MagicMock

import pytest

from expose.server import handle_tool_call


@pytest.fixture
def adapter():
return MagicMock()


class TestMove:
@pytest.mark.asyncio
async def test_valid_move_calls_adapter(self, adapter):
result = await handle_tool_call("om1_move", {"action": "forward"}, adapter)
adapter.move.assert_called_once_with("forward")
assert len(result) == 1
assert "forward" in result[0].text

@pytest.mark.asyncio
async def test_invalid_move_returns_error_not_raise(self, adapter):
result = await handle_tool_call("om1_move", {"action": "moonwalk"}, adapter)
adapter.move.assert_not_called()
assert result[0].text.lower().startswith("error")

@pytest.mark.asyncio
async def test_missing_action_arg_returns_error(self, adapter):
result = await handle_tool_call("om1_move", {}, adapter)
adapter.move.assert_not_called()
assert "error" in result[0].text.lower()


class TestSpeak:
@pytest.mark.asyncio
async def test_valid_speak_calls_adapter(self, adapter):
result = await handle_tool_call("om1_speak", {"text": "hi"}, adapter)
adapter.speak.assert_called_once_with("hi")
assert "hi" in result[0].text

@pytest.mark.asyncio
async def test_empty_text_returns_error(self, adapter):
result = await handle_tool_call("om1_speak", {"text": " "}, adapter)
adapter.speak.assert_not_called()
assert "error" in result[0].text.lower()


class TestFace:
@pytest.mark.asyncio
async def test_valid_emotion(self, adapter):
result = await handle_tool_call("om1_face", {"emotion": "joy"}, adapter)
adapter.face.assert_called_once_with("joy")
assert "joy" in result[0].text

@pytest.mark.asyncio
async def test_invalid_emotion_returns_error(self, adapter):
result = await handle_tool_call("om1_face", {"emotion": "angry"}, adapter)
adapter.face.assert_not_called()
assert "error" in result[0].text.lower()


class TestUnknownTool:
@pytest.mark.asyncio
async def test_unknown_tool_returns_error(self, adapter):
result = await handle_tool_call("om1_teleport", {}, adapter)
assert "error" in result[0].text.lower()
assert "om1_teleport" in result[0].text
Loading
Loading