From 3d6ec40a20837a434be04fdda8a4f2e8e705b49f Mon Sep 17 00:00:00 2001 From: junhokim Date: Thu, 21 May 2026 14:21:07 +0900 Subject: [PATCH 1/3] Add opt-in cancel_futures_on_exit to ThreadSensitiveContext MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Default ThreadSensitiveContext.__aexit__ calls executor.shutdown() with the implicit wait=True. When sync middleware in the context calls back into the event loop via SyncToAsync, the request-scoped pool ends up waiting on a future the loop itself must produce — a textbook cross-call deadlock that shows up in production as never-ending requests and stuck health probes (see #545, #495, #458). This change adds an opt-in escape hatch that switches the shutdown call to shutdown(wait=False, cancel_futures=True) — pending futures are cancelled, the loop is freed, running workers finish in the background. Default behaviour is preserved; users can opt in three ways: 1. constructor argument: async with ThreadSensitiveContext(cancel_futures_on_exit=True): ... 2. subclass attribute (handy when ThreadSensitiveContext is instantiated by framework code you cannot easily reach): class MyTSCtx(ThreadSensitiveContext): cancel_futures_on_exit = True 3. environment variable for process-wide opt-in: ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT=1 Tests cover the historic graceful-drain default plus all three opt-in paths. --- asgiref/sync.py | 28 +++++++++++++-- tests/test_sync.py | 88 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 2 deletions(-) diff --git a/asgiref/sync.py b/asgiref/sync.py index e733c8e5..3c5b2989 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -126,10 +126,31 @@ class ThreadSensitiveContext: >>> import time >>> async with ThreadSensitiveContext(): ... await sync_to_async(time.sleep, 1)() + + ``__aexit__`` waits for every worker in the context's ``ThreadPoolExecutor`` + to finish before returning. That is the safe default, but it deadlocks + when sync code inside the context calls back into the event loop via + ``SyncToAsync`` and the loop ends up cleaning the context while those + workers are still parked on a future the loop itself must produce + (see issues #545, #495, #458). + + Set ``cancel_futures_on_exit`` (per-instance, per-subclass, or globally + via ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT=1``) to switch + ``executor.shutdown()`` to ``shutdown(wait=False, cancel_futures=True)`` — + pending futures are cancelled, the event loop is freed, and running + workers finish in the background. Trade graceful drain for liveness. """ - def __init__(self): + #: Opt-in: skip the graceful drain on exit. Default preserves historic behaviour. + cancel_futures_on_exit: bool = ( + os.environ.get("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", "").lower() + in ("1", "true", "yes") + ) + + def __init__(self, cancel_futures_on_exit: bool | None = None): self.token = None + if cancel_futures_on_exit is not None: + self.cancel_futures_on_exit = cancel_futures_on_exit async def __aenter__(self): try: @@ -145,7 +166,10 @@ async def __aexit__(self, exc, value, tb): executor = SyncToAsync.context_to_thread_executor.pop(self, None) if executor: - executor.shutdown() + if self.cancel_futures_on_exit: + executor.shutdown(wait=False, cancel_futures=True) + else: + executor.shutdown() SyncToAsync.thread_sensitive_context.reset(self.token) diff --git a/tests/test_sync.py b/tests/test_sync.py index 4199ce65..26de529a 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -15,6 +15,7 @@ from asgiref.sync import ( AsyncSingleThreadContext, + SyncToAsync, ThreadSensitiveContext, async_to_sync, iscoroutinefunction, @@ -690,6 +691,93 @@ async def test_thread_sensitive_context_without_sync_work(): pass +@pytest.mark.asyncio +async def test_thread_sensitive_context_default_drains_executor_gracefully(monkeypatch): + """Default behaviour: ``executor.shutdown()`` called without ``cancel_futures``.""" + monkeypatch.delenv("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", raising=False) + + shutdown_calls = [] + + class _Recorder: + def shutdown(self, *args, **kwargs): + shutdown_calls.append({"args": args, "kwargs": kwargs}) + + ctx = ThreadSensitiveContext() + assert ctx.cancel_futures_on_exit is False, ( + "default must preserve historic graceful drain behaviour" + ) + async with ctx: + SyncToAsync.context_to_thread_executor[ctx] = _Recorder() + + assert shutdown_calls == [{"args": (), "kwargs": {}}], ( + "default shutdown must remain wait=True with no cancel_futures" + ) + + +@pytest.mark.asyncio +async def test_thread_sensitive_context_cancel_futures_on_exit_per_instance(): + """Per-instance opt-in cancels pending futures and skips the join.""" + shutdown_calls = [] + + class _Recorder: + def shutdown(self, *args, **kwargs): + shutdown_calls.append({"args": args, "kwargs": kwargs}) + + ctx = ThreadSensitiveContext(cancel_futures_on_exit=True) + async with ctx: + SyncToAsync.context_to_thread_executor[ctx] = _Recorder() + + assert shutdown_calls == [ + {"args": (), "kwargs": {"wait": False, "cancel_futures": True}} + ] + + +@pytest.mark.asyncio +async def test_thread_sensitive_context_cancel_futures_on_exit_via_env(monkeypatch): + """Global opt-in via ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT``.""" + monkeypatch.setenv("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", "1") + + # Re-evaluate the class default by reimporting; in-process we mimic that + # by setting the class attribute, which is what the env var feeds into. + monkeypatch.setattr(ThreadSensitiveContext, "cancel_futures_on_exit", True) + + shutdown_calls = [] + + class _Recorder: + def shutdown(self, *args, **kwargs): + shutdown_calls.append({"args": args, "kwargs": kwargs}) + + ctx = ThreadSensitiveContext() + assert ctx.cancel_futures_on_exit is True + async with ctx: + SyncToAsync.context_to_thread_executor[ctx] = _Recorder() + + assert shutdown_calls == [ + {"args": (), "kwargs": {"wait": False, "cancel_futures": True}} + ] + + +@pytest.mark.asyncio +async def test_thread_sensitive_context_cancel_futures_on_exit_via_subclass(): + """Subclass opt-in: override ``cancel_futures_on_exit`` on a subclass.""" + shutdown_calls = [] + + class _Recorder: + def shutdown(self, *args, **kwargs): + shutdown_calls.append({"args": args, "kwargs": kwargs}) + + class _NoDrain(ThreadSensitiveContext): + cancel_futures_on_exit = True + + ctx = _NoDrain() + async with ctx: + SyncToAsync.context_to_thread_executor[ctx] = _Recorder() + + assert shutdown_calls == [ + {"args": (), "kwargs": {"wait": False, "cancel_futures": True}} + ] + + def test_thread_sensitive_double_nested_sync(): """ Tests that thread_sensitive SyncToAsync nests inside itself where the From d51c9250e1cc917245db60bf0c360e373396e057 Mon Sep 17 00:00:00 2001 From: junhokim Date: Thu, 21 May 2026 15:27:44 +0900 Subject: [PATCH 2/3] Apply black/isort formatting --- asgiref/sync.py | 7 +++---- tests/test_sync.py | 12 ++++++------ 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/asgiref/sync.py b/asgiref/sync.py index 3c5b2989..5b5f7fd7 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -142,10 +142,9 @@ class ThreadSensitiveContext: """ #: Opt-in: skip the graceful drain on exit. Default preserves historic behaviour. - cancel_futures_on_exit: bool = ( - os.environ.get("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", "").lower() - in ("1", "true", "yes") - ) + cancel_futures_on_exit: bool = os.environ.get( + "ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", "" + ).lower() in ("1", "true", "yes") def __init__(self, cancel_futures_on_exit: bool | None = None): self.token = None diff --git a/tests/test_sync.py b/tests/test_sync.py index 26de529a..90de06f2 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -703,15 +703,15 @@ def shutdown(self, *args, **kwargs): shutdown_calls.append({"args": args, "kwargs": kwargs}) ctx = ThreadSensitiveContext() - assert ctx.cancel_futures_on_exit is False, ( - "default must preserve historic graceful drain behaviour" - ) + assert ( + ctx.cancel_futures_on_exit is False + ), "default must preserve historic graceful drain behaviour" async with ctx: SyncToAsync.context_to_thread_executor[ctx] = _Recorder() - assert shutdown_calls == [{"args": (), "kwargs": {}}], ( - "default shutdown must remain wait=True with no cancel_futures" - ) + assert shutdown_calls == [ + {"args": (), "kwargs": {}} + ], "default shutdown must remain wait=True with no cancel_futures" @pytest.mark.asyncio From 534fd372aea167e461a08bbe71cfa12576f5e23d Mon Sep 17 00:00:00 2001 From: junhokim Date: Thu, 21 May 2026 15:39:48 +0900 Subject: [PATCH 3/3] Address review feedback: env parser helper, Python 3.9 compat, CHANGELOG - Extract _cancel_futures_on_exit_default_from_env() so tests can exercise env parsing without a subprocess (asgiref has no existing subprocess test convention). - Replace bool | None with Optional[bool] to keep Python 3.9 compatibility. - Document that the env var is read once at import time; runtime changes require the constructor argument or a subclass attribute. - Add parametrized env-parser test, explicit-False-override test. - Add CHANGELOG.txt Unreleased entry. --- CHANGELOG.txt | 12 ++++++++++++ asgiref/sync.py | 21 +++++++++++++++++---- tests/test_sync.py | 39 +++++++++++++++++++++++++++------------ 3 files changed, 56 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.txt b/CHANGELOG.txt index ee4c69c0..4bf7f75f 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,3 +1,15 @@ +Unreleased +---------- + +* Added an opt-in ``cancel_futures_on_exit`` flag to ``ThreadSensitiveContext``. + When enabled, ``__aexit__`` calls ``executor.shutdown(wait=False, + cancel_futures=True)`` instead of waiting for worker threads to drain — an + escape hatch for the cross-call deadlock that occurs when sync code in the + context calls back into the event loop via ``SyncToAsync``. The default is + unchanged. The flag can be set per-instance, per-subclass, or process-wide + via the ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT`` environment + variable. + 3.11.1 (2026-02-03) ------------------- diff --git a/asgiref/sync.py b/asgiref/sync.py index 5b5f7fd7..ffa00402 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -108,6 +108,17 @@ def __exit__(self, exc, value, tb): AsyncToSync.async_single_thread_context.reset(self.token) +def _cancel_futures_on_exit_default_from_env() -> bool: + """Read ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT`` from the environment. + + Split out so tests can exercise the parsing without importing in a + subprocess. + """ + return os.environ.get( + "ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", "" + ).lower() in ("1", "true", "yes") + + class ThreadSensitiveContext: """Async context manager to manage context for thread sensitive mode @@ -139,14 +150,16 @@ class ThreadSensitiveContext: ``executor.shutdown()`` to ``shutdown(wait=False, cancel_futures=True)`` — pending futures are cancelled, the event loop is freed, and running workers finish in the background. Trade graceful drain for liveness. + + The environment variable is read once when the class is defined, so it + must be set before ``asgiref.sync`` is imported. Use the constructor + argument or a subclass attribute to change the value at runtime. """ #: Opt-in: skip the graceful drain on exit. Default preserves historic behaviour. - cancel_futures_on_exit: bool = os.environ.get( - "ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", "" - ).lower() in ("1", "true", "yes") + cancel_futures_on_exit: bool = _cancel_futures_on_exit_default_from_env() - def __init__(self, cancel_futures_on_exit: bool | None = None): + def __init__(self, cancel_futures_on_exit: Optional[bool] = None): self.token = None if cancel_futures_on_exit is not None: self.cancel_futures_on_exit = cancel_futures_on_exit diff --git a/tests/test_sync.py b/tests/test_sync.py index 90de06f2..afc344b4 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -732,29 +732,44 @@ def shutdown(self, *args, **kwargs): ] -@pytest.mark.asyncio -async def test_thread_sensitive_context_cancel_futures_on_exit_via_env(monkeypatch): - """Global opt-in via ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT``.""" - monkeypatch.setenv("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", "1") +@pytest.mark.parametrize( + "value,expected", + [ + ("1", True), + ("true", True), + ("True", True), + ("YES", True), + ("0", False), + ("false", False), + ("", False), + ("anything-else", False), + ], +) +def test_cancel_futures_on_exit_default_from_env(monkeypatch, value, expected): + """Parsing of ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT``.""" + from asgiref.sync import _cancel_futures_on_exit_default_from_env + + monkeypatch.setenv("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", value) + assert _cancel_futures_on_exit_default_from_env() is expected - # Re-evaluate the class default by reimporting; in-process we mimic that - # by setting the class attribute, which is what the env var feeds into. - monkeypatch.setattr(ThreadSensitiveContext, "cancel_futures_on_exit", True) +@pytest.mark.asyncio +async def test_thread_sensitive_context_explicit_false_overrides_class_attribute(): + """Instance argument ``cancel_futures_on_exit=False`` wins over a True class attribute.""" shutdown_calls = [] class _Recorder: def shutdown(self, *args, **kwargs): shutdown_calls.append({"args": args, "kwargs": kwargs}) - ctx = ThreadSensitiveContext() - assert ctx.cancel_futures_on_exit is True + class _OptedIn(ThreadSensitiveContext): + cancel_futures_on_exit = True + + ctx = _OptedIn(cancel_futures_on_exit=False) async with ctx: SyncToAsync.context_to_thread_executor[ctx] = _Recorder() - assert shutdown_calls == [ - {"args": (), "kwargs": {"wait": False, "cancel_futures": True}} - ] + assert shutdown_calls == [{"args": (), "kwargs": {}}] @pytest.mark.asyncio