Anonymous View
Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,52 @@ ADDED
uses a deterministic instance ID (`export-job-{job_id}`, exposed via
`orchestrator_instance_id_for(...)`) so callers can correlate a job ID
with its orchestrator for logging, monitoring, and restart.
- Type-aware deserialization of payloads. `OrchestrationContext.call_activity`,
`call_sub_orchestrator`, and `call_entity` accept an optional `return_type`,
and `wait_for_external_event` accepts an optional `data_type`. When provided,
the result/event payload is coerced to that type: dataclasses are
reconstructed from their dict payloads (including nested dataclass, `Optional`,
and `list` fields), and types exposing a `from_json()` classmethod are rebuilt
via that hook. When omitted, the raw deserialized JSON is returned as before.
The `return_type` / `data_type` argument also refines the static type of the
returned task (e.g. `call_activity(..., return_type=Foo)` is typed as
`CompletableTask[Foo]`).
- Inbound payloads are reconstructed from function type annotations. When an
orchestrator, activity, or entity operation annotates its input parameter with
a dataclass or a `from_json()`-capable type, the incoming payload is
automatically coerced to that type. Discovery is best-effort and conservative:
builtins and unannotated/unknown types are passed through unchanged, and a
payload that cannot be coerced falls back to the raw value.
- `call_activity` results are reconstructed from the activity's return
annotation. When an activity function reference is passed (not a string name)
and its return type is annotated with a dataclass or `from_json()`-capable
type, the result is automatically coerced to that type. An explicit
`return_type` argument takes precedence over the discovered annotation.
- Added typed accessors to `client.OrchestrationState`: `get_input()`,
`get_output()`, and `get_custom_status()` each accept an optional
`expected_type` and deserialize the corresponding `serialized_*` payload,
reconstructing dataclasses and `from_json()`-capable types. The raw
`serialized_input` / `serialized_output` / `serialized_custom_status` string
fields are retained.
- Objects exposing a `to_json()` method are now JSON-serializable when passed as
activity/orchestrator inputs or outputs.
- Entity state retrieval (`get_state(intended_type=...)`) now reconstructs
dataclasses from their stored dict payloads and supports types exposing a
`from_json()` classmethod, in addition to the existing constructor-based
coercion.

CHANGED

- Custom objects (dataclasses, `SimpleNamespace`) are now serialized as plain
JSON without an internal type marker. Decoding without a `return_type` /
`data_type` therefore yields a plain `dict` (previously a `SimpleNamespace`
for marked payloads). Pass the new type arguments to reconstruct the original
type. Payloads produced by older SDK versions (carrying the legacy marker)
continue to deserialize, including into a `SimpleNamespace` when no type is
supplied.
- JSON serialization failures now raise a `TypeError` that chains the original
error (`__cause__`) and names the offending type, making serialization issues
easier to diagnose.

## v1.5.0

Expand Down
93 changes: 92 additions & 1 deletion durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Generic, Protocol, TypeVar, cast
from typing import Any, Generic, Protocol, TypeVar, cast, overload

import grpc
import grpc.aio
Expand Down Expand Up @@ -54,6 +54,7 @@
TInput = TypeVar('TInput')
TOutput = TypeVar('TOutput')
TItem = TypeVar('TItem')
T = TypeVar('T')


class OrchestrationStatus(Enum):
Expand Down Expand Up @@ -82,6 +83,96 @@ class OrchestrationState:
serialized_custom_status: str | None
failure_details: task.FailureDetails | None

@overload
def get_input(self, expected_type: type[T]) -> T | None:
...

@overload
def get_input(self, expected_type: None = ...) -> Any:
...

def get_input(self, expected_type: type | None = None) -> Any:
"""Deserialize the orchestration's input.

Parameters
----------
expected_type : type | None
Optional type used to reconstruct the input. When provided, the
payload is coerced to this type (dataclasses are constructed from
their dict payloads, types exposing a ``from_json()`` classmethod
are reconstructed via that hook) and the return value is typed as
``expected_type | None``. When omitted, the raw deserialized JSON is
returned.

Returns
-------
Any
The deserialized input, or None if there is no input.
"""
if self.serialized_input is None:
return None
return shared.from_json(self.serialized_input, expected_type)

@overload
def get_output(self, expected_type: type[T]) -> T | None:
...

@overload
def get_output(self, expected_type: None = ...) -> Any:
...

def get_output(self, expected_type: type | None = None) -> Any:
"""Deserialize the orchestration's output.

Parameters
----------
expected_type : type | None
Optional type used to reconstruct the output. When provided, the
payload is coerced to this type (dataclasses are constructed from
their dict payloads, types exposing a ``from_json()`` classmethod
are reconstructed via that hook) and the return value is typed as
``expected_type | None``. When omitted, the raw deserialized JSON is
returned.

Returns
-------
Any
The deserialized output, or None if there is no output.
"""
if self.serialized_output is None:
return None
return shared.from_json(self.serialized_output, expected_type)

@overload
def get_custom_status(self, expected_type: type[T]) -> T | None:
...

@overload
def get_custom_status(self, expected_type: None = ...) -> Any:
...

def get_custom_status(self, expected_type: type | None = None) -> Any:
"""Deserialize the orchestration's custom status.

Parameters
----------
expected_type : type | None
Optional type used to reconstruct the custom status. When provided,
the payload is coerced to this type (dataclasses are constructed
from their dict payloads, types exposing a ``from_json()``
classmethod are reconstructed via that hook) and the return value is
typed as ``expected_type | None``. When omitted, the raw
deserialized JSON is returned.

Returns
-------
Any
The deserialized custom status, or None if there is no custom status.
"""
if self.serialized_custom_status is None:
return None
return shared.from_json(self.serialized_custom_status, expected_type)

def raise_if_failed(self):
if self.failure_details is not None:
raise OrchestrationFailedError(
Expand Down
11 changes: 2 additions & 9 deletions durabletask/entities/entity_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from durabletask.entities.entity_instance_id import EntityInstanceId

import durabletask.internal.orchestrator_service_pb2 as pb
from durabletask.internal import shared

TState = TypeVar("TState")

Expand Down Expand Up @@ -77,15 +78,7 @@ def get_state(self, intended_type: type[TState] | None = None) -> TState | Any |
if intended_type is None or self._state is None:
return self._state

if isinstance(self._state, intended_type):
return self._state

try:
return intended_type(self._state) # type: ignore[call-arg]
except Exception as ex:
raise TypeError(
f"Could not convert state of type '{type(self._state).__name__}' to '{intended_type.__name__}'"
) from ex
return shared.coerce_to_type(self._state, intended_type)

def get_locked_by(self) -> EntityInstanceId | None:
"""Get the identifier of the worker that currently holds the lock on the entity.
Expand Down
11 changes: 2 additions & 9 deletions durabletask/internal/entity_state_shim.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any, TypeVar, overload

import durabletask.internal.orchestrator_service_pb2 as pb
from durabletask.internal import shared

TState = TypeVar("TState")

Expand Down Expand Up @@ -31,15 +32,7 @@ def get_state(self, intended_type: type[TState] | None = None, default: TState |
if intended_type is None:
return self._current_state

if isinstance(self._current_state, intended_type):
return self._current_state

try:
return intended_type(self._current_state) # type: ignore[call-arg]
except Exception as ex:
raise TypeError(
f"Could not convert state of type '{type(self._current_state).__name__}' to '{intended_type.__name__}'"
) from ex
return shared.coerce_to_type(self._current_state, intended_type)

def set_state(self, state: Any) -> None:
self._current_state = state
Expand Down
Loading
Loading