Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions dbosify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@
Modules mirror ``temporalio``'s layout: ``dbosify.workflow``,
``.activity``, ``.client``, ``.worker``, ``.common``, ``.exceptions``,
``.converter``, ``.testing``. Migration from Temporal is an import-root swap.

See DESIGN.md for the architecture and compatibility contract.
"""
10 changes: 5 additions & 5 deletions dbosify/_internal/activities.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Activity execution: one single-attempt DBOS step per activity type.

Each registered activity gets its own step function named ``act:{type}``
(decision §10.1: per-type naming keeps DBOS-native step listings readable
(per-type naming keeps DBOS-native step listings readable
and makes replay-mismatch detection precise). The step body resolves the
activity from the registry at execution time, so re-registering an activity
implementation takes effect without re-decoration.
Expand Down Expand Up @@ -78,7 +78,7 @@ def retry_decision(
elapsed: Optional[float],
schedule_to_close: Optional[float],
) -> Tuple[Optional[float], exceptions.RetryState]:
"""The activity retry decision (DESIGN §6.1.2), clock-independent so both
"""The activity retry decision, clock-independent so both
execution paths share it: the local path (interpreter, virtual time) and the
queued path (the ``__temporal_activity`` workflow, recorded-timestamp time).

Expand Down Expand Up @@ -193,7 +193,7 @@ async def attempt(
attempt_key = (str(meta.get("workflow_run_id", "")), int(meta.get("seq", -1)))
heartbeat_timeout = meta.get("heartbeat_timeout")
# On the queued path the workflow can't set our in-process cancel Event;
# it sets a checkpointed cancel event on its run that we poll here (§6.1.2).
# it sets a checkpointed cancel event on its run that we poll here.
queued = bool(meta.get("queued"))
cancel_target = str(meta.get("workflow_run_id", ""))
cancel_key = inbox.activity_cancel_key(str(meta.get("activity_id", "")))
Expand Down Expand Up @@ -231,8 +231,8 @@ async def call_user_activity() -> Dict[str, Any]:
activity_api._register_attempt(attempt_key, ctx)
token = activity_api._current_context.set(ctx)
try:
# Build the activity interceptor chain for this attempt (DESIGN
# §6.8): inbound wraps the invocation, outbound wraps info()/heartbeat().
# Build the activity interceptor chain for this attempt:
# inbound wraps the invocation, outbound wraps info()/heartbeat().
impl: activity_interceptor.ActivityInboundInterceptor = (
_RootActivityInbound(ctx, defn.is_async)
)
Expand Down
6 changes: 3 additions & 3 deletions dbosify/_internal/activity_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
A worker ``Interceptor`` advertises an activity interceptor via
``intercept_activity`` and a workflow interceptor via
``workflow_interceptor_class``. Nexus interception is unsupported
(DEVIATIONS no-server) and intentionally absent.
(ARCHITECTURE no-server) and intentionally absent.

The activity chain is built per attempt in ``_internal/activities.py``
(mirroring temporalio's ``_activity.py`` chaining): inbound interceptors wrap
Expand Down Expand Up @@ -66,15 +66,15 @@ def workflow_interceptor_class(
) -> "Optional[Type[WorkflowInboundInterceptor]]":
"""Class that will be instantiated and used to intercept workflows.

Called once per workflow execution (DESIGN §6.8). The returned class
Called once per workflow execution. The returned class
must take the same constructor as
:py:meth:`WorkflowInboundInterceptor.__init__` (a single ``next``).
Returning ``None`` (the default) means this interceptor does not
intercept workflows.

Args:
input: Carries ``unsafe_extern_functions`` for parity; inert here
(dbosify has no workflow sandbox, DEVIATIONS dbos-native-management).
(dbosify has no workflow sandbox, ARCHITECTURE dbos-native-management).

Returns:
The class to construct to intercept each workflow, or ``None``.
Expand Down
2 changes: 1 addition & 1 deletion dbosify/_internal/activity_workflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""The ``__temporal_activity`` dispatcher: the cross-queue / distributed
activity path (DESIGN.md §6.1.2).
activity path.

When ``execute_activity(..., task_queue=)`` names a queue other than the
workflow's own, the interpreter does not run the activity as an in-process
Expand Down
2 changes: 1 addition & 1 deletion dbosify/_internal/attributes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Memo and search-attribute conversion, backed by DBOS native workflow
attributes (DESIGN §6.2).
attributes.

Temporal has two metadata namespaces — ``memo`` (opaque, converter-encoded,
codec-capable) and ``search_attributes`` (typed, indexed, queryable). DBOS
Expand Down
4 changes: 2 additions & 2 deletions dbosify/_internal/client_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
``dbosify.client.Interceptor`` exactly as it would
``temporalio.client.Interceptor``.

The ``*Input`` dataclasses are copied field-for-field from the SDK (DESIGN
§6.8) so signature parity holds; fields dbosify does not act on (e.g.
The ``*Input`` dataclasses are copied field-for-field from the SDK so
signature parity holds; fields dbosify does not act on (e.g.
``callbacks``, ``links``, ``request_id``, ``versioning_override``, ``priority``,
``rpc_metadata``/``rpc_timeout``, ``data_converter_override``) are carried but
inert. ``headers`` is *not* inert: a header set on ``start_workflow`` /
Expand Down
4 changes: 2 additions & 2 deletions dbosify/_internal/conversion.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""The data-conversion boundary (DESIGN §6.9).
"""The data-conversion boundary.

User values are converted to/from small tagged payload dicts at the Temporal
boundaries — where function signatures supply the type hints that rebuild the
Expand Down Expand Up @@ -29,7 +29,7 @@

from ..converter import DataConverter, Payload

# Checkpoints embed a user value as a small dict (DESIGN §6.9 envelope), not a raw
# Checkpoints embed a user value as a small dict (envelope), not a raw
# Payload: codec-less json/plain inlines its JSON; binary/codec bytes use base64.
_active_converter: DataConverter = DataConverter.default

Expand Down
18 changes: 9 additions & 9 deletions dbosify/_internal/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Per-type DBOS workflow dispatchers and the workflow-task retry loop.

Each registered Temporal workflow type gets its own thin DBOS workflow named
``wf:{type}`` (resolved decision §10.1) that delegates to the interpreter.
``wf:{type}`` that delegates to the interpreter.
Per-type registration keeps DBOS-native listing/filtering by name working
and lets clients enqueue by name without importing user code.

Workflow-task failure semantics (§4.2): a non-failure exception from
Workflow-task failure semantics: a non-failure exception from
workflow code does NOT fail the workflow. The dispatcher logs loudly, sleeps
with capped backoff (a real sleep — this is outside the deterministic
boundary), rewinds the DBOS checkpoint cursor (``ctx.function_id``) to where
Expand Down Expand Up @@ -128,10 +128,10 @@ def register_worker(
registry.set_worker_interceptors(interceptors)
registry.set_worker_task_queue(task_queue)
registry.set_worker_namespace(namespace)
# The generic schedule-fire dispatcher is process-global (§6.7); register
# The generic schedule-fire dispatcher is process-global; register
# it so this worker can run schedules whose action targets it.
register_schedule_dispatcher()
# The generic queued-activity dispatcher (§6.1.2) is likewise process-global,
# The generic queued-activity dispatcher is likewise process-global,
# registered for every worker so any host can run cross-queue activities.
register_activity_dispatcher()
for cls in workflows:
Expand Down Expand Up @@ -176,7 +176,7 @@ async def dispatch(payload: Any) -> Any:
# status maps to CONTINUED_AS_NEW and awaiters follow new_run_id.
raise SerializedContinueAsNew({"new_run_id": can.new_run_id}) from None
except WorkflowCancelled as cancelled:
# Cooperative cancellation maps to CANCELED (§6.2), distinct from
# Cooperative cancellation maps to CANCELED, distinct from
# FAILED and TERMINATED; it ends the chain (no retry, no cron).
raise SerializedWorkflowCancellation(
serialize_failure(cancelled.cause)
Expand Down Expand Up @@ -403,8 +403,8 @@ async def _enqueue_next_run(
return new_run_id


# Schedules (§6.7): per-occurrence fire dispatcher — enforces bounds/jitter, applies
# the overlap policy (DEVIATIONS schedules, _apply_overlap_policy), starts the action.
# Schedules: per-occurrence fire dispatcher — enforces bounds/jitter, applies
# the overlap policy (ARCHITECTURE schedules, _apply_overlap_policy), starts the action.

SCHEDULE_FIRE_NAME = "__temporal_schedule_fire"
_schedule_dispatcher_registered = False
Expand Down Expand Up @@ -487,7 +487,7 @@ async def _apply_overlap_policy(
await DBOS.cancel_workflow_async(prior, cancel_children=True)
elif overlap == _OVERLAP_CANCEL_OTHER:
# Cooperative cancel (lets the running action's cleanup run); we do not
# wait for it to unwind before starting the next (DEVIATIONS schedules).
# wait for it to unwind before starting the next (ARCHITECTURE schedules).
await DBOS.send_async(
prior, inbox.cancel_envelope("schedule overlap"), inbox.INBOX_TOPIC
)
Expand All @@ -507,7 +507,7 @@ async def _running_prior_occurrence(
statuses in a single batch; the most recent occurrence that actually left an
action row (skipped fires leave none) is the candidate, and it counts as a
running prior iff still open. Bounded to the most recent
``_OVERLAP_LOOKBACK_LIMIT`` fires (DEVIATIONS schedules)."""
``_OVERLAP_LOOKBACK_LIMIT`` fires (ARCHITECTURE schedules)."""
base = action["id"]
schedule_name = context["schedule_id"]
before_epoch = int(fired_at.timestamp())
Expand Down
2 changes: 1 addition & 1 deletion dbosify/_internal/enqueue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Shared in-workflow run enqueue (§6.4 chain hops).
"""Shared in-workflow run enqueue.

Every durable chain hop starts one run under a deterministic id with the same
DBOS context plumbing: continue-as-new (``interpreter._begin_continue_as_new``),
Expand Down
10 changes: 5 additions & 5 deletions dbosify/_internal/ids.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Temporal workflow IDs vs DBOS workflow IDs (DESIGN.md §6.4).
"""Temporal workflow IDs vs DBOS workflow IDs.

DBOS allows exactly one execution per DBOS workflow id, ever; Temporal
allows reusing a workflow id across *closed* runs, each run having its own
run id. Scheme (resolved decision §10.3):
run id. Scheme:

- run n of Temporal id W has DBOS id ``W`` for n=0, else ``W--r{n}``
- the user-visible ``run_id`` IS that DBOS id (stable and unique;
Expand All @@ -13,7 +13,7 @@
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence, Tuple

RUN_SEPARATOR = "--r"
# Cross-queue activity workflow ids are ``{run}--a{seq}`` (§6.1.2). Reserving this
# Cross-queue activity workflow ids are ``{run}--a{seq}``. Reserving this
# separator keeps an activity workflow id from colliding with any user/child id.
ACTIVITY_SEPARATOR = "--a"

Expand Down Expand Up @@ -54,7 +54,7 @@ def run_dbos_id(workflow_id: str, run_index: int) -> str:


def activity_dbos_id(run_id: str, seq: int) -> str:
"""The ``__temporal_activity`` workflow id for a cross-queue activity (§6.1.2).
"""The ``__temporal_activity`` workflow id for a cross-queue activity.
Deterministic in ``seq``, and in the reserved ``--a`` namespace so it cannot
collide with any user/child/run id."""
return f"{run_id}{ACTIVITY_SEPARATOR}{seq}"
Expand All @@ -64,7 +64,7 @@ def replay_scratch_id(run_id: str, mode: str, suffix: str = "") -> str:
"""The DBOS id for a replay scratch fork of ``run_id``.

The mode travels *in the id* so a worker in a different process than the
forker recognizes the replay (DEVIATIONS replay, query-on-closed rehydrate).
forker recognizes the replay (ARCHITECTURE replay, query-on-closed rehydrate).
``suffix`` is a unique token per operation (the request id for a query, a
fresh token for a verification), so concurrent replays/queries of the same run
each get their own scratch and never collide on one id."""
Expand Down
6 changes: 3 additions & 3 deletions dbosify/_internal/inbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
INBOX_TOPIC = "__dbosify_inbox"

# The topic a queued activity workflow parks on for external completion
# (raise_complete_async, §6.1.2): AsyncActivityHandle sends the envelope here.
# (raise_complete_async): AsyncActivityHandle sends the envelope here.
ASYNC_COMPLETE_TOPIC = "__dbosify_async_complete"

# recv timeout per wait; on (checkpointed, deterministic) timeout the
Expand Down Expand Up @@ -112,8 +112,8 @@ def async_activity_gone_key(activity_id: str) -> str:


def activity_cancel_key(activity_id: str) -> str:
"""Event set on the workflow run when a cross-queue activity is cancelled
(§6.1.2): the activity's attempt step on the other worker polls it and, when
"""Event set on the workflow run when a cross-queue activity is cancelled:
the activity's attempt step on the other worker polls it and, when
set, delivers cancellation into the running activity. (The local path uses an
in-process threading.Event instead — same process, no event needed.)"""
return f"__dbosify_act_{activity_id}_cancel"
Expand Down
Loading
Loading