Add LangSmith tracing plugin for Temporal workflows#1369
Add LangSmith tracing plugin for Temporal workflows#1369
Conversation
2803b95 to
768ac70
Compare
Implements a LangSmith contrib plugin that creates trace hierarchies for Temporal operations (workflows, activities, signals, queries, updates, child workflows, Nexus). Supports ambient @Traceable context propagation, replay-safe tracing, and an add_temporal_runs toggle for lightweight context-only mode. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…late - Add ReplaySafeRunTree wrapper that handles replay skipping and sandbox safety (post/end/patch no-op during replay, sandbox_unrestricted in workflow context), inspired by OTel plugin's _ReplaySafeSpan pattern - Add config.maybe_run() to eliminate repeated config kwargs at every call site - Add _traced_call (client outbound) and _traced_outbound (workflow outbound) helpers to reduce interceptor methods to one-liners - Fold _extract_context into _workflow_maybe_run for workflow inbound - Remove _safe_post, _safe_patch helpers (internalized in wrapper) - Remove in_workflow parameter from _maybe_run (wrapper detects it) - Establish consistent wrapping invariant: all run references are ReplaySafeRunTree, unwrapping is unconditional ._run at RunTree constructor boundary - Parametrize redundant unit tests (client outbound, workflow inbound/outbound) and remove duplicate test - Remove _make_interceptor test helper, use LangSmithInterceptor directly - Collapse plugin constructor tests into one, add comprehensive plugin integration test, remove redundant sandbox tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fix ruff I001 import sorting violations in _interceptor.py and test_integration.py. Extract _get_current_run_safe() helper for reading ambient LangSmith context with replay safety. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Change add_temporal_runs default to False in both plugin and interceptor (reviewer preference for opt-in behavior) - Rename plugin to langchain.LangSmithPlugin per organization.PluginName convention - Prefix header key with _temporal- to avoid collisions - Update all tests to explicitly pass add_temporal_runs=True Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add @Traceable call (outer_chain) directly in ComprehensiveWorkflow to test non-deterministic tracing alongside deterministic replay - Set max_cached_workflows=0 on all test workers to force replay on every workflow task, exposing header non-determinism - Restructure comprehensive tests with mid-workflow worker restart: one shared collector across two worker lifetimes proves context propagates via headers, not cached plugin state - Add is_waiting_for_signal query and poll helper for deterministic sync (no arbitrary sleeps) - Consolidate make_mock_ls_client in conftest.py, remove unused fixtures, use raw client for polling to avoid trace contamination - Tests are expected to fail (TDD): sandbox blocks @Traceable in workflows, max_cached_workflows=0 exposes outputs=None on eviction Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move RunTree.post()/patch() I/O off the workflow task thread to a single-worker ThreadPoolExecutor, preventing deadlocks from compressed_traces.lock contention with the LangSmith drain thread. Key changes: - _ReplaySafeRunTree.create_child() override propagates replay safety and deterministic IDs to nested @langsmith.traceable calls - Executor-backed post()/patch() with FIFO ordering and fire-and-forget error logging via Future.add_done_callback - _ContextBridgeRunTree for add_temporal_runs=False without external context — invisible parent that produces root @Traceable runs - aio_to_thread patch simplified: removed harmful replay-time tracing disable, added error gate for async @Traceable without plugin - Plugin shutdown via SimplePlugin.run_context instead of dead method - Fix misleading comments referencing test artifacts instead of production reasons, remove OTel cross-references - Strict dump_runs catches dangling parent_run_id references - Add **/CLAUDE.md to .gitignore Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace ~35 Any annotations across _plugin.py and _interceptor.py with precise types (langsmith.Client, RunTree, _ReplaySafeRunTree, specific SDK interceptor input types, etc.). Add _InputWithHeaders Protocol for private helpers matching the OTel interceptor pattern. Narrow return types to match base class signatures exactly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Prefix unused mock parameters with underscore (_args, _kwargs) and rename unused variable (_collector) to satisfy basedpyright's reportUnusedParameter and reportUnusedVariable checks. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove useless _get_current_run_safe wrapper (inline get_current_run_tree) - Restore generic type params on interceptor return types (ActivityHandle[Any], ChildWorkflowHandle[Any, Any]) to match base class exactly - Fix _make_bridge return type (Any → _ContextBridgeRunTree) - Fix _poll_query helper types (Any → WorkflowHandle, Callable) - Strengthen weak assertions in mixed sync/async integration tests - Add _InputWithHeaders Protocol for private helper input params Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Wrap all 5 activity definitions with @Traceable as outer decorator to test LangSmith tracing through the full activity execution path. Update all 9 expected trace hierarchies to account for the additional @Traceable run nested under each RunActivity. Fix outputs assertion to only check interceptor runs (colon-prefixed names) since @Traceable captures actual return values rather than the interceptor's {'status': 'ok'}. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Bug 1: Replace stale _current_run snapshot with ambient context in outbound interceptor. Add _get_current_run_for_propagation() helper that filters _ContextBridgeRunTree from ambient context. Outbound methods now read get_current_run_tree() for @Traceable nesting instead of a frozen reference from workflow entry. Bug 2: Add tracing_context() to Nexus inbound interceptor for both execute_nexus_operation_start and execute_nexus_operation_cancel, matching the activity inbound pattern. Ensures @Traceable functions in Nexus handlers have a LangSmith client even with add_temporal_runs=False. Remove handler suppression (is_handler check, _workflow_is_active flag) to align with OTel interceptor which creates spans for all handlers unconditionally. Add dump_traces() to test infrastructure for per-root-trace assertions. Restructure comprehensive tests so user_pipeline only wraps start_workflow, with polling/signals/queries as independent root traces. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Built-in queries like __temporal_workflow_metadata, __stack_trace, and __enhanced_stack_trace are fired automatically by infrastructure (e.g. the Temporal Web UI) and are not user-facing. Filter them out of LangSmith traces when add_temporal_runs=True to reduce noise. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
f11f660 to
d9fb85a
Compare
| interceptor._client.flush() | ||
|
|
||
| super().__init__( | ||
| "langchain.LangSmithPlugin", |
There was a problem hiding this comment.
I can see why we would want to add the organization name here but maybe it should be "temporalio" since we're the owners?
Also, seems to be missing here:
There was a problem hiding this comment.
@tconley1428 is this what you meant when you said organization.PluginName? maybe I misunderstood.
There was a problem hiding this comment.
Yeah we came to this consensus after the openai plugin happened. @drewhoskins-temporal for whether we want to represent ownership or more of a relevance thing.
There was a problem hiding this comment.
We did put google.* on the ADK one that we wrote.
The isinstance check that raised RuntimeError("Use the LangSmith
plugin...") was unreachable — when the plugin is active,
_workflow_maybe_run always provides a _ReplaySafeRunTree parent,
so _setup_run always returns a _ReplaySafeRunTree child.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Replace :class:`RunTree` cross-references with backtick literals in docstrings to fix pydoctor build failure (exit status 3). - Add run ID dedup to InMemoryRunCollector.record_create to match real LangSmith API upsert semantics. Fixes flaky Windows CI failure where combined replay+new-event activations caused duplicate trace records with deterministic IDs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Sandbox safety: patch @traceable's aio_to_thread |
There was a problem hiding this comment.
It's not a sandbox safety thing, it simply cannot do run_in_executor, sandbox or no.
|
|
||
| import contextvars | ||
|
|
||
| async def _safe_aio_to_thread( |
There was a problem hiding this comment.
I'm not sure this is actually what we would want to patch it to. This means the trace calls to the network take place on the workflow thread, which could cause timeouts if your connection is slow.
| if _is_replaying(): | ||
| return | ||
| if temporalio.workflow.in_workflow(): | ||
| with temporalio.workflow.unsafe.sandbox_unrestricted(): |
There was a problem hiding this comment.
You shouldn't need pretty much any of these unrestricteds
| "enabled": True, | ||
| } | ||
| # When add_temporal_runs=False and no external parent, create a | ||
| # _ContextBridgeRunTree so @traceable calls get a _ReplaySafeRunTree |
There was a problem hiding this comment.
I don't really follow the name of this thing or what it is for.
| elif temporalio.workflow.in_workflow(): | ||
| # Read-only context (e.g. query handler) — use workflow.uuid4() | ||
| try: | ||
| kwargs["id"] = temporalio.workflow.uuid4() |
There was a problem hiding this comment.
Have you validated this? I believe the whole point here is that you cannot use uuid4 inside a readonly operation.
| kwargs["id"] = temporalio.workflow.uuid4() | ||
| kwargs["start_time"] = temporalio.workflow.now() | ||
| except Exception: | ||
| pass # Not in a real workflow context (e.g., unit test mock) |
There was a problem hiding this comment.
This is quite sketchy. Just suppress all exceptions, notably the one telling you workflow.uuid4 cannot be used in a read only context.
| if parent is None: | ||
| parent = get_current_run_tree() | ||
|
|
||
| kwargs: dict[str, Any] = dict( |
There was a problem hiding this comment.
The way this turns args into kwargs seems really strange.
| if project_name is not None: | ||
| kwargs["project_name"] = project_name | ||
| if parent is not None: | ||
| # Unwrap _ReplaySafeRunTree so RunTree gets the real parent |
There was a problem hiding this comment.
Why is this important?
| parent: RunTree | None = None, | ||
| project_name: str | None = None, | ||
| executor: ThreadPoolExecutor, | ||
| ) -> Iterator[_ReplaySafeRunTree | None]: |
There was a problem hiding this comment.
Do you ever actually use the run tree yielded from here? If not it can just yield None regardless
| super().__init__() | ||
| # Import langsmith.Client lazily to avoid hard dependency at import time | ||
| if client is None: | ||
| import langsmith |
There was a problem hiding this comment.
Lazy imports aren't usually a good solution. Why can't the langsmith interceptor import langsmith?
| # and the client is available even without a parent. | ||
| # Override the parent's ls_client so @traceable children (via create_child) | ||
| # use the plugin's client rather than lazily creating a real one. | ||
| if parent is not None and hasattr(parent, "ls_client"): |
There was a problem hiding this comment.
I don't understand the purpose of this either. Why is it important to use this other langsmith client?
Summary
temporalio.contrib.langsmithplugin that creates LangSmith trace hierarchies for Temporal operations (workflows, activities, signals, queries, updates, child workflows, Nexus)@traceablecontext propagation through Temporal headers, replay-safe tracing, and anadd_temporal_runstoggle for lightweight context-only mode🤖 Generated with Claude Code