From 5142b581c86e4e309c5e2a0947af6ef33c664102 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Tue, 23 Jun 2026 13:28:16 -0700 Subject: [PATCH 1/9] fix --- dbosify/_internal/interpreter.py | 4 +++- dbosify/workflow.py | 13 ++++++++----- tests/integration/test_interpreter_basic.py | 17 +++++++++++++++++ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/dbosify/_internal/interpreter.py b/dbosify/_internal/interpreter.py index 875aad3..5150c89 100644 --- a/dbosify/_internal/interpreter.py +++ b/dbosify/_internal/interpreter.py @@ -2933,7 +2933,9 @@ def runtime_info(self) -> Info: if self._meta.root is not None else None ) - start_time = datetime.fromtimestamp(self._start_time) + # Timezone-aware UTC (matches temporalio); the epoch source is checkpointed + # so the rendered datetime is replay-stable regardless of the worker's local tz. + start_time = datetime.fromtimestamp(self._start_time, timezone.utc) return Info( attempt=self._meta.attempt, continued_run_id=self._continued_from, diff --git a/dbosify/workflow.py b/dbosify/workflow.py index 9f32515..2e7076d 100644 --- a/dbosify/workflow.py +++ b/dbosify/workflow.py @@ -19,7 +19,7 @@ import uuid as uuid_mod from contextlib import AbstractContextManager, nullcontext from dataclasses import dataclass, field -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from enum import Enum, IntEnum from random import Random from typing import ( @@ -675,7 +675,7 @@ class Info: .. deprecated:: Use :py:attr:`typed_search_attributes` instead. """ - start_time: datetime = datetime.fromtimestamp(0) + start_time: datetime = datetime.fromtimestamp(0, timezone.utc) task_queue: str = "" # Workflow-task timeout: no workflow-task concept here (inert). Surfaced for # parity; always None. @@ -684,7 +684,7 @@ class Info: workflow_id: str = "" # The run's initialization time. A single start timestamp per run (no # "first task" vs "initialization" distinction), so equals start_time. - workflow_start_time: datetime = datetime.fromtimestamp(0) + workflow_start_time: datetime = datetime.fromtimestamp(0, timezone.utc) workflow_type: str = "" def get_current_history_length(self) -> int: @@ -1217,8 +1217,11 @@ def upsert_search_attributes( def now() -> datetime: - """Current workflow time: deterministic, advances only on events.""" - return datetime.fromtimestamp(time(), tz=None) + """Current workflow time: deterministic, advances only on events. + + Returns a timezone-aware UTC datetime, matching temporalio (whose + ``workflow.now()`` documents UTC as the set time zone).""" + return datetime.fromtimestamp(time(), timezone.utc) def time() -> float: diff --git a/tests/integration/test_interpreter_basic.py b/tests/integration/test_interpreter_basic.py index 3cab1a7..da21862 100644 --- a/tests/integration/test_interpreter_basic.py +++ b/tests/integration/test_interpreter_basic.py @@ -89,11 +89,25 @@ async def run(self) -> Dict[str, Any]: schedule_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy(maximum_attempts=3), ) + # workflow.now()/start_time are tz-aware UTC (parity with temporalio); a + # fixed aware reference keeps this comparison deterministic on replay. + aware_ref = datetime(2000, 1, 1, tzinfo=timezone.utc) return { "first_execution_run_id": info.first_execution_run_id, "run_id": info.run_id, "workflow_id": info.workflow_id, "start_time_eq": info.workflow_start_time == info.start_time, + "start_time_is_utc": ( + info.start_time.tzinfo is not None + and info.start_time.utcoffset() == timedelta(0) + ), + "now_is_utc": ( + workflow.now().tzinfo is not None + and workflow.now().utcoffset() == timedelta(0) + ), + # Comparing the workflow clock against a tz-aware datetime must not + # raise (the naive-local bug raised "can't compare offset-naive..."). + "now_after_aware_ref": workflow.now() > aware_ref, "has_parent": info.parent is not None, "task_queue": info.task_queue, "activity": act, @@ -263,6 +277,9 @@ def test_workflow_and_activity_info_parity_fields() -> None: assert res["run_id"] == "infowf" assert res["workflow_id"] == "infowf" assert res["start_time_eq"] is True + assert res["start_time_is_utc"] is True + assert res["now_is_utc"] is True + assert res["now_after_aware_ref"] is True assert res["has_parent"] is False # No Worker registered a queue (in-process harness), so the workflow's queue # falls back to "default"; a local activity reports the workflow's queue. From b337ecbedff4a3080b9267e414fd4dc8e60fe176 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Tue, 23 Jun 2026 13:44:50 -0700 Subject: [PATCH 2/9] cleanup --- dbosify/_internal/interpreter.py | 3 +-- dbosify/workflow.py | 3 +-- tests/integration/test_interpreter_basic.py | 6 ++---- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/dbosify/_internal/interpreter.py b/dbosify/_internal/interpreter.py index 5150c89..fb79c2a 100644 --- a/dbosify/_internal/interpreter.py +++ b/dbosify/_internal/interpreter.py @@ -2933,8 +2933,7 @@ def runtime_info(self) -> Info: if self._meta.root is not None else None ) - # Timezone-aware UTC (matches temporalio); the epoch source is checkpointed - # so the rendered datetime is replay-stable regardless of the worker's local tz. + # Tz-aware UTC (temporalio parity), replay-stable via the checkpointed epoch. start_time = datetime.fromtimestamp(self._start_time, timezone.utc) return Info( attempt=self._meta.attempt, diff --git a/dbosify/workflow.py b/dbosify/workflow.py index 2e7076d..3610ee5 100644 --- a/dbosify/workflow.py +++ b/dbosify/workflow.py @@ -1219,8 +1219,7 @@ def upsert_search_attributes( def now() -> datetime: """Current workflow time: deterministic, advances only on events. - Returns a timezone-aware UTC datetime, matching temporalio (whose - ``workflow.now()`` documents UTC as the set time zone).""" + Tz-aware UTC, matching temporalio.""" return datetime.fromtimestamp(time(), timezone.utc) diff --git a/tests/integration/test_interpreter_basic.py b/tests/integration/test_interpreter_basic.py index da21862..8384609 100644 --- a/tests/integration/test_interpreter_basic.py +++ b/tests/integration/test_interpreter_basic.py @@ -89,8 +89,7 @@ async def run(self) -> Dict[str, Any]: schedule_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy(maximum_attempts=3), ) - # workflow.now()/start_time are tz-aware UTC (parity with temporalio); a - # fixed aware reference keeps this comparison deterministic on replay. + # Fixed aware ref keeps the tz-aware compare below deterministic on replay. aware_ref = datetime(2000, 1, 1, tzinfo=timezone.utc) return { "first_execution_run_id": info.first_execution_run_id, @@ -105,8 +104,7 @@ async def run(self) -> Dict[str, Any]: workflow.now().tzinfo is not None and workflow.now().utcoffset() == timedelta(0) ), - # Comparing the workflow clock against a tz-aware datetime must not - # raise (the naive-local bug raised "can't compare offset-naive..."). + # Comparing the clock to a tz-aware datetime must not raise. "now_after_aware_ref": workflow.now() > aware_ref, "has_parent": info.parent is not None, "task_queue": info.task_queue, From 6ab359a3bf707c736a0df80af373d9f60b868778 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Tue, 23 Jun 2026 13:48:11 -0700 Subject: [PATCH 3/9] nit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8232a7a..8a624b5 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ pip install dbosify ``` This is a drop-in replacement: simply import `dbosify` instead of `temporalio` and connect your clients and workers to a Postgres database instead of a Temporal server. -Further documentation [here](https://docs.dbos.dev/explanations/migrating-from-temporal). +Further documentation [here](https://docs.dbos.dev/explanations/dbosify). ```python import asyncio From dbb640b6aeb03f7722a610c19b4fa22d0eba35bb Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Tue, 23 Jun 2026 14:40:57 -0700 Subject: [PATCH 4/9] fixes --- dbosify/_internal/payloads.py | 15 +++++++++++ dbosify/_internal/visibility.py | 8 +++++- dbosify/_schedule.py | 14 ++++++---- tests/unit/test_failure_payloads.py | 42 ++++++++++++++++++++++++++++- tests/unit/test_schedules.py | 28 +++++++++++++++++++ tests/unit/test_visibility.py | 14 ++++++++-- 6 files changed, 112 insertions(+), 9 deletions(-) diff --git a/dbosify/_internal/payloads.py b/dbosify/_internal/payloads.py index a3837eb..89f824e 100644 --- a/dbosify/_internal/payloads.py +++ b/dbosify/_internal/payloads.py @@ -292,6 +292,12 @@ def serialize_failure( int(exc.retry_state) if exc.retry_state is not None else None ), } + elif isinstance(exc, exceptions.ServerError): + env = { + "cls": "ServerError", + "message": exc.message, + "non_retryable": exc.non_retryable, + } else: # Anything else (including FailureError subclasses we don't model # structurally) becomes an ApplicationError keyed by class name. @@ -305,7 +311,11 @@ def serialize_failure( } if exc.__traceback__ is not None: env["stack_trace"] = "".join(traceback.format_tb(exc.__traceback__)) + # Explicit `raise ... from` wins; else fall back to the implicit __context__ + # chain unless suppressed — matching temporalio. cause = exc.__cause__ + if cause is None and not exc.__suppress_context__: + cause = exc.__context__ env["cause"] = serialize_failure(cause, converter) if cause is not None else None return env @@ -370,6 +380,11 @@ def deserialize_failure(env: FailureEnvelope) -> exceptions.FailureError: started_event_id=0, retry_state=_retry_state(env.get("retry_state")), ) + elif cls == "ServerError": + exc = exceptions.ServerError( + env["message"], + non_retryable=bool(env.get("non_retryable", False)), + ) else: raise ValueError(f"unknown failure envelope class: {cls!r}") cause = env.get("cause") diff --git a/dbosify/_internal/visibility.py b/dbosify/_internal/visibility.py index 5336e9e..1d63d14 100644 --- a/dbosify/_internal/visibility.py +++ b/dbosify/_internal/visibility.py @@ -62,7 +62,13 @@ # Each Temporal status -> the DBOS status string(s) that can hold it. The four # ERROR-family statuses all collapse onto DBOS ``ERROR`` and need a post-filter. _TEMPORAL_TO_DBOS: Dict[WorkflowExecutionStatus, Tuple[str, ...]] = { - WorkflowExecutionStatus.RUNNING: ("PENDING", "ENQUEUED", "DELAYED"), + WorkflowExecutionStatus.RUNNING: ( + "PENDING", + "ENQUEUED", + "DELAYED", + # Stuck-but-not-closed; to_execution_status maps it to RUNNING too. + "MAX_RECOVERY_ATTEMPTS_EXCEEDED", + ), WorkflowExecutionStatus.COMPLETED: ("SUCCESS",), WorkflowExecutionStatus.TERMINATED: ("CANCELLED",), WorkflowExecutionStatus.FAILED: ("ERROR",), diff --git a/dbosify/_schedule.py b/dbosify/_schedule.py index d8d4154..a66a486 100644 --- a/dbosify/_schedule.py +++ b/dbosify/_schedule.py @@ -21,7 +21,7 @@ import asyncio import inspect from dataclasses import dataclass, field -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta, timezone, tzinfo from enum import IntEnum from typing import ( TYPE_CHECKING, @@ -35,6 +35,7 @@ Sequence, Union, ) +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from ._internal import attributes as _attributes from ._internal import conversion @@ -880,14 +881,17 @@ def rng(rs: Sequence[ScheduleRange]) -> Sequence["tuple[int, int, int]"]: def _next_action_times(ctx: Mapping[str, Any], count: int) -> List[datetime]: try: spec = _deserialize_spec(ctx.get("spec", {})) - cron, _tz = compile_spec(spec) - except ValueError: + cron, tz_name = compile_spec(spec) + tz: tzinfo = ZoneInfo(tz_name) if tz_name else timezone.utc + except (ValueError, ZoneInfoNotFoundError): return [] from dbos._croniter import croniter # type: ignore[attr-defined] + # Iterate in the schedule's timezone (the DBOS scheduler fires there), then + # report each instant in UTC for parity with temporalio. now = datetime.now(timezone.utc) - it = croniter(cron, now, second_at_beginning=True) - return [it.get_next(datetime) for _ in range(count)] + it = croniter(cron, now.astimezone(tz), second_at_beginning=True) + return [it.get_next(datetime).astimezone(timezone.utc) for _ in range(count)] async def _description_from_row(row: Mapping[str, Any]) -> ScheduleDescription: diff --git a/tests/unit/test_failure_payloads.py b/tests/unit/test_failure_payloads.py index 5da0a9e..4a28c11 100644 --- a/tests/unit/test_failure_payloads.py +++ b/tests/unit/test_failure_payloads.py @@ -6,7 +6,7 @@ """ from dbosify._internal.payloads import deserialize_failure, serialize_failure -from dbosify.exceptions import ApplicationError, ApplicationErrorCategory +from dbosify.exceptions import ApplicationError, ApplicationErrorCategory, ServerError def test_application_error_category_roundtrips() -> None: @@ -31,3 +31,43 @@ def test_non_failure_exception_defaults_to_unspecified_category() -> None: back = deserialize_failure(serialize_failure(ValueError("oops"))) assert isinstance(back, ApplicationError) assert back.category == ApplicationErrorCategory.UNSPECIFIED + + +def test_implicit_context_chain_is_recorded_as_cause() -> None: + # `except X: raise Y` (no `from`) chains the original via __context__; + # temporalio records it as the cause, so we must too. + try: + try: + raise ValueError("root cause") + except ValueError: + raise ApplicationError("wrapped", type="Wrapped") + except ApplicationError as exc: + back = deserialize_failure(serialize_failure(exc)) + assert isinstance(back.__cause__, ApplicationError) + assert back.__cause__.type == "ValueError" + assert "root cause" in str(back.__cause__) + + +def test_suppressed_context_drops_cause() -> None: + # `raise Y from None` suppresses the implicit chain. + try: + try: + raise ValueError("root cause") + except ValueError: + raise ApplicationError("wrapped") from None + except ApplicationError as exc: + back = deserialize_failure(serialize_failure(exc)) + assert back.__cause__ is None + + +def test_server_error_roundtrips_preserving_non_retryable() -> None: + back = deserialize_failure( + serialize_failure(ServerError("srv", non_retryable=True)) + ) + assert isinstance(back, ServerError) + assert back.non_retryable is True + assert back.message == "srv" + # The default must round-trip as retryable, not silently flip. + back2 = deserialize_failure(serialize_failure(ServerError("srv2"))) + assert isinstance(back2, ServerError) + assert back2.non_retryable is False diff --git a/tests/unit/test_schedules.py b/tests/unit/test_schedules.py index 36b0905..587b7cf 100644 --- a/tests/unit/test_schedules.py +++ b/tests/unit/test_schedules.py @@ -17,6 +17,7 @@ ScheduleRange, ScheduleSpec, ScheduleState, + _next_action_times, _schedule_from_context, compile_spec, encode_action_attributes, @@ -117,6 +118,33 @@ def test_compile_spec_requires_a_spec() -> None: compile_spec(ScheduleSpec()) +def test_next_action_times_honor_schedule_timezone() -> None: + # A 09:00 daily calendar in America/New_York must report next fire times in + # UTC at 13:00/14:00 (EDT/EST) — not a naive 09:00 read in UTC. + schedule = Schedule( + action=ScheduleActionStartWorkflow("W", id="w", task_queue="tq"), + spec=ScheduleSpec( + calendars=[ScheduleCalendarSpec(hour=(ScheduleRange(9),))], + time_zone_name="America/New_York", + ), + ) + times = _next_action_times(serialize_schedule_context(schedule), 3) + assert times + for t in times: + assert t.utcoffset() == timedelta(0) # reported in UTC + assert t.hour in (13, 14) # 09:00 New York is 13:00 (EDT) / 14:00 (EST) UTC + + +def test_next_action_times_utc_schedule_unchanged() -> None: + # A plain UTC schedule still reports the wall-clock hour (no regression). + schedule = Schedule( + action=ScheduleActionStartWorkflow("W", id="w", task_queue="tq"), + spec=ScheduleSpec(calendars=[ScheduleCalendarSpec(hour=(ScheduleRange(9),))]), + ) + times = _next_action_times(serialize_schedule_context(schedule), 1) + assert times and times[0].hour == 9 and times[0].utcoffset() == timedelta(0) + + async def test_context_round_trip() -> None: kw = SearchAttributeKey.for_keyword("RoundTripKw") start_action = ScheduleActionStartWorkflow( diff --git a/tests/unit/test_visibility.py b/tests/unit/test_visibility.py index 6d38bb9..c6d5a9d 100644 --- a/tests/unit/test_visibility.py +++ b/tests/unit/test_visibility.py @@ -124,7 +124,11 @@ def test_workflow_id_bad_operator_rejected() -> None: def test_status_running_is_clean_multi_map() -> None: parsed = parse_query("ExecutionStatus = 'Running'") - assert parsed.to_dbos_filters() == {"status": ["PENDING", "ENQUEUED", "DELAYED"]} + # MAX_RECOVERY_ATTEMPTS_EXCEEDED is included: a stuck workflow maps to RUNNING + # in describe(), so the filter must surface it too. + assert parsed.to_dbos_filters() == { + "status": ["PENDING", "ENQUEUED", "DELAYED", "MAX_RECOVERY_ATTEMPTS_EXCEEDED"] + } assert parsed.post_filter() is None # no ERROR-family member @@ -164,7 +168,13 @@ def test_status_canceled_post_filter_keeps_only_cancellations() -> None: def test_status_in_unions_dbos_statuses() -> None: parsed = parse_query("ExecutionStatus IN ('Running', 'Completed')") statuses = parsed.to_dbos_filters()["status"] - assert set(statuses) == {"PENDING", "ENQUEUED", "DELAYED", "SUCCESS"} + assert set(statuses) == { + "PENDING", + "ENQUEUED", + "DELAYED", + "MAX_RECOVERY_ATTEMPTS_EXCEEDED", + "SUCCESS", + } assert parsed.post_filter() is None From 455ed60016bbee62fe0109a817e468cef961b921 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Tue, 23 Jun 2026 15:16:44 -0700 Subject: [PATCH 5/9] timeout fix --- dbosify/_internal/activities.py | 51 +++++- dbosify/_internal/activity_workflow.py | 55 ++++-- dbosify/_internal/interpreter.py | 71 ++++++-- .../schedule_to_close_recovery_worker.py | 91 ++++++++++ tests/integration/test_schedule_to_close.py | 159 ++++++++++++++++++ .../test_schedule_to_close_recovery.py | 47 ++++++ 6 files changed, 439 insertions(+), 35 deletions(-) create mode 100644 tests/integration/schedule_to_close_recovery_worker.py create mode 100644 tests/integration/test_schedule_to_close.py create mode 100644 tests/integration/test_schedule_to_close_recovery.py diff --git a/dbosify/_internal/activities.py b/dbosify/_internal/activities.py index 1394a1f..8c22f9d 100644 --- a/dbosify/_internal/activities.py +++ b/dbosify/_internal/activities.py @@ -85,8 +85,8 @@ def retry_decision( Returns ``(backoff delay before the next attempt, or None to give up; the retry state to report when giving up)``. ``elapsed`` is the time since the activity was scheduled; pass it (with ``schedule_to_close``) so the - schedule-to-close budget gates retries — like Temporal, it bounds the retry - sequence, not an in-flight attempt (``start_to_close`` bounds that). + schedule-to-close budget gates retries between attempts. An in-flight attempt + is bounded separately by :func:`effective_deadline`. """ if failure.get("non_retryable"): return None, exceptions.RetryState.NON_RETRYABLE_FAILURE @@ -121,6 +121,25 @@ def retry_decision( return delay, exceptions.RetryState.IN_PROGRESS +def effective_deadline( + start_to_close: Optional[float], remaining_schedule_to_close: Optional[float] +) -> Tuple[Optional[float], str]: + """The deadline bounding a single in-flight attempt: the smaller of the + per-attempt ``start_to_close`` and the remaining ``schedule_to_close`` budget, + plus a label for which bound is binding (so a timeout reports the right type). + ``None`` means unbounded (neither timeout set). The caller computes the + remaining budget from deterministic time so the result replays identically. + """ + candidates: List[Tuple[float, str]] = [] + if start_to_close is not None: + candidates.append((max(0.0, start_to_close), "start_to_close")) + if remaining_schedule_to_close is not None: + candidates.append((max(0.0, remaining_schedule_to_close), "schedule_to_close")) + if not candidates: + return None, "start_to_close" + return min(candidates, key=lambda c: c[0]) + + class _RootActivityInbound(activity_interceptor.ActivityInboundInterceptor): """Root of the activity inbound chain: actually invokes the user function. @@ -177,10 +196,15 @@ def heartbeat(self, *details: Any) -> None: def _make_attempt_step(activity_name: str, *, dynamic: bool = False) -> AttemptStep: async def attempt( - args: List[Any], start_to_close: Optional[float], meta: Dict[str, Any] + args: List[Any], deadline: Optional[float], meta: Dict[str, Any] ) -> Dict[str, Any]: from .. import activity as activity_api + # The per-attempt deadline is the smaller of start_to_close and the + # remaining schedule_to_close budget (computed by the caller); the label + # says which, so a timeout reports the right TimeoutType. + deadline_type = meta.get("deadline_type", "start_to_close") + # The dynamic step handles any unmatched type: resolve the single dynamic # activity rather than one keyed by the requested name (the real type is in meta). defn = ( @@ -332,13 +356,13 @@ async def _watch( } async def run_to_deadline() -> Dict[str, Any]: - # Enforce start-to-close ourselves: the deadline is authoritative, so once + # Enforce the attempt deadline ourselves: it is authoritative, so once # it passes we cancel and abandon the attempt and discard any late result. - if start_to_close is None: + if deadline is None: return await run_attempt() task = asyncio.ensure_future(run_attempt()) try: - done, _ = await asyncio.wait({task}, timeout=start_to_close) + done, _ = await asyncio.wait({task}, timeout=deadline) except asyncio.CancelledError: # External cancellation (workflow cancel / scope): propagate the # cancel into the attempt, then re-raise. @@ -355,7 +379,7 @@ async def run_to_deadline() -> Dict[str, Any]: try: # User exceptions (including user-raised TimeoutError) are converted inside - # call_user_activity, so a TimeoutError here is the start-to-close deadline. + # call_user_activity, so a TimeoutError here is the attempt deadline. return await run_to_deadline() except activity_api_complete_async_error(): # raise_complete_async(): the function returned but the activity stays @@ -369,9 +393,18 @@ async def run_to_deadline() -> Dict[str, Any]: # Mark the context so a hung sync thread (which cancellation # cannot interrupt) still unwinds at its next heartbeat. ctx.cancelled.set() + is_stc = deadline_type == "schedule_to_close" timeout_failure = exceptions.TimeoutError( - "activity Start-To-Close timeout", - type=exceptions.TimeoutType.START_TO_CLOSE, + ( + "activity Schedule-To-Close timeout" + if is_stc + else "activity Start-To-Close timeout" + ), + type=( + exceptions.TimeoutType.SCHEDULE_TO_CLOSE + if is_stc + else exceptions.TimeoutType.START_TO_CLOSE + ), last_heartbeat_details=[], ) return { diff --git a/dbosify/_internal/activity_workflow.py b/dbosify/_internal/activity_workflow.py index d96c3ed..8630ed9 100644 --- a/dbosify/_internal/activity_workflow.py +++ b/dbosify/_internal/activity_workflow.py @@ -127,23 +127,32 @@ async def _run_queued_activity(payload: Dict[str, Any]) -> Dict[str, Any]: } attempt = int(meta.get("attempt", 1)) + # Elapsed budget consumed before the current attempt starts; reconstructed + # deterministically from recorded ``ended_at`` + the durable backoff sleeps, + # so it replays identically (measured from started_at, like retry_decision). + elapsed_before = 0.0 while True: meta["attempt"] = attempt - envelope: Dict[str, Any] = await step_fn(args, start_to_close, meta) + # Bound this attempt by min(start_to_close, remaining schedule_to_close). + remaining_stc = ( + schedule_to_close - elapsed_before + if schedule_to_close is not None + else None + ) + deadline, deadline_type = activities_mod.effective_deadline( + start_to_close, remaining_stc + ) + meta["deadline_type"] = deadline_type + envelope: Dict[str, Any] = await step_fn(args, deadline, meta) if envelope.get("async_pending"): - # raise_complete_async(): park for external completion. A fail re-runs the - # activity per policy (fall through); complete/cancelled/timeout end here. - timeout = ( - start_to_close - if start_to_close is not None - else ( - schedule_to_close - if schedule_to_close is not None - else inbox.RECV_TIMEOUT_SECONDS - ) + # raise_complete_async(): park for external completion, bounded by the + # same deadline. A fail re-runs per policy (fall through); complete/ + # cancelled/timeout end here. + wait_timeout = ( + deadline if deadline is not None else inbox.RECV_TIMEOUT_SECONDS ) envelope = await _await_async_completion( - envelope, timeout, str(meta.get("activity_id", "")) + envelope, wait_timeout, str(meta.get("activity_id", "")), deadline_type ) if envelope.get("ok"): return envelope @@ -163,17 +172,22 @@ async def _run_queued_activity(payload: Dict[str, Any]) -> Dict[str, Any]: if delay is None: return {**envelope, "retry_state": int(retry_state)} await DBOS.sleep_async(delay) + elapsed_before = elapsed + delay attempt += 1 async def _await_async_completion( - pending_env: Dict[str, Any], timeout: float, activity_id: str + pending_env: Dict[str, Any], + timeout: float, + activity_id: str, + deadline_type: str = "start_to_close", ) -> Dict[str, Any]: """Park the activity workflow for external completion (raise_complete_async on the queued path). Returns a normal attempt envelope: complete -> ok, fail -> a retryable failure (the caller re-runs per policy), report_cancellation (or a cancellation marker from the interpreter) -> a terminal CancelledError, - and a recv timeout -> a START_TO_CLOSE timeout. + and a recv timeout -> a START_TO_CLOSE / SCHEDULE_TO_CLOSE timeout (per + ``deadline_type``). Heartbeats sent by the completer are not completions: they are skipped so the park keeps waiting (detail forwarding to the workflow side remains a @@ -188,9 +202,18 @@ async def _await_async_completion( while True: completion = await DBOS.recv_async(inbox.ASYNC_COMPLETE_TOPIC, timeout) if completion is None: + is_stc = deadline_type == "schedule_to_close" timed_out = exceptions.TimeoutError( - "activity Start-To-Close timeout", - type=exceptions.TimeoutType.START_TO_CLOSE, + ( + "activity Schedule-To-Close timeout" + if is_stc + else "activity Start-To-Close timeout" + ), + type=( + exceptions.TimeoutType.SCHEDULE_TO_CLOSE + if is_stc + else exceptions.TimeoutType.START_TO_CLOSE + ), last_heartbeat_details=[], ) return { diff --git a/dbosify/_internal/interpreter.py b/dbosify/_internal/interpreter.py index fb79c2a..e268f1e 100644 --- a/dbosify/_internal/interpreter.py +++ b/dbosify/_internal/interpreter.py @@ -1856,9 +1856,22 @@ def _launch_attempt(self, exec_state: _ActivityExec) -> None: "workflow_type": self._defn.name, "headers": exec_state.headers, } + # Bound the attempt by min(start_to_close, remaining schedule_to_close). + # Remaining is computed from virtual time (deterministic), so the deadline + # replays identically; the timeout outcome is recorded in the envelope. + remaining_stc = ( + exec_state.schedule_to_close + - (self._vloop.time() - exec_state.scheduled_at) + if exec_state.schedule_to_close is not None + else None + ) + deadline, deadline_type = activities_mod.effective_deadline( + exec_state.start_to_close, remaining_stc + ) + meta["deadline_type"] = deadline_type # Call step_fn synchronously so its function_id is claimed here; the result # decode rides outside the recorded step, replaying from the envelope. - step_coro = step_fn(exec_state.args, exec_state.start_to_close, meta) + step_coro = step_fn(exec_state.args, deadline, meta) self._launch_waiter( "activity", exec_state.seq, @@ -2056,15 +2069,30 @@ def _deliver_activity_event(self, waiter: _Waiter) -> None: # raise_complete_async(): the function returned but the activity stays # pending until an activity_result envelope resolves it (checkpointed). exec_state.async_pending = True - if exec_state.start_to_close is not None: - elapsed = float(envelope.get("ended_at", 0.0)) - float( - envelope.get("started_at", envelope.get("ended_at", 0.0)) - ) - remaining = max(0.05, exec_state.start_to_close - max(elapsed, 0.0)) + # Park a deadline = min(remaining start_to_close, remaining + # schedule_to_close); without the latter an async-pending activity with + # only schedule_to_close set would wait forever. On budget exhaustion the + # give-up cause is corrected to SCHEDULE_TO_CLOSE (_activity_failure_cause). + attempt_elapsed = float(envelope.get("ended_at", 0.0)) - float( + envelope.get("started_at", envelope.get("ended_at", 0.0)) + ) + s2c_remaining = ( + exec_state.start_to_close - max(attempt_elapsed, 0.0) + if exec_state.start_to_close is not None + else None + ) + stc_remaining = ( + exec_state.schedule_to_close + - (self._vloop.time() - exec_state.scheduled_at) + if exec_state.schedule_to_close is not None + else None + ) + parked, _ = activities_mod.effective_deadline(s2c_remaining, stc_remaining) + if parked is not None: self._launch_waiter( "act_s2c", exec_state.seq, - DBOS.sleep_async(remaining), + DBOS.sleep_async(max(0.05, parked)), ) if exec_state.heartbeat_timeout is not None: exec_state.async_hb_seen = False @@ -2108,7 +2136,7 @@ def _deliver_activity_event(self, waiter: _Waiter) -> None: activity_id=exec_state.activity_id, retry_state=retry_state, ) - error.__cause__ = deserialize_failure(failure) + error.__cause__ = self._activity_failure_cause(failure, retry_state) exec_state.future.set_exception(error) def _deliver_queued_activity_event(self, waiter: _Waiter) -> None: @@ -2156,7 +2184,7 @@ def _deliver_queued_activity_event(self, waiter: _Waiter) -> None: activity_id=exec_state.activity_id, retry_state=retry_state, ) - error.__cause__ = deserialize_failure(failure) + error.__cause__ = self._activity_failure_cause(failure, retry_state) exec_state.future.set_exception(error) def _deliver_child_event(self, waiter: _Waiter) -> None: @@ -2249,6 +2277,29 @@ def _retry_decision( schedule_to_close=exec_state.schedule_to_close, ) + @staticmethod + def _activity_failure_cause( + failure: FailureEnvelope, retry_state: Optional[exceptions.RetryState] + ) -> BaseException: + """The ``ActivityError`` cause for a terminal failure. When retries stop on + the schedule_to_close budget (``RetryState.TIMEOUT``) with a non-timeout last + failure, temporalio surfaces a SCHEDULE_TO_CLOSE timeout nesting it — so wrap + it. A last failure that is itself a timeout (start-to-close, schedule-to-close, + schedule-to-start, or heartbeat) already carries its specific TimeoutType and + passes through unchanged.""" + cause = deserialize_failure(failure) + if retry_state == exceptions.RetryState.TIMEOUT and not isinstance( + cause, exceptions.TimeoutError + ): + wrapper = exceptions.TimeoutError( + "activity Schedule-To-Close timeout", + type=exceptions.TimeoutType.SCHEDULE_TO_CLOSE, + last_heartbeat_details=[], + ) + wrapper.__cause__ = cause + return wrapper + return cause + # ------------------------------------------------------------------ # Inbox routing # ------------------------------------------------------------------ @@ -2381,7 +2432,7 @@ def _fail_async_attempt( activity_id=exec_state.activity_id, retry_state=retry_state, ) - error.__cause__ = deserialize_failure(failure) + error.__cause__ = self._activity_failure_cause(failure, retry_state) exec_state.future.set_exception(error) def _retire_parked_timers(self, seq: int) -> None: diff --git a/tests/integration/schedule_to_close_recovery_worker.py b/tests/integration/schedule_to_close_recovery_worker.py new file mode 100644 index 0000000..cfe0575 --- /dev/null +++ b/tests/integration/schedule_to_close_recovery_worker.py @@ -0,0 +1,91 @@ +"""Subprocess worker proving a schedule-to-close timeout is durable across a crash. + +Run as: python schedule_to_close_recovery_worker.py + +The activity has only ``schedule_to_close`` set and hangs forever; the budget times +it out with SCHEDULE_TO_CLOSE (before the fix this hung forever). The parent records +the outcome and parks. The test SIGKILLs the parked run and resumes in a fresh +process, where recovery replays run(): the activity's SCHEDULE_TO_CLOSE envelope is +replayed from its checkpoint (the activity is NOT re-run and re-timed), and the +released run returns the same outcome — proving the timeout decision was +checkpointed, not re-derived by luck. +""" + +import asyncio +import sys +from datetime import timedelta + +from dbosify import activity, workflow +from dbosify.common import RetryPolicy +from dbosify.exceptions import ActivityError, TimeoutError +from dbosify.worker import Worker +from tests.dbconfig import connect_client, default_config + +TASK_QUEUE = "stc-recovery-tq" + + +@activity.defn +async def hang_forever() -> None: + await asyncio.Event().wait() + + +@workflow.defn(name="ScheduleToCloseRecoveryWorkflow") +class ScheduleToCloseRecoveryWorkflow: + def __init__(self) -> None: + self.go = False + self.outcome = "" + + @workflow.signal + def release(self) -> None: + self.go = True + + @workflow.run + async def run(self) -> str: + try: + await workflow.execute_activity( + hang_forever, + schedule_to_close_timeout=timedelta(seconds=2), + retry_policy=RetryPolicy(maximum_attempts=1), + ) + self.outcome = "no-timeout" + except ActivityError as err: + if isinstance(err.cause, TimeoutError): + tt = err.cause.type + self.outcome = "timed-out:" + (tt.name if tt is not None else "UNKNOWN") + else: + self.outcome = "other:" + type(err.cause).__name__ + # The timeout outcome is now derived from the activity's checkpoint; park + # so the test can SIGKILL with everything needed to reconstruct it. + print("PARKED", flush=True) + await workflow.wait_condition(lambda: self.go) + return self.outcome + + +async def main() -> None: + action, workflow_id = sys.argv[1], sys.argv[2] + async with Worker( + default_config(), + task_queue=TASK_QUEUE, + workflows=[ScheduleToCloseRecoveryWorkflow], + activities=[hang_forever], + ): + client = await connect_client() + try: + if action == "start": + handle = await client.start_workflow( + ScheduleToCloseRecoveryWorkflow.run, + id=workflow_id, + task_queue=TASK_QUEUE, + ) + await handle.result() + else: + assert action == "resume" + handle = client.get_workflow_handle(workflow_id) + await handle.signal(ScheduleToCloseRecoveryWorkflow.release) + print("RESULT " + await handle.result(), flush=True) + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/integration/test_schedule_to_close.py b/tests/integration/test_schedule_to_close.py new file mode 100644 index 0000000..34f1335 --- /dev/null +++ b/tests/integration/test_schedule_to_close.py @@ -0,0 +1,159 @@ +"""``schedule_to_close_timeout`` bounds the whole activity lifecycle — including a +*running* attempt, not just the gaps between retries. + +Before the fix, an activity with only ``schedule_to_close`` set whose attempt hung +had no deadline anywhere and hung the workflow forever; Temporal fails it with a +SCHEDULE_TO_CLOSE timeout. These cover: a hang with only schedule_to_close, a +schedule_to_close shorter than start_to_close cutting the attempt early, and the +retry-exhaustion give-up surfacing a SCHEDULE_TO_CLOSE cause that nests the last +failure. +""" + +import asyncio +import time as _time +from contextlib import asynccontextmanager +from datetime import timedelta +from typing import AsyncIterator + +import pytest + +from dbosify import activity, workflow +from dbosify.client import Client +from dbosify.common import RetryPolicy +from dbosify.exceptions import ActivityError, ApplicationError, TimeoutError +from dbosify.worker import Worker +from tests.dbconfig import connect_client, default_config + +pytestmark = pytest.mark.usefixtures("dbosify_env") + +TASK_QUEUE = "stc-tq" + + +@activity.defn +async def hang_forever() -> None: + await asyncio.Event().wait() + + +@activity.defn +async def always_fails() -> None: + raise ApplicationError("boom", type="Boom") + + +def _describe(err: ActivityError) -> str: + """A flat, serializable summary of the terminal ActivityError so the + assertion lives in the test, not the workflow.""" + cause = err.cause + state = err.retry_state.name if err.retry_state is not None else "?" + if not isinstance(cause, TimeoutError): + return f"other:{type(cause).__name__}:state={state}" + ttype = cause.type.name if cause.type is not None else "?" + inner = type(cause.__cause__).__name__ if cause.__cause__ is not None else "None" + return f"timeout:{ttype}:inner={inner}:state={state}" + + +@workflow.defn +class OnlyScheduleToCloseHang: + @workflow.run + async def run(self) -> str: + try: + await workflow.execute_activity( + hang_forever, + schedule_to_close_timeout=timedelta(seconds=2), + ) + return "no-timeout" + except ActivityError as err: + return _describe(err) + + +@workflow.defn +class ScheduleToCloseBeatsStartToClose: + @workflow.run + async def run(self) -> str: + try: + await workflow.execute_activity( + hang_forever, + start_to_close_timeout=timedelta(seconds=60), + schedule_to_close_timeout=timedelta(seconds=2), + ) + return "no-timeout" + except ActivityError as err: + return _describe(err) + + +@workflow.defn +class RetryExhaustsBudget: + @workflow.run + async def run(self) -> str: + try: + await workflow.execute_activity( + always_fails, + schedule_to_close_timeout=timedelta(seconds=2), + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=0.4), + backoff_coefficient=1.0, + maximum_attempts=100, # the budget, not the count, must stop it + ), + ) + return "no-timeout" + except ActivityError as err: + return _describe(err) + + +@asynccontextmanager +async def _env() -> AsyncIterator[Client]: + worker = Worker( + default_config(), + task_queue=TASK_QUEUE, + workflows=[ + OnlyScheduleToCloseHang, + ScheduleToCloseBeatsStartToClose, + RetryExhaustsBudget, + ], + activities=[hang_forever, always_fails], + ) + async with worker: + client = await connect_client() + try: + yield client + finally: + await client.close() + + +@pytest.mark.timeout(60) +async def test_schedule_to_close_times_out_hanging_attempt() -> None: + # Only schedule_to_close set, attempt hangs: must time out, not hang forever. + async with _env() as client: + started = _time.monotonic() + result = await client.execute_workflow( + OnlyScheduleToCloseHang.run, id="stc-only-hang", task_queue=TASK_QUEUE + ) + elapsed = _time.monotonic() - started + assert result == "timeout:SCHEDULE_TO_CLOSE:inner=None:state=TIMEOUT" + assert elapsed < 30 # bounded by the ~2s budget, not an infinite hang + + +@pytest.mark.timeout(60) +async def test_schedule_to_close_cuts_attempt_before_start_to_close() -> None: + # schedule_to_close (2s) is shorter than start_to_close (60s): the running + # attempt is cut at the budget with SCHEDULE_TO_CLOSE, not at start_to_close. + async with _env() as client: + started = _time.monotonic() + result = await client.execute_workflow( + ScheduleToCloseBeatsStartToClose.run, + id="stc-beats-s2c", + task_queue=TASK_QUEUE, + ) + elapsed = _time.monotonic() - started + assert result == "timeout:SCHEDULE_TO_CLOSE:inner=None:state=TIMEOUT" + assert elapsed < 30 # cut at ~2s, nowhere near start_to_close's 60s + + +@pytest.mark.timeout(60) +async def test_schedule_to_close_retry_exhaustion_wraps_last_failure() -> None: + # Retries stop on the budget: the cause is a SCHEDULE_TO_CLOSE timeout that + # nests the last application error (matching temporalio). + async with _env() as client: + result = await client.execute_workflow( + RetryExhaustsBudget.run, id="stc-retry-exhaust", task_queue=TASK_QUEUE + ) + assert result == "timeout:SCHEDULE_TO_CLOSE:inner=ApplicationError:state=TIMEOUT" diff --git a/tests/integration/test_schedule_to_close_recovery.py b/tests/integration/test_schedule_to_close_recovery.py new file mode 100644 index 0000000..3b235bf --- /dev/null +++ b/tests/integration/test_schedule_to_close_recovery.py @@ -0,0 +1,47 @@ +"""SIGKILL-recovery test for schedule-to-close timeouts. + +An activity with only ``schedule_to_close`` set hangs forever; the budget times it +out with SCHEDULE_TO_CLOSE. This proves that timeout outcome is durable: a parked +run is crashed after the timeout fires and resumed in a fresh process, where it +must replay the same SCHEDULE_TO_CLOSE outcome from the activity's checkpoint +rather than re-running the activity. +""" + +from pathlib import Path +from typing import Dict + +import pytest + +from tests.harness import PythonProcess + +WORKER = Path(__file__).parent / "schedule_to_close_recovery_worker.py" +REPO_ROOT = Path(__file__).parents[2] + + +def _env() -> Dict[str, str]: + return {"PYTHONPATH": str(REPO_ROOT)} + + +@pytest.mark.usefixtures("cleanup_test_databases") +def test_schedule_to_close_timeout_survives_crash() -> None: + wf_id = "schedule-to-close-recovery" + first = PythonProcess(WORKER, "start", wf_id, env=_env()) + first.start() + try: + # PARKED prints only after the activity timed out and the outcome was + # recorded, so the timeout is already checkpointed when we crash. + first.wait_for_line("PARKED", timeout=60) + first.sigkill() + assert first.wait() == -9 + finally: + first.terminate_and_wait() + + second = PythonProcess(WORKER, "resume", wf_id, env=_env()) + second.start() + try: + line = second.wait_for_line("RESULT ", timeout=60) + assert second.wait() == 0 + finally: + second.terminate_and_wait() + + assert line.split("RESULT ", 1)[1].strip() == "timed-out:SCHEDULE_TO_CLOSE" From d5b1d49628353df1231d4570a86a209060eaaf61 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Tue, 23 Jun 2026 15:22:19 -0700 Subject: [PATCH 6/9] comments --- dbosify/_internal/activities.py | 14 +++----------- dbosify/_internal/activity_workflow.py | 8 ++------ dbosify/_internal/interpreter.py | 16 +++------------- 3 files changed, 8 insertions(+), 30 deletions(-) diff --git a/dbosify/_internal/activities.py b/dbosify/_internal/activities.py index 8c22f9d..f059520 100644 --- a/dbosify/_internal/activities.py +++ b/dbosify/_internal/activities.py @@ -124,12 +124,7 @@ def retry_decision( def effective_deadline( start_to_close: Optional[float], remaining_schedule_to_close: Optional[float] ) -> Tuple[Optional[float], str]: - """The deadline bounding a single in-flight attempt: the smaller of the - per-attempt ``start_to_close`` and the remaining ``schedule_to_close`` budget, - plus a label for which bound is binding (so a timeout reports the right type). - ``None`` means unbounded (neither timeout set). The caller computes the - remaining budget from deterministic time so the result replays identically. - """ + """``min(start_to_close, remaining schedule_to_close)`` + the binding bound's label (so a timeout reports the right type); ``None`` is unbounded.""" candidates: List[Tuple[float, str]] = [] if start_to_close is not None: candidates.append((max(0.0, start_to_close), "start_to_close")) @@ -200,9 +195,7 @@ async def attempt( ) -> Dict[str, Any]: from .. import activity as activity_api - # The per-attempt deadline is the smaller of start_to_close and the - # remaining schedule_to_close budget (computed by the caller); the label - # says which, so a timeout reports the right TimeoutType. + # Labels the caller-computed deadline so a timeout reports the right type. deadline_type = meta.get("deadline_type", "start_to_close") # The dynamic step handles any unmatched type: resolve the single dynamic @@ -356,8 +349,7 @@ async def _watch( } async def run_to_deadline() -> Dict[str, Any]: - # Enforce the attempt deadline ourselves: it is authoritative, so once - # it passes we cancel and abandon the attempt and discard any late result. + # Enforce the attempt deadline ourselves: once it passes, cancel and discard any late result. if deadline is None: return await run_attempt() task = asyncio.ensure_future(run_attempt()) diff --git a/dbosify/_internal/activity_workflow.py b/dbosify/_internal/activity_workflow.py index 8630ed9..0b6ebd5 100644 --- a/dbosify/_internal/activity_workflow.py +++ b/dbosify/_internal/activity_workflow.py @@ -127,9 +127,7 @@ async def _run_queued_activity(payload: Dict[str, Any]) -> Dict[str, Any]: } attempt = int(meta.get("attempt", 1)) - # Elapsed budget consumed before the current attempt starts; reconstructed - # deterministically from recorded ``ended_at`` + the durable backoff sleeps, - # so it replays identically (measured from started_at, like retry_decision). + # Budget consumed before this attempt, reconstructed deterministically from recorded ended_at + durable backoffs (measured from started_at). elapsed_before = 0.0 while True: meta["attempt"] = attempt @@ -145,9 +143,7 @@ async def _run_queued_activity(payload: Dict[str, Any]) -> Dict[str, Any]: meta["deadline_type"] = deadline_type envelope: Dict[str, Any] = await step_fn(args, deadline, meta) if envelope.get("async_pending"): - # raise_complete_async(): park for external completion, bounded by the - # same deadline. A fail re-runs per policy (fall through); complete/ - # cancelled/timeout end here. + # raise_complete_async(): park for external completion, bounded by the same deadline; a fail re-runs per policy. wait_timeout = ( deadline if deadline is not None else inbox.RECV_TIMEOUT_SECONDS ) diff --git a/dbosify/_internal/interpreter.py b/dbosify/_internal/interpreter.py index e268f1e..7b895d7 100644 --- a/dbosify/_internal/interpreter.py +++ b/dbosify/_internal/interpreter.py @@ -1856,9 +1856,7 @@ def _launch_attempt(self, exec_state: _ActivityExec) -> None: "workflow_type": self._defn.name, "headers": exec_state.headers, } - # Bound the attempt by min(start_to_close, remaining schedule_to_close). - # Remaining is computed from virtual time (deterministic), so the deadline - # replays identically; the timeout outcome is recorded in the envelope. + # Bound the attempt by min(start_to_close, remaining schedule_to_close); remaining uses virtual time, so it replays identically. remaining_stc = ( exec_state.schedule_to_close - (self._vloop.time() - exec_state.scheduled_at) @@ -2069,10 +2067,7 @@ def _deliver_activity_event(self, waiter: _Waiter) -> None: # raise_complete_async(): the function returned but the activity stays # pending until an activity_result envelope resolves it (checkpointed). exec_state.async_pending = True - # Park a deadline = min(remaining start_to_close, remaining - # schedule_to_close); without the latter an async-pending activity with - # only schedule_to_close set would wait forever. On budget exhaustion the - # give-up cause is corrected to SCHEDULE_TO_CLOSE (_activity_failure_cause). + # Park min(remaining start_to_close, remaining schedule_to_close); else a schedule_to_close-only async activity waits forever. attempt_elapsed = float(envelope.get("ended_at", 0.0)) - float( envelope.get("started_at", envelope.get("ended_at", 0.0)) ) @@ -2281,12 +2276,7 @@ def _retry_decision( def _activity_failure_cause( failure: FailureEnvelope, retry_state: Optional[exceptions.RetryState] ) -> BaseException: - """The ``ActivityError`` cause for a terminal failure. When retries stop on - the schedule_to_close budget (``RetryState.TIMEOUT``) with a non-timeout last - failure, temporalio surfaces a SCHEDULE_TO_CLOSE timeout nesting it — so wrap - it. A last failure that is itself a timeout (start-to-close, schedule-to-close, - schedule-to-start, or heartbeat) already carries its specific TimeoutType and - passes through unchanged.""" + """Wrap a non-timeout last failure as SCHEDULE_TO_CLOSE when retries stop on the budget (temporalio parity); an already-typed timeout passes through.""" cause = deserialize_failure(failure) if retry_state == exceptions.RetryState.TIMEOUT and not isinstance( cause, exceptions.TimeoutError From c3592faa8b7d68da09335b02a222aec656e93315 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Tue, 23 Jun 2026 15:47:43 -0700 Subject: [PATCH 7/9] fix --- dbosify/_internal/interpreter.py | 35 +++++++++++++++---- .../schedule_to_close_recovery_worker.py | 14 ++++++++ tests/integration/test_activity_roundout.py | 29 +++++++++++++++ .../test_schedule_to_close_recovery.py | 24 ++++++++++--- 4 files changed, 90 insertions(+), 12 deletions(-) diff --git a/dbosify/_internal/interpreter.py b/dbosify/_internal/interpreter.py index 7b895d7..0d7f94b 100644 --- a/dbosify/_internal/interpreter.py +++ b/dbosify/_internal/interpreter.py @@ -624,6 +624,12 @@ class _ActivityExec: last_failure: Optional[FailureEnvelope] = None cancel_requested: bool = False # WAIT_CANCELLATION_COMPLETED in flight async_pending: bool = False # raise_complete_async(): awaiting external + # The binding bound for the async-pending park timer (act_s2c): START_TO_CLOSE + # or SCHEDULE_TO_CLOSE. Set when the park is armed so the timeout reports the + # right type; recomputed deterministically on replay (same code path). + async_park_timeout_type: "exceptions.TimeoutType" = ( + exceptions.TimeoutType.START_TO_CLOSE + ) heartbeat_timeout: Optional[float] = None # Whether a completer heartbeat arrived within the current watch window # (the timeout check counts envelopes between timer fires — no clock reads). @@ -2035,8 +2041,14 @@ async def _deliver(self, done: Set["asyncio.Task[Any]"]) -> None: elif waiter.kind == "q_activity": self._deliver_queued_activity_event(waiter) elif waiter.kind == "act_s2c": + parked_state = self._pending_activities.get(waiter.seq) self._async_parked_timeout( - waiter.seq, exceptions.TimeoutType.START_TO_CLOSE + waiter.seq, + ( + parked_state.async_park_timeout_type + if parked_state is not None + else exceptions.TimeoutType.START_TO_CLOSE + ), ) elif waiter.kind == "act_hb": self._async_heartbeat_check(waiter.seq) @@ -2082,8 +2094,15 @@ def _deliver_activity_event(self, waiter: _Waiter) -> None: if exec_state.schedule_to_close is not None else None ) - parked, _ = activities_mod.effective_deadline(s2c_remaining, stc_remaining) + parked, parked_type = activities_mod.effective_deadline( + s2c_remaining, stc_remaining + ) if parked is not None: + exec_state.async_park_timeout_type = ( + exceptions.TimeoutType.SCHEDULE_TO_CLOSE + if parked_type == "schedule_to_close" + else exceptions.TimeoutType.START_TO_CLOSE + ) self._launch_waiter( "act_s2c", exec_state.seq, @@ -2440,12 +2459,14 @@ def _async_parked_timeout( if exec_state is None or not exec_state.async_pending: return # resolved (or retired) before the timer fired self._retire_parked_timers(seq) + if timeout_type == exceptions.TimeoutType.SCHEDULE_TO_CLOSE: + message = "activity Schedule-To-Close timeout" + elif timeout_type == exceptions.TimeoutType.START_TO_CLOSE: + message = "activity Start-To-Close timeout" + else: + message = "activity Heartbeat timeout" timeout_failure = exceptions.TimeoutError( - ( - "activity Start-To-Close timeout" - if timeout_type == exceptions.TimeoutType.START_TO_CLOSE - else "activity Heartbeat timeout" - ), + message, type=timeout_type, last_heartbeat_details=[], ) diff --git a/tests/integration/schedule_to_close_recovery_worker.py b/tests/integration/schedule_to_close_recovery_worker.py index cfe0575..c7e81d2 100644 --- a/tests/integration/schedule_to_close_recovery_worker.py +++ b/tests/integration/schedule_to_close_recovery_worker.py @@ -9,9 +9,16 @@ replayed from its checkpoint (the activity is NOT re-run and re-timed), and the released run returns the same outcome — proving the timeout decision was checkpointed, not re-derived by luck. + +Each activity invocation appends a line to the file named by ``STC_RECOVERY_MARKER`` +so the test can assert the activity ran *exactly once*. If recovery re-ran the +activity (re-deriving the timeout instead of replaying the checkpoint), the outcome +would look the same but a second marker line would appear — which is the regression +this test now actually detects. """ import asyncio +import os import sys from datetime import timedelta @@ -26,6 +33,13 @@ @activity.defn async def hang_forever() -> None: + # Record each invocation durably (before hanging) so the test can prove the + # activity ran exactly once: on resume the timeout must replay from the + # checkpoint, never re-invoke this body. + marker = os.environ.get("STC_RECOVERY_MARKER") + if marker: + with open(marker, "a") as f: + f.write("invoked\n") await asyncio.Event().wait() diff --git a/tests/integration/test_activity_roundout.py b/tests/integration/test_activity_roundout.py index d954a81..ce752fd 100644 --- a/tests/integration/test_activity_roundout.py +++ b/tests/integration/test_activity_roundout.py @@ -261,6 +261,18 @@ async def run(self, s2c: float, hb: Optional[float], max_attempts: int = 1) -> s return result +@workflow.defn +class AsyncScheduleToCloseTimeoutWorkflow: + @workflow.run + async def run(self, stc: float) -> str: + result: str = await workflow.execute_activity( + complete_externally, + schedule_to_close_timeout=timedelta(seconds=stc), + retry_policy=RetryPolicy(maximum_attempts=1), + ) + return result + + @workflow.defn class SwallowedCancelTimeoutWorkflow: @workflow.run @@ -347,6 +359,7 @@ async def _env() -> AsyncIterator[Client]: HeartbeatTimeoutWorkflow, AsyncRetryWorkflow, AsyncTimeoutWorkflow, + AsyncScheduleToCloseTimeoutWorkflow, AsyncCustomIdWorkflow, DuplicateIdWorkflow, AsyncCanWorkflow, @@ -624,6 +637,22 @@ async def test_parked_async_activity_start_to_close() -> None: assert _timeout_cause(exc_info).type == TimeoutType.START_TO_CLOSE +async def test_parked_async_activity_schedule_to_close() -> None: + """A parked async activity bounded only by schedule-to-close times out with + SCHEDULE_TO_CLOSE, not START_TO_CLOSE: the inline park reports the binding + bound's type, matching the queued path (regression for the dropped label).""" + async with _env() as client: + handle = await client.start_workflow( + AsyncScheduleToCloseTimeoutWorkflow.run, + 0.8, + id="parked-stc", + task_queue=TASK_QUEUE, + ) + with pytest.raises(WorkflowFailureError) as exc_info: + await handle.result() + assert _timeout_cause(exc_info).type == TimeoutType.SCHEDULE_TO_CLOSE + + async def test_parked_async_activity_heartbeat_timeout() -> None: """A parked async activity whose completer never heartbeats times out with TimeoutType.HEARTBEAT; one that heartbeats inside the window diff --git a/tests/integration/test_schedule_to_close_recovery.py b/tests/integration/test_schedule_to_close_recovery.py index 3b235bf..3a763f9 100644 --- a/tests/integration/test_schedule_to_close_recovery.py +++ b/tests/integration/test_schedule_to_close_recovery.py @@ -5,6 +5,12 @@ run is crashed after the timeout fires and resumed in a fresh process, where it must replay the same SCHEDULE_TO_CLOSE outcome from the activity's checkpoint rather than re-running the activity. + +The crux is *replay vs. re-run*: re-running the activity on resume would hang for +another budget and re-derive an identical SCHEDULE_TO_CLOSE outcome, so the outcome +assertion alone cannot tell the two apart. To make the test actually discriminating, +each activity invocation appends to a marker file and we assert it ran exactly once +across both processes — a re-run regression leaves a second line. """ from pathlib import Path @@ -18,25 +24,29 @@ REPO_ROOT = Path(__file__).parents[2] -def _env() -> Dict[str, str]: - return {"PYTHONPATH": str(REPO_ROOT)} +def _env(marker: Path) -> Dict[str, str]: + return {"PYTHONPATH": str(REPO_ROOT), "STC_RECOVERY_MARKER": str(marker)} @pytest.mark.usefixtures("cleanup_test_databases") -def test_schedule_to_close_timeout_survives_crash() -> None: +def test_schedule_to_close_timeout_survives_crash(tmp_path: Path) -> None: wf_id = "schedule-to-close-recovery" - first = PythonProcess(WORKER, "start", wf_id, env=_env()) + marker = tmp_path / "invocations.log" + env = _env(marker) + first = PythonProcess(WORKER, "start", wf_id, env=env) first.start() try: # PARKED prints only after the activity timed out and the outcome was # recorded, so the timeout is already checkpointed when we crash. first.wait_for_line("PARKED", timeout=60) + # The activity ran once and timed out before the crash. + assert marker.read_text().count("invoked") == 1 first.sigkill() assert first.wait() == -9 finally: first.terminate_and_wait() - second = PythonProcess(WORKER, "resume", wf_id, env=_env()) + second = PythonProcess(WORKER, "resume", wf_id, env=env) second.start() try: line = second.wait_for_line("RESULT ", timeout=60) @@ -45,3 +55,7 @@ def test_schedule_to_close_timeout_survives_crash() -> None: second.terminate_and_wait() assert line.split("RESULT ", 1)[1].strip() == "timed-out:SCHEDULE_TO_CLOSE" + # The discriminating assertion: recovery replayed the timeout from the + # activity's checkpoint instead of re-invoking (and re-timing) it. A + # checkpoint regression would re-run hang_forever here -> a second line. + assert marker.read_text().count("invoked") == 1 From 1b456c9ce8bc4d030eabffc35c4a855983893001 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Tue, 23 Jun 2026 16:41:17 -0700 Subject: [PATCH 8/9] fix --- dbosify/_internal/activities.py | 10 ++++ dbosify/_internal/activity_workflow.py | 16 ++++-- dbosify/_internal/interpreter.py | 28 +++++++++- tests/integration/test_schedule_to_close.py | 60 ++++++++++++++++++++- 4 files changed, 106 insertions(+), 8 deletions(-) diff --git a/dbosify/_internal/activities.py b/dbosify/_internal/activities.py index f059520..9ba4c2d 100644 --- a/dbosify/_internal/activities.py +++ b/dbosify/_internal/activities.py @@ -100,6 +100,16 @@ def retry_decision( and failure_type in set(policy.non_retryable_error_types) ): return None, exceptions.RetryState.NON_RETRYABLE_FAILURE + # A schedule-to-close timeout means the whole-lifecycle budget is spent: + # terminal, never retried (Temporal parity). Other timeout types + # (start_to_close, heartbeat) stay retryable, still gated by the budget below. + # Robust across paths: an in-band attempt timeout and a parked-async timeout + # both surface here, so neither can be retried past the budget regardless of + # how ``elapsed`` was reconstructed. + if failure.get("cls") == "TimeoutError" and failure.get("timeout_type") == int( + exceptions.TimeoutType.SCHEDULE_TO_CLOSE + ): + return None, exceptions.RetryState.TIMEOUT if policy.maximum_attempts and attempt >= policy.maximum_attempts: return None, exceptions.RetryState.MAXIMUM_ATTEMPTS_REACHED override = failure.get("next_retry_delay") diff --git a/dbosify/_internal/activity_workflow.py b/dbosify/_internal/activity_workflow.py index 0b6ebd5..67b80a2 100644 --- a/dbosify/_internal/activity_workflow.py +++ b/dbosify/_internal/activity_workflow.py @@ -143,10 +143,18 @@ async def _run_queued_activity(payload: Dict[str, Any]) -> Dict[str, Any]: meta["deadline_type"] = deadline_type envelope: Dict[str, Any] = await step_fn(args, deadline, meta) if envelope.get("async_pending"): - # raise_complete_async(): park for external completion, bounded by the same deadline; a fail re-runs per policy. - wait_timeout = ( - deadline if deadline is not None else inbox.RECV_TIMEOUT_SECONDS - ) + # raise_complete_async(): park for external completion. The attempt + # deadline already covers the body run, so charge the body time the + # attempt just consumed against it (mirrors the interpreter inline + # path) — otherwise the park re-waits the full deadline. A fail/timeout + # re-runs or gives up per policy. + if deadline is not None: + body_elapsed = float(envelope.get("ended_at", 0.0)) - float( + envelope.get("started_at", envelope.get("ended_at", 0.0)) + ) + wait_timeout = max(0.0, deadline - max(0.0, body_elapsed)) + else: + wait_timeout = inbox.RECV_TIMEOUT_SECONDS envelope = await _await_async_completion( envelope, wait_timeout, str(meta.get("activity_id", "")), deadline_type ) diff --git a/dbosify/_internal/interpreter.py b/dbosify/_internal/interpreter.py index 0d7f94b..f2e76ce 100644 --- a/dbosify/_internal/interpreter.py +++ b/dbosify/_internal/interpreter.py @@ -621,6 +621,11 @@ class _ActivityExec: cancellation_type: int = 0 # ActivityCancellationType; 2 = ABANDON attempt: int = 1 in_backoff: bool = False + # Seconds of the in-flight durable wait (retry backoff or async-pending park) + # not yet charged to virtual time. Charged when the wait completes so the next + # attempt's schedule-to-close deadline and the give-up gate both include it. + # Deterministic (the wait length is recomputed on replay), so replay-stable. + pending_wait_seconds: float = 0.0 last_failure: Optional[FailureEnvelope] = None cancel_requested: bool = False # WAIT_CANCELLATION_COMPLETED in flight async_pending: bool = False # raise_complete_async(): awaiting external @@ -2042,6 +2047,13 @@ async def _deliver(self, done: Set["asyncio.Task[Any]"]) -> None: self._deliver_queued_activity_event(waiter) elif waiter.kind == "act_s2c": parked_state = self._pending_activities.get(waiter.seq) + if parked_state is not None: + # The park fully elapsed: charge it to virtual time so a + # retried start-to-close park's budget gate includes the wait. + self._advance_time( + self._vloop.time() + parked_state.pending_wait_seconds + ) + parked_state.pending_wait_seconds = 0.0 self._async_parked_timeout( waiter.seq, ( @@ -2069,7 +2081,12 @@ def _deliver_activity_event(self, waiter: _Waiter) -> None: self._pending_activities.pop(waiter.seq, None) return if exec_state.in_backoff: - # Backoff sleep finished -> next attempt. + # Backoff sleep finished -> next attempt. Charge the backoff to virtual + # time so the next attempt's schedule-to-close budget reflects it (the + # delivery of an attempt's ``ended_at`` advances time, but a backoff + # sleep has no envelope of its own). + self._advance_time(self._vloop.time() + exec_state.pending_wait_seconds) + exec_state.pending_wait_seconds = 0.0 exec_state.attempt += 1 self._launch_attempt(exec_state) return @@ -2103,10 +2120,15 @@ def _deliver_activity_event(self, waiter: _Waiter) -> None: if parked_type == "schedule_to_close" else exceptions.TimeoutType.START_TO_CLOSE ) + park_seconds = max(0.05, parked) + # Charge the park to virtual time when it fires so a retried + # start-to-close park's give-up gate includes it (a + # schedule-to-close park is terminal, but the bookkeeping is uniform). + exec_state.pending_wait_seconds = park_seconds self._launch_waiter( "act_s2c", exec_state.seq, - DBOS.sleep_async(max(0.05, parked)), + DBOS.sleep_async(park_seconds), ) if exec_state.heartbeat_timeout is not None: exec_state.async_hb_seen = False @@ -2135,6 +2157,7 @@ def _deliver_activity_event(self, waiter: _Waiter) -> None: retry_delay, retry_state = self._retry_decision(exec_state, failure) if retry_delay is not None: exec_state.in_backoff = True + exec_state.pending_wait_seconds = retry_delay self._launch_waiter( "activity", exec_state.seq, DBOS.sleep_async(retry_delay) ) @@ -2428,6 +2451,7 @@ def _fail_async_attempt( retry_delay, retry_state = self._retry_decision(exec_state, failure) if retry_delay is not None: exec_state.in_backoff = True + exec_state.pending_wait_seconds = retry_delay self._launch_waiter("activity", seq, DBOS.sleep_async(retry_delay)) return del self._pending_activities[seq] diff --git a/tests/integration/test_schedule_to_close.py b/tests/integration/test_schedule_to_close.py index 34f1335..84f34e1 100644 --- a/tests/integration/test_schedule_to_close.py +++ b/tests/integration/test_schedule_to_close.py @@ -13,7 +13,7 @@ import time as _time from contextlib import asynccontextmanager from datetime import timedelta -from typing import AsyncIterator +from typing import AsyncIterator, List import pytest @@ -39,6 +39,19 @@ async def always_fails() -> None: raise ApplicationError("boom", type="Boom") +# Each invocation of the parked async activity below records its attempt number. +# A schedule-to-close timeout on a *parked* (raise_complete_async) attempt must be +# terminal: the body runs exactly once and is never re-invoked past the budget, +# even with retries still available. +_PARK_INVOCATIONS: List[int] = [] + + +@activity.defn +async def park_and_count() -> None: + _PARK_INVOCATIONS.append(activity.info().attempt) + activity.raise_complete_async() + + def _describe(err: ActivityError) -> str: """A flat, serializable summary of the terminal ActivityError so the assertion lives in the test, not the workflow.""" @@ -99,6 +112,27 @@ async def run(self) -> str: return _describe(err) +@workflow.defn +class ParkedAsyncScheduleToClose: + @workflow.run + async def run(self) -> str: + try: + await workflow.execute_activity( + park_and_count, + schedule_to_close_timeout=timedelta(seconds=2), + # Retries are available: only the budget — not the count — may + # stop the parked attempt, and it must stop terminally. + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=0.4), + backoff_coefficient=1.0, + maximum_attempts=100, + ), + ) + return "no-timeout" + except ActivityError as err: + return _describe(err) + + @asynccontextmanager async def _env() -> AsyncIterator[Client]: worker = Worker( @@ -108,8 +142,9 @@ async def _env() -> AsyncIterator[Client]: OnlyScheduleToCloseHang, ScheduleToCloseBeatsStartToClose, RetryExhaustsBudget, + ParkedAsyncScheduleToClose, ], - activities=[hang_forever, always_fails], + activities=[hang_forever, always_fails, park_and_count], ) async with worker: client = await connect_client() @@ -132,6 +167,27 @@ async def test_schedule_to_close_times_out_hanging_attempt() -> None: assert elapsed < 30 # bounded by the ~2s budget, not an infinite hang +@pytest.mark.timeout(60) +async def test_parked_async_schedule_to_close_is_terminal_not_retried() -> None: + # A parked (raise_complete_async) attempt bounded only by schedule_to_close + # must time out terminally with SCHEDULE_TO_CLOSE and never be re-invoked, + # even with retries left. Regression: the park time escaped the budget gate, + # so the timeout was retried and the activity re-ran past its budget. + _PARK_INVOCATIONS.clear() + async with _env() as client: + started = _time.monotonic() + result = await client.execute_workflow( + ParkedAsyncScheduleToClose.run, + id="stc-parked-async-terminal", + task_queue=TASK_QUEUE, + ) + elapsed = _time.monotonic() - started + assert result == "timeout:SCHEDULE_TO_CLOSE:inner=None:state=TIMEOUT" + # The discriminating assertion: invoked exactly once (attempt 1), not retried. + assert _PARK_INVOCATIONS == [1] + assert elapsed < 30 # bounded by the ~2s budget + + @pytest.mark.timeout(60) async def test_schedule_to_close_cuts_attempt_before_start_to_close() -> None: # schedule_to_close (2s) is shorter than start_to_close (60s): the running From 57bd971b49ad11d0c4258cd323077d8c49be73db Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Tue, 23 Jun 2026 16:46:31 -0700 Subject: [PATCH 9/9] cleanup --- dbosify/_internal/activities.py | 7 +------ dbosify/_internal/activity_workflow.py | 6 +----- dbosify/_internal/interpreter.py | 17 ++++------------- tests/integration/test_schedule_to_close.py | 13 +++---------- 4 files changed, 9 insertions(+), 34 deletions(-) diff --git a/dbosify/_internal/activities.py b/dbosify/_internal/activities.py index 9ba4c2d..6ce82ab 100644 --- a/dbosify/_internal/activities.py +++ b/dbosify/_internal/activities.py @@ -100,12 +100,7 @@ def retry_decision( and failure_type in set(policy.non_retryable_error_types) ): return None, exceptions.RetryState.NON_RETRYABLE_FAILURE - # A schedule-to-close timeout means the whole-lifecycle budget is spent: - # terminal, never retried (Temporal parity). Other timeout types - # (start_to_close, heartbeat) stay retryable, still gated by the budget below. - # Robust across paths: an in-band attempt timeout and a parked-async timeout - # both surface here, so neither can be retried past the budget regardless of - # how ``elapsed`` was reconstructed. + # A schedule-to-close timeout is terminal, never retried (Temporal parity). if failure.get("cls") == "TimeoutError" and failure.get("timeout_type") == int( exceptions.TimeoutType.SCHEDULE_TO_CLOSE ): diff --git a/dbosify/_internal/activity_workflow.py b/dbosify/_internal/activity_workflow.py index 67b80a2..b8c7e15 100644 --- a/dbosify/_internal/activity_workflow.py +++ b/dbosify/_internal/activity_workflow.py @@ -143,11 +143,7 @@ async def _run_queued_activity(payload: Dict[str, Any]) -> Dict[str, Any]: meta["deadline_type"] = deadline_type envelope: Dict[str, Any] = await step_fn(args, deadline, meta) if envelope.get("async_pending"): - # raise_complete_async(): park for external completion. The attempt - # deadline already covers the body run, so charge the body time the - # attempt just consumed against it (mirrors the interpreter inline - # path) — otherwise the park re-waits the full deadline. A fail/timeout - # re-runs or gives up per policy. + # Park for completion, minus the body time already spent against the deadline. if deadline is not None: body_elapsed = float(envelope.get("ended_at", 0.0)) - float( envelope.get("started_at", envelope.get("ended_at", 0.0)) diff --git a/dbosify/_internal/interpreter.py b/dbosify/_internal/interpreter.py index f2e76ce..2361feb 100644 --- a/dbosify/_internal/interpreter.py +++ b/dbosify/_internal/interpreter.py @@ -621,10 +621,7 @@ class _ActivityExec: cancellation_type: int = 0 # ActivityCancellationType; 2 = ABANDON attempt: int = 1 in_backoff: bool = False - # Seconds of the in-flight durable wait (retry backoff or async-pending park) - # not yet charged to virtual time. Charged when the wait completes so the next - # attempt's schedule-to-close deadline and the give-up gate both include it. - # Deterministic (the wait length is recomputed on replay), so replay-stable. + # In-flight backoff/park seconds, charged to virtual time on completion so the budget counts them (replay-stable). pending_wait_seconds: float = 0.0 last_failure: Optional[FailureEnvelope] = None cancel_requested: bool = False # WAIT_CANCELLATION_COMPLETED in flight @@ -2048,8 +2045,7 @@ async def _deliver(self, done: Set["asyncio.Task[Any]"]) -> None: elif waiter.kind == "act_s2c": parked_state = self._pending_activities.get(waiter.seq) if parked_state is not None: - # The park fully elapsed: charge it to virtual time so a - # retried start-to-close park's budget gate includes the wait. + # Charge the elapsed park to virtual time (counts toward a retry's budget). self._advance_time( self._vloop.time() + parked_state.pending_wait_seconds ) @@ -2081,10 +2077,7 @@ def _deliver_activity_event(self, waiter: _Waiter) -> None: self._pending_activities.pop(waiter.seq, None) return if exec_state.in_backoff: - # Backoff sleep finished -> next attempt. Charge the backoff to virtual - # time so the next attempt's schedule-to-close budget reflects it (the - # delivery of an attempt's ``ended_at`` advances time, but a backoff - # sleep has no envelope of its own). + # Backoff done -> next attempt; charge it to virtual time (no ended_at of its own). self._advance_time(self._vloop.time() + exec_state.pending_wait_seconds) exec_state.pending_wait_seconds = 0.0 exec_state.attempt += 1 @@ -2121,9 +2114,7 @@ def _deliver_activity_event(self, waiter: _Waiter) -> None: else exceptions.TimeoutType.START_TO_CLOSE ) park_seconds = max(0.05, parked) - # Charge the park to virtual time when it fires so a retried - # start-to-close park's give-up gate includes it (a - # schedule-to-close park is terminal, but the bookkeeping is uniform). + # Charge the park to virtual time when it fires (counts toward a retry's budget). exec_state.pending_wait_seconds = park_seconds self._launch_waiter( "act_s2c", diff --git a/tests/integration/test_schedule_to_close.py b/tests/integration/test_schedule_to_close.py index 84f34e1..a8facf8 100644 --- a/tests/integration/test_schedule_to_close.py +++ b/tests/integration/test_schedule_to_close.py @@ -39,10 +39,7 @@ async def always_fails() -> None: raise ApplicationError("boom", type="Boom") -# Each invocation of the parked async activity below records its attempt number. -# A schedule-to-close timeout on a *parked* (raise_complete_async) attempt must be -# terminal: the body runs exactly once and is never re-invoked past the budget, -# even with retries still available. +# Records each invocation of the parked async activity below (must run exactly once under a budget timeout). _PARK_INVOCATIONS: List[int] = [] @@ -120,8 +117,7 @@ async def run(self) -> str: await workflow.execute_activity( park_and_count, schedule_to_close_timeout=timedelta(seconds=2), - # Retries are available: only the budget — not the count — may - # stop the parked attempt, and it must stop terminally. + # Retries available: only the budget, not the count, may stop it. retry_policy=RetryPolicy( initial_interval=timedelta(seconds=0.4), backoff_coefficient=1.0, @@ -169,10 +165,7 @@ async def test_schedule_to_close_times_out_hanging_attempt() -> None: @pytest.mark.timeout(60) async def test_parked_async_schedule_to_close_is_terminal_not_retried() -> None: - # A parked (raise_complete_async) attempt bounded only by schedule_to_close - # must time out terminally with SCHEDULE_TO_CLOSE and never be re-invoked, - # even with retries left. Regression: the park time escaped the budget gate, - # so the timeout was retried and the activity re-ran past its budget. + # A parked attempt under schedule_to_close must time out terminally and never re-run (regression: the park escaped the budget gate). _PARK_INVOCATIONS.clear() async with _env() as client: started = _time.monotonic()