Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 151 additions & 3 deletions libs/deepagents/deepagents/middleware/subagents.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Middleware for providing subagents to an agent via a `task` tool."""

import dataclasses
import json
from collections.abc import Awaitable, Callable, Sequence
from typing import Any, NotRequired, TypedDict, cast

from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware, InterruptOnConfig
from langchain.agents.middleware.types import AgentMiddleware, ContextT, ModelRequest, ModelResponse, ResponseT
from langchain.agents.structured_output import ResponseFormat
from langchain.tools import BaseTool, ToolRuntime
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, ToolMessage
Expand Down Expand Up @@ -76,6 +79,42 @@ class SubAgent(TypedDict):
skills: NotRequired[list[str]]
"""Skill source paths for SkillsMiddleware."""

response_format: NotRequired[ResponseFormat[Any] | type | dict[str, Any]]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of documenting example usage, could we document what the options are?

What is type ? (i.e., allows pydantic model, dataclass etc)

waht is dict? (i think it's json schema)

Given that we allow a dataclass, does this code work b/c there's a json.dumps?

"""Structured output response format for the subagent.

When specified, the subagent will produce a `structured_response` conforming to the
given schema. The structured response is JSON-serialized and returned as the
ToolMessage content to the parent agent, replacing the default last-message extraction.

Accepted formats (from `langchain.agents.structured_output`):

- `ToolStrategy(schema)`: Use tool calling to extract structured output from the model.
- `ProviderStrategy(schema)`: Use the model provider's native structured output mode.
- `AutoStrategy(schema)`: Automatically select the best strategy.
- A bare Python `type`: A Pydantic `BaseModel` subclass, `dataclass`, or `TypedDict`
class. Equivalent to `AutoStrategy(schema)`.
- `dict[str, Any]`: A JSON schema dictionary (e.g.,
`{"type": "object", "properties": {...}, "required": [...]}`).

Example:
```python
from pydantic import BaseModel

class Findings(BaseModel):
findings: str
confidence: float

analyzer: SubAgent = {
"name": "analyzer",
"description": "Analyzes data and returns structured findings",
"system_prompt": "Analyze the data and return your findings.",
"model": "openai:gpt-4o",
"tools": [],
"response_format": Findings,
}
```
"""


class CompiledSubAgent(TypedDict):
"""A pre-compiled agent spec.
Expand Down Expand Up @@ -295,6 +334,99 @@ class _SubagentSpec(TypedDict):
runnable: Runnable


def _get_subagents_legacy(
*,
default_model: str | BaseChatModel,
default_tools: Sequence[BaseTool | Callable | dict[str, Any]],
default_middleware: list[AgentMiddleware] | None,
default_interrupt_on: dict[str, bool | InterruptOnConfig] | None,
subagents: Sequence[SubAgent | CompiledSubAgent],
general_purpose_agent: bool,
) -> list[_SubagentSpec]:
"""Create subagent instances from specifications.

Args:
default_model: Default model for subagents that don't specify one.
default_tools: Default tools for subagents that don't specify tools.
default_middleware: Middleware to apply to all subagents. If `None`,
no default middleware is applied.
default_interrupt_on: The tool configs to use for the default general-purpose subagent. These
are also the fallback for any subagents that don't specify their own tool configs.
subagents: List of agent specifications or pre-compiled agents.
general_purpose_agent: Whether to include a general-purpose subagent.

Returns:
List of subagent specs containing name, description, and runnable.
"""
# Use empty list if None (no default middleware)
default_subagent_middleware = default_middleware or []

specs: list[_SubagentSpec] = []

# Create general-purpose agent if enabled
if general_purpose_agent:
general_purpose_middleware = [*default_subagent_middleware]
if default_interrupt_on:
general_purpose_middleware.append(HumanInTheLoopMiddleware(interrupt_on=default_interrupt_on))
general_purpose_subagent = create_agent(
default_model,
system_prompt=DEFAULT_SUBAGENT_PROMPT,
tools=default_tools,
middleware=general_purpose_middleware,
name="general-purpose",
)
specs.append(
{
"name": "general-purpose",
"description": DEFAULT_GENERAL_PURPOSE_DESCRIPTION,
"runnable": general_purpose_subagent,
}
)

# Process custom subagents
for agent_ in subagents:
if "runnable" in agent_:
custom_agent = cast("CompiledSubAgent", agent_)
specs.append(
{
"name": custom_agent["name"],
"description": custom_agent["description"],
"runnable": custom_agent["runnable"],
}
)
continue
_tools = agent_.get("tools", list(default_tools))

subagent_model = agent_.get("model", default_model)

_middleware = [*default_subagent_middleware, *agent_["middleware"]] if "middleware" in agent_ else [*default_subagent_middleware]

interrupt_on = agent_.get("interrupt_on", default_interrupt_on)
if interrupt_on:
_middleware.append(HumanInTheLoopMiddleware(interrupt_on=interrupt_on))

create_agent_kwargs: dict[str, Any] = {}
if "response_format" in agent_:
create_agent_kwargs["response_format"] = agent_["response_format"]

Comment on lines +408 to +411
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eh no need to thread this through the legacy API, we'll remove

specs.append(
{
"name": agent_["name"],
"description": agent_["description"],
"runnable": create_agent(
subagent_model,
system_prompt=agent_["system_prompt"],
tools=_tools,
middleware=_middleware,
name=agent_["name"],
**create_agent_kwargs,
),
}
)

return specs


def _build_task_tool( # noqa: C901
subagents: list[_SubagentSpec],
task_description: str | None = None,
Expand Down Expand Up @@ -332,12 +464,23 @@ def _return_command_with_state_update(result: dict, tool_call_id: str) -> Comman
raise ValueError(error_msg)

state_update = {k: v for k, v in result.items() if k not in _EXCLUDED_STATE_KEYS}
# Strip trailing whitespace to prevent API errors with Anthropic
message_text = result["messages"][-1].text.rstrip() if result["messages"][-1].text else ""

structured = result.get("structured_response")
if structured is not None:
Comment on lines +468 to +469
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

little nit

Suggested change
structured = result.get("structured_response")
if structured is not None:
if (structured := result.get("structured_response")) is not None:

if hasattr(structured, "model_dump_json"):
content: str = structured.model_dump_json()
elif dataclasses.is_dataclass(structured) and not isinstance(structured, type):
content = json.dumps(dataclasses.asdict(structured))
else:
content = json.dumps(structured)
else:
# Strip trailing whitespace to prevent API errors with Anthropic
content = result["messages"][-1].text.rstrip() if result["messages"][-1].text else ""

return Command(
update={
**state_update,
"messages": [ToolMessage(message_text, tool_call_id=tool_call_id)],
"messages": [ToolMessage(content, tool_call_id=tool_call_id)],
}
)

Expand Down Expand Up @@ -501,6 +644,10 @@ def _get_subagents(self) -> list[_SubagentSpec]:
if interrupt_on:
middleware.append(HumanInTheLoopMiddleware(interrupt_on=interrupt_on))

create_agent_kwargs: dict[str, Any] = {}
if "response_format" in spec:
create_agent_kwargs["response_format"] = spec["response_format"]

specs.append(
{
"name": spec["name"],
Expand All @@ -511,6 +658,7 @@ def _get_subagents(self) -> list[_SubagentSpec]:
tools=spec["tools"],
middleware=middleware,
name=spec["name"],
**create_agent_kwargs,
),
}
)
Expand Down
123 changes: 122 additions & 1 deletion libs/deepagents/tests/integration_tests/test_subagent_middleware.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import json
from typing import ClassVar

import pytest
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langchain.agents.structured_output import ToolStrategy
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.tools import tool
from pydantic import BaseModel, Field

from deepagents.backends.state import StateBackend
from deepagents.graph import create_agent
Expand Down Expand Up @@ -201,3 +204,121 @@ def test_defined_subagent_custom_runnable(self):
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)

def test_deprecated_api_subagents_inherit_model(self):
"""Test that subagents inherit default_model when not specified."""
with pytest.warns(DeprecationWarning, match="default_model"):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_model="gpt-4.1", # Custom subagent should inherit this
default_tools=[get_weather],
subagents=[
{
"name": "custom",
"description": "Custom subagent that gets weather.",
"system_prompt": "Use the get_weather tool.",
# No model specified - should inherit from default_model
}
],
)
],
)
# Verify the custom subagent uses the inherited model
expected_tool_calls = [
{"name": "task", "args": {"subagent_type": "custom"}, "model": "claude-sonnet-4-20250514"},
{"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"}, # Inherited model
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)

def test_deprecated_api_subagents_inherit_tools(self):
"""Test that subagents inherit default_tools when not specified."""
with pytest.warns(DeprecationWarning, match="default_model"):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_model="claude-sonnet-4-20250514",
default_tools=[get_weather], # Custom subagent should inherit this
subagents=[
{
"name": "custom",
"description": "Custom subagent that gets weather.",
"system_prompt": "Use the get_weather tool to get weather.",
# No tools specified - should inherit from default_tools
}
],
)
],
)
# Verify the custom subagent can use the inherited tools
expected_tool_calls = [
{"name": "task", "args": {"subagent_type": "custom"}},
{"name": "get_weather", "args": {}}, # Inherited tool
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)

def test_subagent_response_format_serialized_as_json(self):
"""Test that subagent responseFormat produces JSON-serialized ToolMessage content.

Verifies the end-to-end flow when `response_format` is set directly on a
`SubAgent` spec: the subagent's `structured_response` is JSON-serialized
into the ToolMessage content returned to the parent agent.
"""

class SubagentFindings(BaseModel):
findings: str = Field(description="The findings")
confidence: float = Field(description="Confidence score")
summary: str = Field(description="Brief summary")

agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="You are an orchestrator. Always delegate tasks to the appropriate subagent via the task tool.",
middleware=[
SubAgentMiddleware(
backend=StateBackend(),
subagents=[
{
"name": "foo",
"description": "Call this when the user says 'foo'",
"system_prompt": "You are a foo agent",
"model": "claude-haiku-4-5",
"tools": [],
"response_format": ToolStrategy(schema=SubagentFindings),
},
],
)
],
)

result = agent.invoke(
{"messages": [HumanMessage(content="foo - tell me how confident you are that pineapple belongs on pizza")]},
{"recursion_limit": 100},
)

agent_messages = [msg for msg in result["messages"] if isinstance(msg, AIMessage)]
tool_calls = [tc for msg in agent_messages for tc in (msg.tool_calls or [])]
assert any(tc["name"] == "task" and tc["args"].get("subagent_type") == "foo" for tc in tool_calls)

task_tool_messages = [msg for msg in result["messages"] if msg.type == "tool" and msg.name == "task"]
assert len(task_tool_messages) > 0

task_tool_message = task_tool_messages[0]
parsed = json.loads(task_tool_message.content)
assert "findings" in parsed
assert "confidence" in parsed
assert "summary" in parsed
assert isinstance(parsed["findings"], str)
assert isinstance(parsed["confidence"], (int, float))
assert isinstance(parsed["summary"], str)
Loading
Loading