fix: add timeout to execute_workflow to prevent unbounded lock hold (closes #9051)#9438
Conversation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
AGENT_REVIEW_VERDICT: TEST_APPROVED APPROVED WITH SUGGESTIONSThis test correctly reproduces the bug and will pass after the fix is implemented. All critical dimensions check out. A. Test RealismAll imports resolve to real production values:
The Prefect test harness is correctly reused via the session-scoped B. Test CorrectnessThe test fails today with After the fix adds The test cannot pass without changing C. Test QualityGenerally fine, with two minor points: 1. Redundant fixture dependency in the test function.
Suggestion ( 2. After the fix, if D. Alignment with AnalysisThe test is tightly aligned with the analyst's root cause:
No scope issues — the test does not reach Recommended next steps
AGENT_REVIEW_ITERATION: test-1
|
|
/bug-fix |
|
✅ Bug fixer agent completed successfully! |
…9051) When no Prefect worker is available, WorkflowWorkerExecution.execute_workflow blocked indefinitely on run_deployment, holding global_schema_lock forever. Add timeout parameter to the abstract interface and both concrete implementations, pass it to run_deployment, and raise FlowRunWaitTimeout when the run does not reach a terminal state within the allotted time. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…apter The timeout parameter is passed through to Prefect's run_deployment API, not used as an asyncio.timeout() context manager. Suppress the ruff ASYNC109 rule inline on each parameter declaration. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Commit pushed:
|
|
Fix implemented and pushed to this PR. What was done:
Validation: Warning Firewall blocked 1 domainThe following domain was blocked by the firewall during workflow execution:
network:
allowed:
- defaults
- "sensomatic.prefect.io"See Network Configuration for more information. Note 🔒 Integrity filter blocked 2 itemsThe following items were blocked because they don't meet the GitHub integrity level.
To allow these resources, lower tools:
github:
min-integrity: approved # merged | approved | unapproved | none
|
|
AGENT_REVIEW_VERDICT: FIX_CHANGES_REQUESTED CHANGES REQUESTEDThe fix correctly solves the root cause and all production code paths are properly updated. One concrete subclass of A. CorrectnessThe fix correctly addresses the root cause: The post-call check is sound given the stated Prefect 3.6.13 behavior (returns a non-final No edge-case correctness issues with the main happy path or existing callers (all pass B. Code Quality — BLOCKING
# current (incomplete)
async def execute_workflow(
self,
workflow: WorkflowDefinition,
expected_return: type | None = None,
context: InfrahubContext | None = None,
parameters: dict[str, Any] | None = None,
tags: list[str] | None = None, # <-- timeout missing
) -> Any:The mypy config ( Required fix — add the parameter to async def execute_workflow(
self,
workflow: WorkflowDefinition,
expected_return: type | None = None,
context: InfrahubContext | None = None,
parameters: dict[str, Any] | None = None,
tags: list[str] | None = None,
timeout: float | None = None, # noqa: ARG002, ASYNC109
) -> Any:C. Documentation Alignment
D. Test QualityThe test itself is sound and unchanged from what was approved in the test review. Two minor leftovers from that review were not addressed (non-blocking):
Recommended next steps
AGENT_REVIEW_ITERATION: fix-1
|
|
/bug-fix |
|
✅ Bug fixer agent completed successfully! |
- Add `timeout: float | None = None` to `WorkflowRecorder.execute_workflow` to match the updated abstract base signature; the parameter is accepted and ignored (recorder semantics are unchanged). - Remove stale `# type: ignore[call-overload]` from the replication test now that both overloads in worker.py include the `timeout` parameter. - Remove redundant `prefect_test_fixture: None` from the test function signature; `work_pool_and_deployment` already depends on it. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Commit pushed:
|
|
Revision pushed addressing all items from the review. Blocking fix:
Non-blocking cleanups:
Validation: replication test passes ( Warning Firewall blocked 1 domainThe following domain was blocked by the firewall during workflow execution:
network:
allowed:
- defaults
- "sensomatic.prefect.io"See Network Configuration for more information. Note 🔒 Integrity filter blocked 1 itemThe following item was blocked because it doesn't meet the GitHub integrity level.
To allow these resources, lower tools:
github:
min-integrity: approved # merged | approved | unapproved | none
|
|
AGENT_REVIEW_VERDICT: FIX_APPROVED APPROVEDAll blocking items from the previous fix review have been resolved. The fix is correct, complete, and clean across all three production adapters and the test recorder. A. CorrectnessRoot cause is fully addressed. Edge cases are sound:
B. Code Quality
The All
No unnecessary refactoring, no security or performance concerns. C. Documentation Alignment
D. Test QualityBoth non-blocking items from fix-1 are resolved:
The test exercises observable behavior (exception raised), not implementation details. It cannot pass without the fix. It is isolated, deterministic, and follows project conventions. One minor observation (non-blocking, no action needed): Recommended next stepsNo changes required. The fix is ready for human review and merge. AGENT_REVIEW_ITERATION: fix-2
|
Why
WorkflowWorkerExecution.execute_workflowcalled Prefect'srun_deploymentwith notimeoutargument. When the workflow task worker is unreachable (e.g., not running), the call blocks indefinitely. In the/api/schema/loadhandler, this holdsglobal_schema_lockforever, preventing any subsequent schema operations from acquiring the lock.The goal is to let callers supply a
timeoutso thatexecute_workflowraisesFlowRunWaitTimeoutif no worker picks up the submitted flow run within the deadline, freeing the lock.Non-goals: this PR does not add a default timeout or configure one via settings; it only wires the parameter through so callers can opt in.
Closes #9051
Fix strategy
The root cause is shallow: a missing parameter that prevents passing a timeout to Prefect. A targeted fix is appropriate — no design refactoring needed.
run_deployment(timeout=N)returns theFlowRunin its current state (SCHEDULED) when the timeout expires instead of raisingFlowRunWaitTimeout. Afterrun_deploymentreturns, I check whether a timeout was given and the run is not yet in a terminal state, and raiseFlowRunWaitTimeoutin that case to give callers the expected semantics.What changed
execute_workflownow accepts atimeout: float | None = Noneparameter. When set, if the flow run does not reach a terminal state withintimeoutseconds,FlowRunWaitTimeoutis raised instead of blocking indefinitely.InfrahubWorkflow.execute_workflow(abstract base + overloads in__init__.py): addedtimeoutparameter.WorkflowWorkerExecution.execute_workflow(overloads + impl inworker.py): addedtimeout, passes it torun_deployment, and raisesFlowRunWaitTimeoutwhen the run is not final after timeout.WorkflowLocalExecution.execute_workflow(local.py): addedtimeout(accepted but ignored — local execution is synchronous and has no concept of a remote worker timeout).# noqa: ASYNC109suppressions added: the ruff rule flagstimeoutparams on async functions suggestingasyncio.timeout()context manager, but heretimeoutis an API value forwarded to Prefect, not an asyncio context.API contract: fully backward compatible —
timeoutdefaults toNone, matching previous behavior (unbounded wait).How to review
backend/infrahub/services/adapters/workflow/worker.py— the core fix: new parameter and theFlowRunWaitTimeoutraise after checkingresponse.state.is_final().backend/infrahub/services/adapters/workflow/__init__.py— abstract interface updated to includetimeout.backend/infrahub/services/adapters/workflow/local.py— trivial: accept and ignoretimeoutto keep the interface consistent.backend/tests/functional/workflow/test_worker_timeout.py— the replication test (written by the test-writer agent, not modified here).How to test
Expected: 1 passed —
FlowRunWaitTimeoutis raised after 1 second when no worker picks up the submitted flow run.Expected: all 765 unit tests pass.
Impact & rollout
timeoutparameter defaults toNone, preserving the current unbounded-wait behavior for all existing callers.timeout=None(default).Checklist
uv run towncrier create ...)Analyst's findings (summary)
Replication test
Test file:
backend/tests/functional/workflow/test_worker_timeout.pyTest name:
test_execute_workflow_raises_when_no_worker_availableWhat it tests:
execute_workflowraisesFlowRunWaitTimeoutwhen called with atimeoutparameter and no Prefect worker picks up the submitted flow run within that window.Verification: Test confirmed PASSING after fix.
AGENT_FIX_COMPLETE
Warning
Firewall blocked 1 domain
The following domain was blocked by the firewall during workflow execution:
sensomatic.prefect.ioSee Network Configuration for more information.
Note
🔒 Integrity filter blocked 2 items
The following items were blocked because they don't meet the GitHub integrity level.
search_pull_requests: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".issue_read: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".To allow these resources, lower
min-integrityin your GitHub frontmatter: