-
Notifications
You must be signed in to change notification settings - Fork 169
Add LangSmith tracing plugin for Temporal workflows #1369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+5,086
−7
Merged
Changes from 41 commits
Commits
Show all changes
46 commits
Select commit
Hold shift + click to select a range
3981cea
Add LangSmith tracing plugin for Temporal workflows
xumaple df9a55c
Refactor LangSmith interceptor: add ReplaySafeRunTree, reduce boilerp…
xumaple 5d25abc
Fix import sorting and extract _get_current_run_safe helper
xumaple 601d67a
Add Nexus integration test coverage
xumaple cdb5886
Apply ruff formatting to all langsmith files
xumaple 941637f
Fix pydocstyle, pyright, and mypy lint errors
xumaple 2fa9571
Fix basedpyright errors and add CLAUDE.md with CI lint docs
xumaple 7623d43
Fix all basedpyright warnings (deprecated imports, unused params)
xumaple a3c0bee
Clean up unused env params: use type:ignore consistently
xumaple 982d220
Address PR review feedback: defaults, naming, and header key
xumaple ad67096
Add replay safety and worker restart tests for LangSmith plugin
xumaple 197a2d3
Implement background thread I/O for LangSmith workflow tracing
xumaple 96c139b
Replace unnecessary Any type annotations with specific types
xumaple d1e66c4
Fix basedpyright warnings in test files
xumaple ee09c1f
Clean up types, dead code, and test assertions
xumaple 4e70ef8
Fix formatting in test_integration.py
xumaple 2b84421
Add @traceable to all activity definitions in integration tests
xumaple ded720b
tests
xumaple 6ff9cc9
Fix context propagation bugs and remove handler suppression
xumaple d9fb85a
Skip LangSmith tracing for built-in Temporal queries
xumaple d271aba
Remove dead error gate from _safe_aio_to_thread
xumaple 4f5d040
Fix pydoctor cross-refs and mock collector trace duplication
xumaple 54d47a9
Address PR review feedback: comments, end() determinism, yield simpli…
xumaple c37bac8
Create per-worker LangSmith interceptors instead of sharing one acros…
xumaple a232d16
Remove unnecessary sandbox_unrestricted from post/patch in _ReplaySaf…
xumaple 5bdc3f4
Rename _ContextBridgeRunTree to _RootReplaySafeRunTreeFactory
xumaple c6db234
Rename overloaded kwargs/ctx_kwargs variables in LangSmith interceptor
xumaple 0fc2ab3
Clean up parent post-processing in LangSmith interceptor
xumaple 2853299
Make StartFoo and RunFoo siblings instead of parent-child in LangSmit…
xumaple 1b38567
Add README for LangSmith plugin
xumaple 7d8c19a
Share one langsmith.Client across all interceptors
xumaple c2490b0
Add langsmith optional dependency and install instructions
xumaple 5ca84e7
Delete duplicate test_constructor_requires_executor test
xumaple 75115a5
Revert to single shared LangSmithInterceptor
xumaple cf01699
Pin langsmith dependency to 0.7.x
xumaple 1928927
Improve README and rename plugin params to match interceptor API
xumaple 71e24f6
Improve README and rename plugin params to match interceptor API
xumaple 3ad85bd
Consolidate SimpleNexusWorkflow into TraceableActivityWorkflow
xumaple c456106
Consolidate test workflows and verify ValidateUpdate elision
xumaple 5ac12f0
Extract find_traces helper for test trace filtering
xumaple a81e8d6
Add screenshots and polish README examples
xumaple 22d9838
Merge remote-tracking branch 'origin/main' into maplexu/langsmith-plugin
xumaple d119c94
Use uv add in README install instructions
xumaple 491b847
Merge branch 'main' into maplexu/langsmith-plugin
xumaple 4027019
Simplify _poll_query to rely on pytest timeout
xumaple fd6e4d6
Merge branch 'maplexu/langsmith-plugin' of github.com:temporalio/sdk-…
xumaple File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,244 @@ | ||
| # LangSmith Plugin for Temporal Python SDK | ||
|
|
||
| This Temporal [Plugin](https://docs.temporal.io/develop/plugins-guide) allows your [LangSmith](https://smith.langchain.com/) traces to work within Temporal Workflows. It propagates trace context across Worker boundaries so that `@traceable` calls, LLM invocations, and Temporal operations show up in a single connected trace, and ensures that replaying does not generate duplicate traces. | ||
|
|
||
| ## Quick Start | ||
|
|
||
| Install Temporal with the LangSmith feature enabled: | ||
|
|
||
| ```bash | ||
| pip install temporalio[langsmith] | ||
xumaple marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ``` | ||
|
|
||
| Register the Plugin on your Temporal Client. You need it on both the Client (starter) side and the Workers: | ||
|
|
||
| ```python | ||
| from temporalio.client import Client | ||
| from temporalio.contrib.langsmith import LangSmithPlugin | ||
|
|
||
| client = await Client.connect( | ||
| "localhost:7233", | ||
| plugins=[LangSmithPlugin(project_name="my-project")], | ||
| ) | ||
| ``` | ||
xumaple marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| Once that's set up, any `@traceable` function inside your Workflows and Activities will show up in LangSmith with correct parent-child relationships, even across Worker boundaries. | ||
|
|
||
| ## Example: AI Chatbot | ||
|
|
||
| A conversational chatbot using OpenAI, orchestrated by a Temporal Workflow. The Workflow stays alive waiting for user messages via Signals, and dispatches each message to an Activity that calls the LLM. | ||
|
|
||
| ### Activity (Wraps the LLM Call) | ||
|
|
||
| ```python | ||
| from langsmith import traceable | ||
|
|
||
| @traceable(name="Call OpenAI", run_type="chain") | ||
| @activity.defn | ||
| async def call_openai(request: OpenAIRequest) -> Response: | ||
| client = wrap_openai(AsyncOpenAI()) # This is a traced langsmith function | ||
| return await client.responses.create( | ||
| model=request.model, | ||
| input=request.input, | ||
| instructions=request.instructions, | ||
| ) | ||
| ``` | ||
|
|
||
| ### Workflow (Orchestrates the Conversation) | ||
|
|
||
| ```python | ||
| @workflow.defn | ||
| class ChatbotWorkflow: | ||
| @workflow.run | ||
| async def run(self) -> str: | ||
| # @traceable works inside Workflows — fully replay-safe | ||
| now = workflow.now().strftime("%b %d %H:%M") | ||
| return await traceable( | ||
| name=f"Session {now}", run_type="chain", | ||
| )(self._run_with_trace)() | ||
|
|
||
| async def _run_with_trace(self) -> str: | ||
| while not self._done: | ||
| await workflow.wait_condition( | ||
| lambda: self._pending_message is not None or self._done | ||
| ) | ||
| if self._done: | ||
| break | ||
|
|
||
| message = self._pending_message | ||
| self._pending_message = None | ||
|
|
||
| @traceable(name=f"Query: {message[:60]}", run_type="chain") | ||
| async def _query(msg: str) -> str: | ||
| response = await workflow.execute_activity( | ||
| call_openai, | ||
| OpenAIRequest(model="gpt-4o-mini", input=msg), | ||
| start_to_close_timeout=timedelta(seconds=60), | ||
| ) | ||
| return response.output_text | ||
|
|
||
| self._last_response = await _query(message) | ||
|
|
||
| return "Session ended." | ||
| ``` | ||
|
|
||
| ### Worker | ||
|
|
||
| ```python | ||
| client = await Client.connect( | ||
| "localhost:7233", | ||
| plugins=[LangSmithPlugin(project_name="chatbot")], | ||
| ) | ||
|
|
||
| worker = Worker( | ||
| client, | ||
| task_queue="chatbot", | ||
| workflows=[ChatbotWorkflow], | ||
| activities=[call_openai], | ||
| ) | ||
| await worker.run() | ||
| ``` | ||
|
|
||
| ### What you see in LangSmith | ||
|
|
||
| With the default configuration (`add_temporal_runs=False`), the trace contains only your application logic: | ||
|
|
||
| ``` | ||
| Session Apr 03 14:30 | ||
| Query: "What's the weather in NYC?" | ||
| Call OpenAI | ||
| openai.responses.create (auto-traced by wrap_openai) | ||
| ``` | ||
|
|
||
| An actual look at the LangSmith UI: | ||
|
|
||
|  | ||
|
|
||
| ## `add_temporal_runs` — Temporal Operation Visibility | ||
|
|
||
| By default, `add_temporal_runs` is `False` and only your `@traceable` application logic appears in traces. Setting it to `True` also adds Temporal operations (StartWorkflow, RunWorkflow, StartActivity, RunActivity, etc.): | ||
|
|
||
| ```python | ||
| plugins=[LangSmithPlugin(project_name="my-project", add_temporal_runs=True)] | ||
| ``` | ||
|
|
||
| This adds Temporal operation nodes to the trace tree so that the orchestration layer is visible alongside your application logic. If the caller wraps `start_workflow` in a `@traceable` function, the full trace looks like: | ||
|
|
||
| ``` | ||
| Ask Chatbot # @traceable wrapper around client.start_workflow | ||
| StartWorkflow:ChatbotWorkflow | ||
| RunWorkflow:ChatbotWorkflow | ||
| Session Apr 03 14:30 | ||
| Query: "What's the weather in NYC?" | ||
| StartActivity:call_openai | ||
| RunActivity:call_openai | ||
| Call OpenAI | ||
| openai.responses.create | ||
| ``` | ||
|
|
||
| Note: `StartFoo` and `RunFoo` appear as siblings. The start is the short-lived outbound RPC that enqueues work on a task queue and completes immediately, and the run is the actual execution which may be delayed and may take much longer. | ||
|
|
||
| An actual look at the LangSmith UI: | ||
|
|
||
|  | ||
|
|
||
| And here is a waterfall view of the Workflow in Temporal UI: | ||
|
|
||
|  | ||
|
|
||
| ## Migrating Existing LangSmith Code to Temporal | ||
|
|
||
| If you already have code with LangSmith tracing, you should be able to move it into a Temporal Workflow and keep the same trace hierarchy. The Plugin handles sandbox restrictions and context propagation behind the scenes, so anything that was traceable before should remain traceable after the move. More details below: | ||
|
|
||
| ### Where `@traceable` Works | ||
|
|
||
| The Plugin allows `@traceable` to work inside Temporal's deterministic Workflow sandbox, where it normally can't run. Note that `@traceable` on an Activity fires on each retry. | ||
|
|
||
| | Location | Works? | Notes | | ||
| |-------------------------------|--------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | ||
| | Inside Workflow methods | Yes | Traces called from inside `@workflow.run`, `@workflow.signal`, etc.; can trace sync and async methods | | ||
| | Inside Activity methods | Yes | Traces called from inside `@activity.defn`; can trace sync and async methods | | ||
| | On `@activity.defn` functions | Yes | Must stack `@traceable` decorator on top of `@activity.defn` decorator for correct functionality. *Note*: This trace fires on every retry; see [Wrapping Retriable Steps section](#example-wrapping-retriable-steps-in-a-trace) for more info | | ||
| | On `@workflow.defn` classes | No | Use `@traceable` inside `@workflow.run` instead. Decorating the workflow class or the `@workflow.run` function is not supported. | | ||
|
|
||
| ## Replay Safety | ||
|
|
||
| Temporal Workflows are deterministic and get replayed from event history on recovery. The Plugin accounts for this by injecting replay-safe data into your traceable runs: | ||
|
|
||
| - **No duplicate traces on replay.** Run IDs are derived deterministically from the Workflow's random seed, so replayed operations produce the same IDs and LangSmith deduplicates them. | ||
| - **No non-deterministic calls.** The Plugin injects metadata using `workflow.now()` for timestamps and `workflow.random()` for UUIDs instead of `datetime.now()` and `uuid4()`. | ||
| - **Background I/O stays outside the sandbox.** LangSmith HTTP calls to the server are submitted to a background thread pool that doesn't interfere with the deterministic Workflow execution. | ||
|
|
||
| You don't need to do anything special for this. Your `@traceable` functions behave the same whether it's a fresh execution or a replay. | ||
|
|
||
| ### Example: Worker Crash Mid-Workflow | ||
|
|
||
| ``` | ||
| 1. Workflow starts, executes Activity A -> trace appears in LangSmith | ||
| 2. Worker crashes during Activity B | ||
| 3. New Worker picks up the Workflow | ||
| 4. Workflow replays Activity A (skips execution) -> NO duplicate trace | ||
| 5. Workflow executes Activity B (new work) -> new trace appears | ||
| ``` | ||
|
|
||
| As you can see in the UI example below, a crash in the `Call OpenAI` activity didn't cause earlier traces to be duplicated: | ||
|
|
||
|  | ||
|
|
||
| ### Example: Wrapping Retriable Steps in a Trace | ||
|
|
||
| Since Temporal retries failed Activities, you can use an outer `@traceable` to group the attempts together: | ||
|
|
||
| ```python | ||
| @traceable(name="Call OpenAI", run_type="llm") | ||
| @activity.defn | ||
| async def call_openai(...): | ||
| ... | ||
|
|
||
| @traceable(name="my_step", run_type="chain") | ||
| async def my_step(message: str) -> str: | ||
| return await workflow.execute_activity( | ||
| call_openai, | ||
| ... | ||
| ) | ||
| ``` | ||
|
|
||
| This groups everything under one run: | ||
| ``` | ||
| my_step | ||
| Call OpenAI # first attempt | ||
| openai.responses.create | ||
| Call OpenAI # retry | ||
| openai.responses.create | ||
| ``` | ||
|
|
||
| ## Context Propagation | ||
|
|
||
| The Plugin propagates trace context across process boundaries (Client -> Workflow -> Activity -> Child Workflow -> Nexus) via Temporal headers. You don't need to pass any context manually. | ||
|
|
||
| ``` | ||
| Client Process Worker Process (Workflow) Worker Process (Activity) | ||
| ───────────── ────────────────────────── ───────────────────────── | ||
| @traceable("my workflow") | ||
| start_workflow ──headers──> RunWorkflow | ||
| @traceable("session") | ||
| execute_activity ──headers──> RunActivity | ||
| @traceable("Call OpenAI") | ||
| openai.create(...) | ||
| ``` | ||
|
|
||
| ## API Reference | ||
|
|
||
| ### `LangSmithPlugin` | ||
|
|
||
| ```python | ||
| LangSmithPlugin( | ||
| client=None, # langsmith.Client instance (auto-created if None) | ||
| project_name=None, # LangSmith project name | ||
| add_temporal_runs=False, # Show Temporal operation nodes in traces | ||
| default_metadata=None, # Custom metadata attached to all LangSmith traces (https://docs.smith.langchain.com/observability/how_to_guides/add_metadata_tags) | ||
| default_tags=None, # Custom tags attached to all LangSmith traces (see link above) | ||
| ) | ||
| ``` | ||
|
|
||
| We recommend registering the Plugin on both the Client and all Workers. Strictly speaking, you only need it on the sides that produce traces, but adding it everywhere avoids surprises with context propagation. The Client and Worker don't need to share the same configuration — for example, they can use different `add_temporal_runs` settings. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| """LangSmith integration for Temporal SDK. | ||
|
|
||
| This package provides LangSmith tracing integration for Temporal workflows, | ||
| activities, and other operations. It includes automatic run creation and | ||
| context propagation for distributed tracing in LangSmith. | ||
| """ | ||
|
|
||
| from temporalio.contrib.langsmith._interceptor import LangSmithInterceptor | ||
| from temporalio.contrib.langsmith._plugin import LangSmithPlugin | ||
|
|
||
| __all__ = [ | ||
| "LangSmithInterceptor", | ||
xumaple marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "LangSmithPlugin", | ||
| ] | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.