diff --git a/CHANGELOG.txt b/CHANGELOG.txt index ee4c69c0..4bf7f75f 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,3 +1,15 @@ +Unreleased +---------- + +* Added an opt-in ``cancel_futures_on_exit`` flag to ``ThreadSensitiveContext``. + When enabled, ``__aexit__`` calls ``executor.shutdown(wait=False, + cancel_futures=True)`` instead of waiting for worker threads to drain — an + escape hatch for the cross-call deadlock that occurs when sync code in the + context calls back into the event loop via ``SyncToAsync``. The default is + unchanged. The flag can be set per-instance, per-subclass, or process-wide + via the ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT`` environment + variable. + 3.11.1 (2026-02-03) ------------------- diff --git a/asgiref/sync.py b/asgiref/sync.py index e733c8e5..ffa00402 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -108,6 +108,17 @@ def __exit__(self, exc, value, tb): AsyncToSync.async_single_thread_context.reset(self.token) +def _cancel_futures_on_exit_default_from_env() -> bool: + """Read ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT`` from the environment. + + Split out so tests can exercise the parsing without importing in a + subprocess. + """ + return os.environ.get( + "ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", "" + ).lower() in ("1", "true", "yes") + + class ThreadSensitiveContext: """Async context manager to manage context for thread sensitive mode @@ -126,10 +137,32 @@ class ThreadSensitiveContext: >>> import time >>> async with ThreadSensitiveContext(): ... await sync_to_async(time.sleep, 1)() + + ``__aexit__`` waits for every worker in the context's ``ThreadPoolExecutor`` + to finish before returning. That is the safe default, but it deadlocks + when sync code inside the context calls back into the event loop via + ``SyncToAsync`` and the loop ends up cleaning the context while those + workers are still parked on a future the loop itself must produce + (see issues #545, #495, #458). + + Set ``cancel_futures_on_exit`` (per-instance, per-subclass, or globally + via ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT=1``) to switch + ``executor.shutdown()`` to ``shutdown(wait=False, cancel_futures=True)`` — + pending futures are cancelled, the event loop is freed, and running + workers finish in the background. Trade graceful drain for liveness. + + The environment variable is read once when the class is defined, so it + must be set before ``asgiref.sync`` is imported. Use the constructor + argument or a subclass attribute to change the value at runtime. """ - def __init__(self): + #: Opt-in: skip the graceful drain on exit. Default preserves historic behaviour. + cancel_futures_on_exit: bool = _cancel_futures_on_exit_default_from_env() + + def __init__(self, cancel_futures_on_exit: Optional[bool] = None): self.token = None + if cancel_futures_on_exit is not None: + self.cancel_futures_on_exit = cancel_futures_on_exit async def __aenter__(self): try: @@ -145,7 +178,10 @@ async def __aexit__(self, exc, value, tb): executor = SyncToAsync.context_to_thread_executor.pop(self, None) if executor: - executor.shutdown() + if self.cancel_futures_on_exit: + executor.shutdown(wait=False, cancel_futures=True) + else: + executor.shutdown() SyncToAsync.thread_sensitive_context.reset(self.token) diff --git a/tests/test_sync.py b/tests/test_sync.py index 4199ce65..afc344b4 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -15,6 +15,7 @@ from asgiref.sync import ( AsyncSingleThreadContext, + SyncToAsync, ThreadSensitiveContext, async_to_sync, iscoroutinefunction, @@ -690,6 +691,108 @@ async def test_thread_sensitive_context_without_sync_work(): pass +@pytest.mark.asyncio +async def test_thread_sensitive_context_default_drains_executor_gracefully(monkeypatch): + """Default behaviour: ``executor.shutdown()`` called without ``cancel_futures``.""" + monkeypatch.delenv("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", raising=False) + + shutdown_calls = [] + + class _Recorder: + def shutdown(self, *args, **kwargs): + shutdown_calls.append({"args": args, "kwargs": kwargs}) + + ctx = ThreadSensitiveContext() + assert ( + ctx.cancel_futures_on_exit is False + ), "default must preserve historic graceful drain behaviour" + async with ctx: + SyncToAsync.context_to_thread_executor[ctx] = _Recorder() + + assert shutdown_calls == [ + {"args": (), "kwargs": {}} + ], "default shutdown must remain wait=True with no cancel_futures" + + +@pytest.mark.asyncio +async def test_thread_sensitive_context_cancel_futures_on_exit_per_instance(): + """Per-instance opt-in cancels pending futures and skips the join.""" + shutdown_calls = [] + + class _Recorder: + def shutdown(self, *args, **kwargs): + shutdown_calls.append({"args": args, "kwargs": kwargs}) + + ctx = ThreadSensitiveContext(cancel_futures_on_exit=True) + async with ctx: + SyncToAsync.context_to_thread_executor[ctx] = _Recorder() + + assert shutdown_calls == [ + {"args": (), "kwargs": {"wait": False, "cancel_futures": True}} + ] + + +@pytest.mark.parametrize( + "value,expected", + [ + ("1", True), + ("true", True), + ("True", True), + ("YES", True), + ("0", False), + ("false", False), + ("", False), + ("anything-else", False), + ], +) +def test_cancel_futures_on_exit_default_from_env(monkeypatch, value, expected): + """Parsing of ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT``.""" + from asgiref.sync import _cancel_futures_on_exit_default_from_env + + monkeypatch.setenv("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", value) + assert _cancel_futures_on_exit_default_from_env() is expected + + +@pytest.mark.asyncio +async def test_thread_sensitive_context_explicit_false_overrides_class_attribute(): + """Instance argument ``cancel_futures_on_exit=False`` wins over a True class attribute.""" + shutdown_calls = [] + + class _Recorder: + def shutdown(self, *args, **kwargs): + shutdown_calls.append({"args": args, "kwargs": kwargs}) + + class _OptedIn(ThreadSensitiveContext): + cancel_futures_on_exit = True + + ctx = _OptedIn(cancel_futures_on_exit=False) + async with ctx: + SyncToAsync.context_to_thread_executor[ctx] = _Recorder() + + assert shutdown_calls == [{"args": (), "kwargs": {}}] + + +@pytest.mark.asyncio +async def test_thread_sensitive_context_cancel_futures_on_exit_via_subclass(): + """Subclass opt-in: override ``cancel_futures_on_exit`` on a subclass.""" + shutdown_calls = [] + + class _Recorder: + def shutdown(self, *args, **kwargs): + shutdown_calls.append({"args": args, "kwargs": kwargs}) + + class _NoDrain(ThreadSensitiveContext): + cancel_futures_on_exit = True + + ctx = _NoDrain() + async with ctx: + SyncToAsync.context_to_thread_executor[ctx] = _Recorder() + + assert shutdown_calls == [ + {"args": (), "kwargs": {"wait": False, "cancel_futures": True}} + ] + + def test_thread_sensitive_double_nested_sync(): """ Tests that thread_sensitive SyncToAsync nests inside itself where the