Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -533,11 +533,8 @@ def feature_flag_is_on(workflow_id: str | None) -> bool:
def feature_flag_selector(
    context: temporalio.converter.StorageDriverStoreContext, _payload: Payload
) -> temporalio.converter.StorageDriver | None:
    """Select an external-storage driver based on a per-workflow feature flag.

    Uses the current workflow's identity when available, falling back to the
    target workflow; returns ``my_driver`` when the flag is on, else ``None``
    (meaning: do not externalize this payload).
    """
    # Removed the dead isinstance-based extraction of workflow_id: its result
    # was unconditionally overwritten by the assignment below.
    wf = context.current_workflow or context.target_workflow
    workflow_id = wf.id if wf else None
    return my_driver if feature_flag_is_on(workflow_id) else None

options = ExternalStorage(
Expand Down
1,016 changes: 574 additions & 442 deletions temporalio/client.py

Large diffs are not rendered by default.

55 changes: 20 additions & 35 deletions temporalio/contrib/aws/s3driver/_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
from temporalio.api.common.v1 import Payload
from temporalio.contrib.aws.s3driver._client import S3StorageDriverClient
from temporalio.converter import (
ActivitySerializationContext,
StorageDriver,
StorageDriverActivityInfo,
StorageDriverClaim,
StorageDriverRetrieveContext,
StorageDriverStoreContext,
WorkflowSerializationContext,
StorageDriverWorkflowInfo,
)

_T = TypeVar("_T")
Expand Down Expand Up @@ -113,40 +113,25 @@ async def store(
(e.g. proto binary). The returned list is the same length as
``payloads``.
"""
workflow_id: str | None = None
activity_id: str | None = None
namespace: str | None = None
if isinstance(context.serialization_context, WorkflowSerializationContext):
workflow_id = context.serialization_context.workflow_id
namespace = context.serialization_context.namespace
if isinstance(context.serialization_context, ActivitySerializationContext):
# Prioritize workflow over activity so that the same payload that
# may be stored across workflow and activity boundaries are deduplicated.
if context.serialization_context.workflow_id:
workflow_id = context.serialization_context.workflow_id
elif context.serialization_context.activity_id:
activity_id = context.serialization_context.activity_id
namespace = context.serialization_context.namespace

# URL encode values to avoid characters that break the key format
# e.g. spaces, forward-slashes, etc.
if namespace:
namespace = urllib.parse.quote(namespace, safe="")
if workflow_id:
workflow_id = urllib.parse.quote(workflow_id, safe="")
if activity_id:
activity_id = urllib.parse.quote(activity_id, safe="")

namespace_segments = f"/ns/{namespace}" if namespace else ""

def _quote(val: str | None) -> str | None:
return urllib.parse.quote(val, safe="") if val else None

# Build context segments from the target identity.
context_segments = ""
# Prioritize workflow over activity so that the same payload that
# may be stored across workflow and activity boundaries are deduplicated.
# Workflow and Activity IDs are case sensitive.
if workflow_id:
context_segments += f"/wfi/{workflow_id}"
elif activity_id:
context_segments += f"/aci/{activity_id}"
target = context.target
namespace = _quote(target.namespace) if target is not None else None
namespace_segment = f"/ns/{namespace}" if namespace else ""
if isinstance(target, StorageDriverWorkflowInfo):
wf_type = _quote(target.type) or "null"
wf_id = _quote(target.id) or "null"
wf_run_id = _quote(target.run_id) or "null"
context_segments = f"/wt/{wf_type}/wi/{wf_id}/ri/{wf_run_id}"
elif isinstance(target, StorageDriverActivityInfo):
act_type = _quote(target.type) or "null"
act_id = _quote(target.id) or "null"
act_run_id = _quote(target.run_id) or "null"
context_segments = f"/at/{act_type}/ai/{act_id}/ri/{act_run_id}"

async def _upload(payload: Payload) -> StorageDriverClaim:
bucket = self._get_bucket(context, payload)
Expand All @@ -162,7 +147,7 @@ async def _upload(payload: Payload) -> StorageDriverClaim:

digest_segments = f"/d/sha256/{hash_digest}"

key = f"v0{namespace_segments}{context_segments}{digest_segments}"
key = f"v0{namespace_segment}{context_segments}{digest_segments}"

try:
if not await self._client.object_exists(bucket=bucket, key=key):
Expand Down
4 changes: 4 additions & 0 deletions temporalio/converter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from temporalio.converter._extstore import (
ExternalStorage,
StorageDriver,
StorageDriverActivityInfo,
StorageDriverClaim,
StorageDriverRetrieveContext,
StorageDriverStoreContext,
StorageDriverWorkflowInfo,
StorageWarning,
)
from temporalio.converter._failure_converter import (
Expand Down Expand Up @@ -54,9 +56,11 @@
"ActivitySerializationContext",
"ExternalStorage",
"StorageDriver",
"StorageDriverActivityInfo",
"StorageDriverClaim",
"StorageDriverRetrieveContext",
"StorageDriverStoreContext",
"StorageDriverWorkflowInfo",
"StorageWarning",
"AdvancedJSONEncoder",
"BinaryNullPayloadConverter",
Expand Down
120 changes: 99 additions & 21 deletions temporalio/converter/_extstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@

from temporalio.api.common.v1 import Payload, Payloads
from temporalio.converter._payload_converter import JSONPlainPayloadConverter
from temporalio.converter._serialization_context import (
SerializationContext,
WithSerializationContext,
)

_T = TypeVar("_T")

Expand Down Expand Up @@ -92,6 +88,83 @@ class StorageDriverClaim:
"""


@dataclass(frozen=True, kw_only=True)
class StorageDriverWorkflowInfo:
"""Workflow identity information for external storage operations.

.. warning::
This API is experimental.
"""

namespace: str
"""The namespace of the workflow execution."""

id: str | None = None
"""The workflow ID."""

run_id: str | None = None
"""The workflow run ID, if available."""

type: str | None = None
"""The workflow type name, if available."""


@dataclass(frozen=True, kw_only=True)
class StorageDriverActivityInfo:
"""Activity identity information for external storage operations.

.. warning::
This API is experimental.
"""

namespace: str
"""The namespace of the activity execution."""

id: str | None = None
"""The activity ID."""

run_id: str | None = None
"""The activity run ID (only for standalone activities)."""

type: str | None = None
"""The activity type name, if available."""


@dataclass(frozen=True, kw_only=True)
class StorageDriverStoreMetadata:
"""Store-only metadata available during external storage operations.

.. warning::
This API is experimental.
"""

target: StorageDriverActivityInfo | StorageDriverWorkflowInfo | None = None
"""The workflow or activity for which this payload is being stored."""


_current_store_metadata: contextvars.ContextVar[StorageDriverStoreMetadata | None] = (
contextvars.ContextVar("_current_store_metadata", default=None)
)


@contextlib.contextmanager
def store_metadata_context(
metadata: StorageDriverStoreMetadata | None,
) -> Generator[None, None, None]:
"""Context manager that sets store metadata and resets it on exit.

If metadata is None, yields without setting anything.
"""
if metadata is None:
yield
return
token = _current_store_metadata.set(metadata)
try:
yield
finally:
_current_store_metadata.reset(token)


@dataclass(frozen=True)
class StorageDriverStoreContext:
"""Context passed to :meth:`StorageDriver.store` and ``driver_selector`` calls.
Expand All @@ -100,10 +173,18 @@ class StorageDriverStoreContext:
This API is experimental.
"""

serialization_context: SerializationContext | None = None
"""The serialization context active when this store operation was initiated,
or ``None`` if no context has been set.
"""
target: StorageDriverActivityInfo | StorageDriverWorkflowInfo | None = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"target" struck me as odd since it's an info rather than a target. then my second thought was that target would be "where it's stored" rather than "what's being called."
Maybe callee_info or something?

Copy link
Copy Markdown
Contributor Author

@jmaeagle99 jmaeagle99 Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like you said, target might not be the best name here. It implies where something should be stored, which is what the information is hinting at. It's trying to say "use this information for storing the payload in a structured manner".

Implementation-wise, the target could be the current context or the other side of the serialization boundary. So calling it caller or callee would be wrong in a handful of scenarios. For example, if this was invoked for the result of a completing workflow, it's the "caller" information if it's the top-level workflow but is the "callee" information if the workflow has a parent. This is because we want to store it under (in S3 terminology) the key prefix for the object that needs it for replay and for lifecycle management. So the API is opinionated as to what information should be used for storing in some kind of hierarchy.

Maybe:

  • context_info? Feels generic.
  • hierarchy_info? Why is the API telling me about a hierarchy when there isn't an obvious prescription of how to generate the hierarchy? Maybe the API should be offering a visitor for prescriptive ordering and layering of the information.

"""The workflow or activity for which this payload is being stored.

For payloads being stored on behalf of an explicit target (e.g. a child
workflow being started, an activity being scheduled, an external workflow
being signaled), this is that target's identity. When no explicit target
exists the current execution context (workflow or activity) is used as the
Copy link
Copy Markdown
Contributor

@drewhoskins-temporal drewhoskins-temporal Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate caller_info field ? Feels like glossing over this distinction between source and target or caller and callee could result in bugs. Would rather callsites explicitly say what they want to refer to.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During code review, we talked about the idea of providing both "caller" and "callee" information and felt that it was ambiguous as to how a driver author was supposed to use them, because you don't always store in the callee context.

When talking in terms of the S3 driver, there was an overall pattern: we always want to store the information in a key space that best suits itself for replay and for lifecycle management. That mostly meant the "target workflow" (e.g. child workflow, external signal, CaN, completing workflow that does have a parent) but is sometimes the "current workflow" (e.g. workflow activities, completing workflow that doesn't have a parent); sometimes it is the "current activity" (completing standalone activity) but is sometimes the "target activity" (client starts a standalone activity).

Just providing "caller" and "callee" information isn't enough; you also need to know standalone vs workflow activity. Maybe that's all the extra information that was needed, but the algorithm is not easy to implement.

We felt that this determination would be the same across drivers, if they cared to store payloads in a contextual hierarchy. So it was built into the external storage layer rather than just a concern driver authors had to think about.

target instead.

The :attr:`StorageDriverWorkflowInfo.namespace` or
:attr:`StorageDriverActivityInfo.namespace` field on the target carries the
namespace for the execution, when available."""


@dataclass(frozen=True)
Expand Down Expand Up @@ -182,7 +263,7 @@ class _StorageReference:


@dataclass(frozen=True)
class ExternalStorage(WithSerializationContext):
class ExternalStorage:
"""Configuration for external storage behavior.

.. warning::
Expand Down Expand Up @@ -223,10 +304,6 @@ class ExternalStorage(WithSerializationContext):
for retrieval lookups.
"""

_context: SerializationContext | None = dataclasses.field(
init=False, default=None, repr=False, compare=False
)

_claim_converter: ClassVar[JSONPlainPayloadConverter] = JSONPlainPayloadConverter(
encoding=_REFERENCE_ENCODING.decode()
)
Expand Down Expand Up @@ -257,12 +334,6 @@ def __post_init__(self) -> None:
driver_map[name] = driver
object.__setattr__(self, "_driver_map", driver_map)

def with_context(self, context: SerializationContext) -> Self:
"""Return a copy of these options with the serialization context applied."""
result = dataclasses.replace(self)
object.__setattr__(result, "_context", context)
return result

def _select_driver(
self, context: StorageDriverStoreContext, payload: Payload
) -> StorageDriver | None:
Expand Down Expand Up @@ -292,9 +363,16 @@ def _get_driver_by_name(self, name: str) -> StorageDriver:
raise ValueError(f"No driver found with name '{name}'")
return driver

@staticmethod
def _build_store_context() -> StorageDriverStoreContext:
    """Snapshot the ambient store metadata into a fresh store context.

    Reads the context-local store metadata, if any, and carries its
    ``target`` into the returned :class:`StorageDriverStoreContext`.
    """
    metadata = _current_store_metadata.get()
    target = metadata.target if metadata is not None else None
    return StorageDriverStoreContext(target=target)

async def _store_payload(self, payload: Payload) -> Payload:
start_time = time.monotonic()
context = StorageDriverStoreContext(serialization_context=self._context)
context = self._build_store_context()

driver = self._select_driver(context, payload)
if driver is None:
Expand Down Expand Up @@ -335,7 +413,7 @@ async def _store_payload_sequence(
start_time = time.monotonic()

results = list(payloads)
context = StorageDriverStoreContext(serialization_context=self._context)
context = self._build_store_context()

to_store: list[tuple[int, Payload, StorageDriver]] = []
for index, payload in enumerate(payloads):
Expand Down
Loading
Loading