Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airlock/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ async def auth_config() -> dict[str, str]:
@app.get("/api/actions")
async def list_actions(status: str | None = None, limit: int = 100, offset: int = 0) -> list[Action]:
status_enum = ActionStatus(status) if status else None
return await gate._req_storage.list_actions(status_enum, limit=limit, offset=offset)
return await gate.req_storage.list_actions(status_enum, limit=limit, offset=offset)

@app.get("/api/actions/{session_key}/{action_seq}")
async def get_action(session_key: str, action_seq: int) -> Action:
key = ActionKey(session_key=session_key, action_seq=action_seq)
action = await gate._req_storage.get_action(key)
action = await gate.req_storage.get_action(key)
if action is None:
raise HTTPException(status_code=404, detail="Action not found")
return action
Expand Down
26 changes: 13 additions & 13 deletions airlock/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def _rehydrate_pending_actions(self) -> None:
Without this, decide() would fail for any PENDING action from a prior run because
_pending_decisions only lives in memory and is empty on startup.
"""
pending = await self._req_storage.list_actions(ActionStatus.PENDING)
pending = await self.req_storage.list_actions(ActionStatus.PENDING)
for action in pending:
namespace = MCPMountPrefix(action.call.server_namespace)
self._spawn(self._await_human_decision(action.key, namespace, action.call.tool_name, action.call.arguments))
Expand Down Expand Up @@ -230,20 +230,20 @@ def _register_resources(self) -> None:
async def action_resource(session_key: str, action_seq: int) -> str:
"""Current state of a deferred action."""
key = ActionKey(session_key=session_key, action_seq=action_seq)
action = await self._req_storage.get_action(key)
action = await self.req_storage.get_action(key)
if action is None:
raise ValueError(f"Action not found: {session_key}/{action_seq}")
return action.model_dump_json()

@self.resource("resource://sessions/{session_key}/log_hwm")
async def log_hwm_resource(session_key: str) -> str:
"""The entry_id of the last log entry for this session."""
return str(await self._req_storage.get_log_hwm(session_key))
return str(await self.req_storage.get_log_hwm(session_key))

@self.resource("resource://sessions/{session_key}/log/{entry_id}")
async def log_entry_resource(session_key: str, entry_id: int) -> str:
"""A specific log entry."""
entry = await self._req_storage.get_log_entry(session_key, entry_id)
entry = await self.req_storage.get_log_entry(session_key, entry_id)
if entry is None:
raise ValueError(f"Log entry not found: {session_key}/{entry_id}")
return entry.model_dump_json()
Expand All @@ -264,7 +264,7 @@ def _register_tools(self) -> None:
@self.tool(auth=require_scopes(READ_SCOPE))
async def list_actions(status: ActionStatus | None = None, limit: int = 100, offset: int = 0) -> list[Action]:
"""List queued/processed actions, optionally filtered by status."""
return await self._req_storage.list_actions(status, limit=limit, offset=offset)
return await self.req_storage.list_actions(status, limit=limit, offset=offset)

@self.tool(auth=require_scopes(DECIDE_SCOPE))
async def approve_action(key: ActionKey) -> None:
Expand All @@ -282,7 +282,7 @@ async def withdraw_action(key: ActionKey) -> Action:
return await self.withdraw(key)

@property
def _req_storage(self) -> ActionStorage:
def req_storage(self) -> ActionStorage:
if self._storage is None:
raise RuntimeError("storage not initialised — gate not started")
return self._storage
Expand All @@ -301,11 +301,11 @@ def _register_wrapped_tool(self, namespace: MCPMountPrefix, backend_tool: mcp_ty
async def _tool_handler(
justification: str,
session_key: str,
input: dict[str, object] = {}, # noqa: B006
input: dict[str, object] = {},
wait_mode: WaitMode | None = None,
) -> Action:
call = ToolCall(server_namespace=namespace, tool_name=tool_name, arguments=input)
action = await self._req_storage.create_action(
action = await self.req_storage.create_action(
session_key=session_key, call=call, justification=justification
)
key = action.key
Expand All @@ -322,7 +322,7 @@ async def _tool_handler(
with anyio.move_on_after(effective_timeout):
await asyncio.shield(pipeline)

result = await self._req_storage.get_action(key)
result = await self.req_storage.get_action(key)
assert result is not None
return result

Expand Down Expand Up @@ -393,7 +393,7 @@ async def _apply_decision(

async def _append_log_and_notify(self, key: ActionKey, detail: LogEventDetail) -> None:
"""Append a log entry and notify subscribed sessions of action + HWM changes."""
await self._req_storage.append_log_entry(session_key=key.session_key, action_seq=key.action_seq, detail=detail)
await self.req_storage.append_log_entry(session_key=key.session_key, action_seq=key.action_seq, detail=detail)
await self._notify_subscribers(f"resource://sessions/{key.session_key}/actions/{key.action_seq}")
await self._notify_subscribers(f"resource://sessions/{key.session_key}/log_hwm")

Expand All @@ -419,7 +419,7 @@ async def decide(self, key: ActionKey, decision: OperatorDecision) -> None:
Raises ValueError if the action does not exist, is not pending, or is not
awaiting a human decision (e.g. still processing an auto-predicate).
"""
action = await self._req_storage.get_action(key)
action = await self.req_storage.get_action(key)
if action is None:
raise ValueError(f"Action not found: {key.session_key}/{key.action_seq}")
if not isinstance(action.state, PendingState):
Expand All @@ -434,7 +434,7 @@ async def withdraw(self, key: ActionKey) -> Action:

Raises ValueError if the action does not exist or is not pending.
"""
action = await self._req_storage.get_action(key)
action = await self.req_storage.get_action(key)
if action is None:
raise ValueError(f"Action not found: {key.session_key}/{key.action_seq}")
if not isinstance(action.state, PendingState):
Expand All @@ -454,7 +454,7 @@ def _spawn(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:

async def _update_and_notify(self, key: ActionKey, new_state: ActionState, detail: LogEventDetail) -> Action:
"""Update action state and append log entry atomically, then notify subscribers."""
action, _ = await self._req_storage.update_state_and_log(key, new_state, detail)
action, _ = await self.req_storage.update_state_and_log(key, new_state, detail)
await self._notify_subscribers(f"resource://sessions/{key.session_key}/actions/{key.action_seq}")
await self._notify_subscribers(f"resource://sessions/{key.session_key}/log_hwm")
self._broadcast_sse({"type": "action_updated", "session_key": key.session_key, "action_seq": key.action_seq})
Expand Down
16 changes: 8 additions & 8 deletions devinfra/claude/auth_proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def __init__(self, listen_port: int, max_workers: int = 100):
self._upstream_url: str | None = None
self._creds_lock = threading.Lock()
self.server_socket: socket.socket | None = None
self._running = False
self.running = False
self._thread: threading.Thread | None = None
Comment on lines 88 to 92
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming _running to running breaks existing call sites that still check proxy._running (e.g. devinfra/claude/test_integration.py). To avoid runtime/test failures, either update those call sites in this PR or provide a backwards-compatible alias (e.g., a property _running that proxies to running) during the transition.

Copilot uses AI. Check for mistakes.
self._executor: ThreadPoolExecutor | None = None
self._connections: list[socket.socket] = []
Expand Down Expand Up @@ -118,15 +118,15 @@ def start(self) -> None:
self.server_socket.settimeout(0.5)

self._executor = ThreadPoolExecutor(max_workers=self.max_workers, thread_name_prefix="proxy")
self._running = True
self.running = True
self._thread = threading.Thread(target=self._serve, daemon=True)
self._thread.start()

logger.info("Auth proxy started on 127.0.0.1:%d (max_workers: %d)", self.listen_port, self.max_workers)

def stop(self) -> None:
"""Stop the proxy server."""
self._running = False
self.running = False
if self._thread:
self._thread.join(timeout=2)
if self._executor:
Expand All @@ -140,7 +140,7 @@ def stop(self) -> None:

def _serve(self) -> None:
"""Main server loop."""
while self._running:
while self.running:
try:
client_sock, _ = self.server_socket.accept() # type: ignore[union-attr]
self._connections.append(client_sock)
Expand Down Expand Up @@ -309,7 +309,7 @@ def __init__(self, sock_path: Path, remote_target: str, max_workers: int = 100):
self._upstream_url: str | None = None
self._creds_lock = threading.Lock()
self.server_socket: socket.socket | None = None
self._running = False
self.running = False
self._thread: threading.Thread | None = None
self._executor: ThreadPoolExecutor | None = None
self._connections: list[socket.socket] = []
Expand Down Expand Up @@ -341,15 +341,15 @@ def start(self) -> None:
self.server_socket.settimeout(0.5)

self._executor = ThreadPoolExecutor(max_workers=self.max_workers, thread_name_prefix="uds-proxy")
self._running = True
self.running = True
self._thread = threading.Thread(target=self._serve, daemon=True)
self._thread.start()

logger.info("UDS remote proxy started on %s → %s", self.sock_path, self.remote_target)

def stop(self) -> None:
"""Stop the UDS proxy server."""
self._running = False
self.running = False
if self._thread:
self._thread.join(timeout=2)
if self._executor:
Expand All @@ -365,7 +365,7 @@ def stop(self) -> None:

def _serve(self) -> None:
"""Main server loop."""
while self._running:
while self.running:
try:
client_sock, _ = self.server_socket.accept() # type: ignore[union-attr]
self._connections.append(client_sock)
Expand Down
4 changes: 2 additions & 2 deletions devinfra/claude/auth_proxy/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _resolve_rlocation(rlocation_path: str) -> Path | None:
bazel_wrapper subprocess (which runs outside Bazel runfiles).
"""
try:
from util.bazel.runfiles import get_required_path # noqa: PLC0415
from util.bazel.runfiles import get_required_path
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The local import in _resolve_rlocation() is intentional (to avoid crashing outside Bazel), but removing # noqa: PLC0415 will make ruff flag this as an import outside top-level (PLC0415 is enabled repo-wide). Please add the # noqa: PLC0415 back on this import or add a per-file ignore for this file.

Suggested change
from util.bazel.runfiles import get_required_path
from util.bazel.runfiles import get_required_path # noqa: PLC0415

Copilot uses AI. Check for mistakes.

return get_required_path(rlocation_path)
except RuntimeError:
Expand Down Expand Up @@ -347,7 +347,7 @@ async def setup_auth_proxy(
# Create combined CA bundle (for tools like uv that use SSL_CERT_FILE)
_create_combined_ca_bundle(paths)

status = (f"running (port {port})" if proxy._running else "configured") if proxy is not None else "uds-only"
status = (f"running (port {port})" if proxy.running else "configured") if proxy is not None else "uds-only"
ca_status = "custom CA" if combined_ca.exists() else "system"

logger.info("Auth proxy setup complete")
Expand Down
2 changes: 1 addition & 1 deletion difftree/diff_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderR
width=options.max_width or 80,
legacy_windows=False,
force_terminal=True, # Force ANSI codes to preserve styling
color_system="standard" if console._color_system else None, # Match parent console
color_system="standard" if console._color_system else None, # Match parent console # noqa: SLF001
)
temp_console.print(tree)

Expand Down
8 changes: 4 additions & 4 deletions ember/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ async def create(cls, settings: EmberSettings) -> EmberRuntime:
This is the primary way to create a runtime - handles all async initialization.
"""
runtime = cls.__new__(cls)
runtime._settings = settings
runtime._task = None
runtime._stop_event = asyncio.Event()
await runtime._initialize()
runtime._settings = settings # noqa: SLF001
runtime._task = None # noqa: SLF001
runtime._stop_event = asyncio.Event() # noqa: SLF001
await runtime._initialize() # noqa: SLF001
return runtime

async def _initialize(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion inop/runners/containerized_claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _patched_find_cli(self) -> str:
raise RuntimeError("claude_binary not set in options")
return bin_path

SubprocessCLITransport._find_cli = _patched_find_cli # type: ignore[method-assign]
SubprocessCLITransport._find_cli = _patched_find_cli # type: ignore[method-assign] # noqa: SLF001


# Apply monkeypatch once at module level
Expand Down
2 changes: 1 addition & 1 deletion inventree_utils/beautifier/upload_lcsc_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def upload_lcsc_images(api: InvenTreeAPI):
if lcsc_from_link and lcsc_from_supplier:
# If both are present, assert they match
if lcsc_from_link != lcsc_from_supplier:
raise ValueError(f"Conflicting LCSC IDs: {lcsc_from_link=} != {lcsc_from_supplier=}", log._context)
raise ValueError(f"Conflicting LCSC IDs: {lcsc_from_link=} != {lcsc_from_supplier=}", log._context) # noqa: SLF001
# Both match => use either one
lcsc_id = lcsc_from_link
elif lcsc_from_link or lcsc_from_supplier:
Expand Down
10 changes: 5 additions & 5 deletions mcp_infra/compositor/notifications_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def __init__(self, owner: NotificationsBuffer) -> None:

# Override with narrower types than base MessageHandler (which accepts Any)
async def on_resource_updated(self, message: mcp_types.ResourceUpdatedNotification) -> None:
await self._buffer._on_updated(message)
await self._buffer.on_updated(message)

async def on_resource_list_changed(self, message: mcp_types.ResourceListChangedNotification) -> None:
await self._buffer._on_list_changed(message)
await self._buffer.on_list_changed(message)


class NotificationsBuffer:
Expand Down Expand Up @@ -77,21 +77,21 @@ def poll(self) -> NotificationsBatch:
self._servers.clear()
return batch

async def _on_updated(self, message: mcp_types.ResourceUpdatedNotification) -> None:
async def on_updated(self, message: mcp_types.ResourceUpdatedNotification) -> None:
# Add URI to the server's update set
uri_str = str(message.params.uri)
server = await self._derive_server(uri_str)
self._servers.setdefault(server, _ServerNoticeAccumulator()).updated.add(uri_str)

async def _on_list_changed(self, message: mcp_types.ResourceListChangedNotification) -> None:
async def on_list_changed(self, message: mcp_types.ResourceListChangedNotification) -> None:
# Attribute origin using compositor-captured child notifications when available
names = list(self._compositor.pop_recent_resource_list_changes())
for name in names:
self._servers.setdefault(name, _ServerNoticeAccumulator()).list_changed = True

async def _derive_server(self, uri: str) -> str:
"""Derive origin server from resource URI using all compositor mounts."""
mount_names = await self._compositor._mount_names()
mount_names = await self._compositor.mount_names()
return derive_origin_server(uri, mount_names)

async def _on_resource_listener(self, name: str, uri: str) -> None:
Expand Down
4 changes: 2 additions & 2 deletions mcp_infra/compositor/resources_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,8 @@ async def _broadcast_subs_updated(self) -> None:

async def _present_servers(self) -> set[str]:
# Include all mounted servers, including in-proc mounts without typed specs.
# Use compositor._mount_names() directly; do not swallow errors.
names = await self._compositor._mount_names()
# Use compositor.mount_names() directly; do not swallow errors.
names = await self._compositor.mount_names()
return set(names)

def _get_or_create_sub(self, server: str, uri: str) -> SubscriptionRecord:
Expand Down
18 changes: 9 additions & 9 deletions mcp_infra/compositor/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ def __init__(self, compositor: BaseCompositor, server_prefix: MCPMountPrefix) ->
self._server_prefix = server_prefix

async def on_resource_list_changed(self, message: mcp_types.ResourceListChangedNotification) -> None:
self._compositor._pending_resource_list_changes.add(self._server_prefix)
self._compositor.pending_resource_list_changes.add(self._server_prefix)
# No forwarding here; child client handles forwarding via proxy
await self._compositor._notify_resource_list_change(self._server_prefix)
await self._compositor.notify_resource_list_change(self._server_prefix)

async def on_resource_updated(self, message: mcp_types.ResourceUpdatedNotification) -> None:
# Forward to listeners with origin attribution
await self._compositor._notify_resource_updated(self._server_prefix, str(message.params.uri))
await self._compositor.notify_resource_updated(self._server_prefix, str(message.params.uri))


class CompositorState(Enum):
Expand Down Expand Up @@ -130,7 +130,7 @@ def __init__(
self._mount_listeners: list[Callable[[MCPMountPrefix, MountEvent], Awaitable[None] | None]] = []

# Resource change tracking
self._pending_resource_list_changes: set[MCPMountPrefix] = set()
self.pending_resource_list_changes: set[MCPMountPrefix] = set()
self._resource_list_change_listeners: list[Callable[[MCPMountPrefix], Awaitable[None] | None]] = []
self._resource_updated_listeners: list[Callable[[MCPMountPrefix, str], Awaitable[None] | None]] = []

Expand All @@ -157,7 +157,7 @@ def add_resource_list_change_listener(self, cb: Callable[[MCPMountPrefix], Await
"""
self._resource_list_change_listeners.append(cb)

async def _notify_resource_list_change(self, prefix: MCPMountPrefix) -> None:
async def notify_resource_list_change(self, prefix: MCPMountPrefix) -> None:
for cb in list(self._resource_list_change_listeners):
res = cb(prefix)
if asyncio.iscoroutine(res):
Expand All @@ -171,7 +171,7 @@ def add_resource_updated_listener(self, cb: Callable[[MCPMountPrefix, str], Awai
"""
self._resource_updated_listeners.append(cb)

async def _notify_resource_updated(self, prefix: MCPMountPrefix, uri: str) -> None:
async def notify_resource_updated(self, prefix: MCPMountPrefix, uri: str) -> None:
for cb in list(self._resource_updated_listeners):
res = cb(prefix, uri)
if asyncio.iscoroutine(res):
Expand All @@ -181,8 +181,8 @@ async def _notify_resource_updated(self, prefix: MCPMountPrefix, uri: str) -> No

def pop_recent_resource_list_changes(self) -> list[MCPMountPrefix]:
"""Return and clear servers that recently reported resource list changes."""
names = sorted(self._pending_resource_list_changes)
self._pending_resource_list_changes.clear()
names = sorted(self.pending_resource_list_changes)
self.pending_resource_list_changes.clear()
return names

async def server_entries(self) -> dict[MCPMountPrefix, ServerEntry]:
Expand Down Expand Up @@ -651,7 +651,7 @@ def __del__(self):
print(msg, file=sys.stderr)

# ---- Internals ----------------------------------------------------------
async def _mount_names(self) -> list[str]:
async def mount_names(self) -> list[str]:
async with self._mount_lock:
return list(self._mounts.keys())

Expand Down
2 changes: 1 addition & 1 deletion mcp_infra/enhanced/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async def on_initialize(
if context.fastmcp_context is not None and context.fastmcp_context.session is not None:
session = context.fastmcp_context.session
if isinstance(session, ServerSession):
self._enhanced._sessions.add(session)
self._enhanced._sessions.add(session) # noqa: SLF001
await self._enhanced.flush_pending()
return result

Expand Down
Loading
Loading