Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pip install dbosify
```

This is a drop-in replacement: simply import `dbosify` instead of `temporalio` and connect your clients and workers to a Postgres database instead of a Temporal server.
Further documentation [here](https://docs.dbos.dev/explanations/migrating-from-temporal).
Further documentation [here](https://docs.dbos.dev/explanations/dbosify).

```python
import asyncio
Expand Down
50 changes: 40 additions & 10 deletions dbosify/_internal/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ def retry_decision(
Returns ``(backoff delay before the next attempt, or None to give up;
the retry state to report when giving up)``. ``elapsed`` is the time since
the activity was scheduled; pass it (with ``schedule_to_close``) so the
schedule-to-close budget gates retries — like Temporal, it bounds the retry
sequence, not an in-flight attempt (``start_to_close`` bounds that).
schedule-to-close budget gates retries between attempts. An in-flight attempt
is bounded separately by :func:`effective_deadline`.
"""
if failure.get("non_retryable"):
return None, exceptions.RetryState.NON_RETRYABLE_FAILURE
Expand All @@ -100,6 +100,11 @@ def retry_decision(
and failure_type in set(policy.non_retryable_error_types)
):
return None, exceptions.RetryState.NON_RETRYABLE_FAILURE
# A schedule-to-close timeout is terminal, never retried (Temporal parity).
if failure.get("cls") == "TimeoutError" and failure.get("timeout_type") == int(
exceptions.TimeoutType.SCHEDULE_TO_CLOSE
):
return None, exceptions.RetryState.TIMEOUT
if policy.maximum_attempts and attempt >= policy.maximum_attempts:
return None, exceptions.RetryState.MAXIMUM_ATTEMPTS_REACHED
override = failure.get("next_retry_delay")
Expand All @@ -121,6 +126,20 @@ def retry_decision(
return delay, exceptions.RetryState.IN_PROGRESS


def effective_deadline(
start_to_close: Optional[float], remaining_schedule_to_close: Optional[float]
) -> Tuple[Optional[float], str]:
"""``min(start_to_close, remaining schedule_to_close)`` + the binding bound's label (so a timeout reports the right type); ``None`` is unbounded."""
candidates: List[Tuple[float, str]] = []
if start_to_close is not None:
candidates.append((max(0.0, start_to_close), "start_to_close"))
if remaining_schedule_to_close is not None:
candidates.append((max(0.0, remaining_schedule_to_close), "schedule_to_close"))
if not candidates:
return None, "start_to_close"
return min(candidates, key=lambda c: c[0])


class _RootActivityInbound(activity_interceptor.ActivityInboundInterceptor):
"""Root of the activity inbound chain: actually invokes the user function.

Expand Down Expand Up @@ -177,10 +196,13 @@ def heartbeat(self, *details: Any) -> None:

def _make_attempt_step(activity_name: str, *, dynamic: bool = False) -> AttemptStep:
async def attempt(
args: List[Any], start_to_close: Optional[float], meta: Dict[str, Any]
args: List[Any], deadline: Optional[float], meta: Dict[str, Any]
) -> Dict[str, Any]:
from .. import activity as activity_api

# Labels the caller-computed deadline so a timeout reports the right type.
deadline_type = meta.get("deadline_type", "start_to_close")

# The dynamic step handles any unmatched type: resolve the single dynamic
# activity rather than one keyed by the requested name (the real type is in meta).
defn = (
Expand Down Expand Up @@ -332,13 +354,12 @@ async def _watch(
}

async def run_to_deadline() -> Dict[str, Any]:
# Enforce start-to-close ourselves: the deadline is authoritative, so once
# it passes we cancel and abandon the attempt and discard any late result.
if start_to_close is None:
# Enforce the attempt deadline ourselves: once it passes, cancel and discard any late result.
if deadline is None:
return await run_attempt()
task = asyncio.ensure_future(run_attempt())
try:
done, _ = await asyncio.wait({task}, timeout=start_to_close)
done, _ = await asyncio.wait({task}, timeout=deadline)
except asyncio.CancelledError:
# External cancellation (workflow cancel / scope): propagate the
# cancel into the attempt, then re-raise.
Expand All @@ -355,7 +376,7 @@ async def run_to_deadline() -> Dict[str, Any]:

try:
# User exceptions (including user-raised TimeoutError) are converted inside
# call_user_activity, so a TimeoutError here is the start-to-close deadline.
# call_user_activity, so a TimeoutError here is the attempt deadline.
return await run_to_deadline()
except activity_api_complete_async_error():
# raise_complete_async(): the function returned but the activity stays
Expand All @@ -369,9 +390,18 @@ async def run_to_deadline() -> Dict[str, Any]:
# Mark the context so a hung sync thread (which cancellation
# cannot interrupt) still unwinds at its next heartbeat.
ctx.cancelled.set()
is_stc = deadline_type == "schedule_to_close"
timeout_failure = exceptions.TimeoutError(
"activity Start-To-Close timeout",
type=exceptions.TimeoutType.START_TO_CLOSE,
(
"activity Schedule-To-Close timeout"
if is_stc
else "activity Start-To-Close timeout"
),
type=(
exceptions.TimeoutType.SCHEDULE_TO_CLOSE
if is_stc
else exceptions.TimeoutType.START_TO_CLOSE
),
last_heartbeat_details=[],
)
return {
Expand Down
55 changes: 39 additions & 16 deletions dbosify/_internal/activity_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,23 +127,32 @@ async def _run_queued_activity(payload: Dict[str, Any]) -> Dict[str, Any]:
}

attempt = int(meta.get("attempt", 1))
# Budget consumed before this attempt, reconstructed deterministically from recorded ended_at + durable backoffs (measured from started_at).
elapsed_before = 0.0
while True:
meta["attempt"] = attempt
envelope: Dict[str, Any] = await step_fn(args, start_to_close, meta)
# Bound this attempt by min(start_to_close, remaining schedule_to_close).
remaining_stc = (
schedule_to_close - elapsed_before
if schedule_to_close is not None
else None
)
deadline, deadline_type = activities_mod.effective_deadline(
start_to_close, remaining_stc
)
meta["deadline_type"] = deadline_type
envelope: Dict[str, Any] = await step_fn(args, deadline, meta)
if envelope.get("async_pending"):
# raise_complete_async(): park for external completion. A fail re-runs the
# activity per policy (fall through); complete/cancelled/timeout end here.
timeout = (
start_to_close
if start_to_close is not None
else (
schedule_to_close
if schedule_to_close is not None
else inbox.RECV_TIMEOUT_SECONDS
# Park for completion, minus the body time already spent against the deadline.
if deadline is not None:
body_elapsed = float(envelope.get("ended_at", 0.0)) - float(
envelope.get("started_at", envelope.get("ended_at", 0.0))
)
)
wait_timeout = max(0.0, deadline - max(0.0, body_elapsed))
else:
wait_timeout = inbox.RECV_TIMEOUT_SECONDS
envelope = await _await_async_completion(
envelope, timeout, str(meta.get("activity_id", ""))
envelope, wait_timeout, str(meta.get("activity_id", "")), deadline_type
)
if envelope.get("ok"):
return envelope
Expand All @@ -163,17 +172,22 @@ async def _run_queued_activity(payload: Dict[str, Any]) -> Dict[str, Any]:
if delay is None:
return {**envelope, "retry_state": int(retry_state)}
await DBOS.sleep_async(delay)
elapsed_before = elapsed + delay
attempt += 1


async def _await_async_completion(
pending_env: Dict[str, Any], timeout: float, activity_id: str
pending_env: Dict[str, Any],
timeout: float,
activity_id: str,
deadline_type: str = "start_to_close",
) -> Dict[str, Any]:
"""Park the activity workflow for external completion (raise_complete_async
on the queued path). Returns a normal attempt envelope: complete -> ok,
fail -> a retryable failure (the caller re-runs per policy), report_cancellation
(or a cancellation marker from the interpreter) -> a terminal CancelledError,
and a recv timeout -> a START_TO_CLOSE timeout.
and a recv timeout -> a START_TO_CLOSE / SCHEDULE_TO_CLOSE timeout (per
``deadline_type``).

Heartbeats sent by the completer are not completions: they are skipped so the
park keeps waiting (detail forwarding to the workflow side remains a
Expand All @@ -188,9 +202,18 @@ async def _await_async_completion(
while True:
completion = await DBOS.recv_async(inbox.ASYNC_COMPLETE_TOPIC, timeout)
if completion is None:
is_stc = deadline_type == "schedule_to_close"
timed_out = exceptions.TimeoutError(
"activity Start-To-Close timeout",
type=exceptions.TimeoutType.START_TO_CLOSE,
(
"activity Schedule-To-Close timeout"
if is_stc
else "activity Start-To-Close timeout"
),
type=(
exceptions.TimeoutType.SCHEDULE_TO_CLOSE
if is_stc
else exceptions.TimeoutType.START_TO_CLOSE
),
last_heartbeat_details=[],
)
return {
Expand Down
Loading
Loading