Add LangSmith tracing plugin for Temporal workflows #1369
base: main
Changes from 30 commits
```diff
@@ -69,6 +69,7 @@ dev = [
     "googleapis-common-protos==1.70.0",
     "pytest-rerunfailures>=16.1",
     "moto[s3,server]>=5",
+    "langsmith>=0.7.17",
 ]

 [tool.poe.tasks]
```
# LangSmith Plugin for Temporal Python SDK

This Temporal [plugin](https://docs.temporal.io/develop/plugins-guide) lets [LangSmith](https://smith.langchain.com/) tracing work inside Temporal workflows and activities. It propagates trace context across worker boundaries so that `@traceable` calls, LLM invocations, and Temporal operations show up in a single connected trace, and it ensures that replay does not generate duplicate traces.
## Quick Start

Register the plugin on your Temporal client. You need it on both the client (starter) side and the workers:

```python
from temporalio.client import Client
from temporalio.contrib.langsmith import LangSmithPlugin

client = await Client.connect(
    "localhost:7233",
    plugins=[LangSmithPlugin(project_name="my-project")],
)
```
Once that's set up, any `@traceable` function inside your workflows and activities will show up in LangSmith with correct parent-child relationships, even across worker boundaries.
## Example: AI Chatbot

A conversational chatbot using OpenAI, orchestrated by a Temporal workflow. The workflow stays alive waiting for user messages via signals and dispatches each message to an activity that calls the LLM.

### Activity (wraps the LLM call)

```python
@langsmith.traceable(name="Call OpenAI", run_type="chain")
@activity.defn
async def call_openai(request: OpenAIRequest) -> Response:
    # wrap_openai returns a client whose calls are traced by LangSmith
    client = wrap_openai(AsyncOpenAI())
    return await client.responses.create(
        model=request.model,
        input=request.input,
        instructions=request.instructions,
    )
```
### Workflow (orchestrates the conversation)

```python
@workflow.defn
class ChatbotWorkflow:
    @workflow.run
    async def run(self) -> str:
        # @traceable works inside workflows — fully replay-safe
        now = workflow.now().strftime("%b %d %H:%M")
        return await langsmith.traceable(
            name=f"Session {now}", run_type="chain",
        )(self._session)()

    async def _session(self) -> str:
        while not self._done:
            await workflow.wait_condition(
                lambda: self._pending_message is not None or self._done
            )
            if self._done:
                break

            message = self._pending_message
            self._pending_message = None

            @langsmith.traceable(name=f"Request: {message[:60]}", run_type="chain")
            async def _query(msg: str) -> str:
                response = await workflow.execute_activity(
                    call_openai,
                    OpenAIRequest(model="gpt-4o-mini", input=msg),
                    start_to_close_timeout=timedelta(seconds=60),
                )
                return response.output_text

            self._last_response = await _query(message)

        return "Session ended."
```
### Worker

```python
client = await Client.connect(
    "localhost:7233",
    plugins=[LangSmithPlugin(project_name="chatbot")],
)

worker = Worker(
    client,
    task_queue="chatbot",
    workflows=[ChatbotWorkflow],
    activities=[call_openai],
)
await worker.run()
```
### What you see in LangSmith

With the default configuration (`add_temporal_runs=False`), the trace contains only your application logic:

```
Session Apr 03 14:30
  Request: "What's the weather in NYC?"
    Call OpenAI
      openai.responses.create  (auto-traced by wrap_openai)
```

<!-- Screenshot: LangSmith trace tree with add_temporal_runs=False showing clean application-only hierarchy -->
## `add_temporal_runs` — Temporal Operation Visibility

By default, `add_temporal_runs` is `False`, and only your `@traceable` application logic appears in traces. Setting it to `True` also adds Temporal operations (StartWorkflow, RunWorkflow, StartActivity, RunActivity, etc.):

```python
plugins=[LangSmithPlugin(project_name="my-project", add_temporal_runs=True)]
```

This adds Temporal operation nodes to the trace tree so that the orchestration layer is visible alongside your application logic. If the caller wraps `start_workflow` in a `@traceable` function, the full trace looks like:

```
Ask Chatbot                      # @traceable wrapper around client.start_workflow
  StartWorkflow:ChatbotWorkflow
  RunWorkflow:ChatbotWorkflow
    Session Apr 03 14:30
      Request: "What's the weather in NYC?"
        StartActivity:call_openai
        RunActivity:call_openai
          Call OpenAI
            openai.responses.create
```

Note: `StartFoo` and `RunFoo` appear as siblings. The start is the short-lived outbound RPC that completes immediately; the run is the actual execution, which may take much longer.
<!-- Screenshot: LangSmith trace tree with add_temporal_runs=True showing Temporal operation nodes -->

<!-- Screenshot: Temporal UI showing the corresponding workflow execution -->
## Migrating Existing LangSmith Code to Temporal

If you already have code with LangSmith tracing, you should be able to move it into a Temporal workflow and keep the same trace hierarchy. The plugin handles sandbox restrictions and context propagation behind the scenes, so anything that was traceable before should remain traceable after the move. More details below:
### Where `@traceable` works

The plugin allows `@traceable` to work inside Temporal's deterministic workflow sandbox, where it normally can't run:

| Location | Works? | Notes |
|----------|--------|-------|
| On `@activity.defn` functions | Yes | Stack `@traceable` on top of `@activity.defn` |
| On `@workflow.defn` classes | No | Use `@traceable` inside `@workflow.run` instead |
| Inside workflow methods (sync or async) | Yes | Use `langsmith.traceable(name="...")(fn)()` |
| Inside activity methods (sync or async) | Yes | Regular `@traceable` decorator |
| Around `client.start_workflow` / `execute_workflow` | Yes | Wrap the caller to trace the entire workflow as one unit |
| Around `execute_activity` calls | Yes | Wrap the dispatch to group related operations |
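Decorator order matters in the first row: listing `@traceable` above `@activity.defn` makes the tracing wrapper outermost, so it fires around each activity invocation. A library-free sketch of that wrapping order (this `traceable` is a toy stand-in, not the real LangSmith decorator):

```python
from functools import wraps

calls: list[str] = []

def traceable(fn):
    # Toy stand-in for langsmith.traceable: records that tracing
    # runs before the wrapped function's own body.
    @wraps(fn)
    def wrapper(*args, **kwargs):
        calls.append(f"trace:{fn.__name__}")
        return fn(*args, **kwargs)
    return wrapper

@traceable  # outermost, like @langsmith.traceable stacked on @activity.defn
def call_openai(prompt: str) -> str:
    # Stands in for a function also decorated with @activity.defn.
    calls.append("activity-body")
    return prompt.upper()

assert call_openai("hi") == "HI"
assert calls == ["trace:call_openai", "activity-body"]
```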
## Replay Safety

Temporal workflows are deterministic and are replayed from event history on recovery. The plugin accounts for this by injecting replay-safe data into your traceable runs:

- **No duplicate traces on replay.** Run IDs are derived deterministically from the workflow's random seed, so replayed operations produce the same IDs and LangSmith deduplicates them.
- **No non-deterministic calls.** The plugin injects metadata using `workflow.now()` for timestamps and `workflow.random()` for UUIDs instead of `datetime.now()` and `uuid4()`.
- **Background I/O stays outside the sandbox.** LangSmith HTTP calls to the server are submitted to a background thread pool that doesn't interfere with deterministic workflow execution.

You don't need to do anything special for this. Your `@traceable` functions behave the same whether it's a fresh execution or a replay.
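The deduplication idea can be illustrated without Temporal at all. This is a sketch of the principle, not the plugin's actual ID scheme: a seeded RNG stands in for `workflow.random()`, which Temporal seeds deterministically per workflow execution:

```python
import random
import uuid

def derive_run_ids(seed: int, count: int) -> list[uuid.UUID]:
    # rng plays the role of workflow.random(): same seed, same stream.
    rng = random.Random(seed)
    return [uuid.UUID(int=rng.getrandbits(128), version=4) for _ in range(count)]

# A replay re-derives identical IDs, so a backend keyed on run ID
# sees the same runs again instead of duplicates.
original = derive_run_ids(seed=1234, count=3)
replayed = derive_run_ids(seed=1234, count=3)
assert original == replayed
```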
### Example: Worker crash mid-workflow

```
1. Workflow starts, executes Activity A -> trace appears in LangSmith
2. Worker crashes
3. New worker picks up the workflow
4. Workflow replays Activity A (skips execution) -> NO duplicate trace
5. Workflow executes Activity B (new work) -> new trace appears
```

<!-- Screenshot: LangSmith showing a workflow trace that survived a worker restart with no duplicate runs -->
### Example: Wrapping retriable steps in a trace

Since Temporal retries failed activities, you can use `@traceable` to group the attempts together:

```python
@langsmith.traceable(name="my_step", run_type="chain")
async def my_step(message: str) -> str:
    return await workflow.execute_activity(
        call_openai,
        ...
    )
```

This groups everything under one run:

```
my_step
  Call OpenAI                # first attempt
    openai.responses.create
  Call OpenAI                # retry
    openai.responses.create
```
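The grouping can be pictured as a parent run collecting one child per attempt. A toy, Temporal-free sketch of that shape (the names and the in-memory `trace` dict are illustrative, not plugin internals):

```python
trace: dict[str, list[str]] = {"my_step": []}

def flaky_call(attempt: int) -> str:
    # Fails on the first attempt to force exactly one retry.
    if attempt == 0:
        raise RuntimeError("transient failure")
    return "ok"

def run_with_retries(max_attempts: int = 3) -> str:
    # Every attempt is recorded under the same parent run,
    # mirroring how retried activities nest under one @traceable span.
    for attempt in range(max_attempts):
        trace["my_step"].append(f"Call OpenAI (attempt {attempt + 1})")
        try:
            return flaky_call(attempt)
        except RuntimeError:
            continue
    raise RuntimeError("exhausted retries")

assert run_with_retries() == "ok"
assert trace["my_step"] == ["Call OpenAI (attempt 1)", "Call OpenAI (attempt 2)"]
```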
## Context Propagation

The plugin propagates trace context across process boundaries (client -> workflow -> activity -> child workflow -> nexus) via Temporal headers. You don't need to pass any context manually.

```
Client Process               Worker Process (Workflow)      Worker Process (Activity)
─────────────                ─────────────────────────      ─────────────────────────
@traceable("my workflow")
  start_workflow ──headers──> RunWorkflow
                                @traceable("session")
                                  execute_activity ──headers──> RunActivity
                                                                  @traceable("Call OpenAI")
                                                                    openai.create(...)
```
## API Reference

### `LangSmithPlugin`

```python
LangSmithPlugin(
    client=None,              # langsmith.Client instance (auto-created if None)
    project_name=None,        # LangSmith project name
    add_temporal_runs=False,  # Show Temporal operation nodes in traces
    metadata=None,            # Default metadata for all runs
    tags=None,                # Default tags for all runs
)
```
We recommend registering the plugin on both the client and all workers. Strictly speaking, you only need it on the sides that produce traces, but adding it everywhere avoids surprises with context propagation. The client and worker don't need to share the same configuration; for example, they can use different `add_temporal_runs` settings.
```python
"""LangSmith integration for Temporal SDK.

This package provides LangSmith tracing integration for Temporal workflows,
activities, and other operations. It includes automatic run creation and
context propagation for distributed tracing in LangSmith.
"""

from temporalio.contrib.langsmith._interceptor import LangSmithInterceptor
from temporalio.contrib.langsmith._plugin import LangSmithPlugin

__all__ = [
    "LangSmithInterceptor",
    "LangSmithPlugin",
]
```