Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
Unreleased
----------

* Added an opt-in ``cancel_futures_on_exit`` flag to ``ThreadSensitiveContext``.
When enabled, ``__aexit__`` calls ``executor.shutdown(wait=False,
cancel_futures=True)`` instead of waiting for worker threads to drain — an
escape hatch for the cross-call deadlock that occurs when sync code in the
context calls back into the event loop via ``SyncToAsync``. The default is
unchanged. The flag can be set per-instance, per-subclass, or process-wide
via the ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT`` environment
variable.

3.11.1 (2026-02-03)
-------------------

Expand Down
40 changes: 38 additions & 2 deletions asgiref/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ def __exit__(self, exc, value, tb):
AsyncToSync.async_single_thread_context.reset(self.token)


def _cancel_futures_on_exit_default_from_env() -> bool:
"""Read ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT`` from the environment.

Split out so tests can exercise the parsing without importing in a
subprocess.
"""
return os.environ.get(
"ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", ""
).lower() in ("1", "true", "yes")


class ThreadSensitiveContext:
"""Async context manager to manage context for thread sensitive mode

Expand All @@ -126,10 +137,32 @@ class ThreadSensitiveContext:
>>> import time
>>> async with ThreadSensitiveContext():
... await sync_to_async(time.sleep, 1)()

``__aexit__`` waits for every worker in the context's ``ThreadPoolExecutor``
to finish before returning. That is the safe default, but it deadlocks
when sync code inside the context calls back into the event loop via
``SyncToAsync`` and the loop ends up cleaning the context while those
workers are still parked on a future the loop itself must produce
(see issues #545, #495, #458).

Set ``cancel_futures_on_exit`` (per-instance, per-subclass, or globally
via ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT=1``) to switch
``executor.shutdown()`` to ``shutdown(wait=False, cancel_futures=True)`` —
pending futures are cancelled, the event loop is freed, and running
workers finish in the background. Trade graceful drain for liveness.

The environment variable is read once when the class is defined, so it
must be set before ``asgiref.sync`` is imported. Use the constructor
argument or a subclass attribute to change the value at runtime.
"""

def __init__(self):
#: Opt-in: skip the graceful drain on exit. Default preserves historic behaviour.
cancel_futures_on_exit: bool = _cancel_futures_on_exit_default_from_env()

def __init__(self, cancel_futures_on_exit: Optional[bool] = None):
self.token = None
if cancel_futures_on_exit is not None:
self.cancel_futures_on_exit = cancel_futures_on_exit

async def __aenter__(self):
try:
Expand All @@ -145,7 +178,10 @@ async def __aexit__(self, exc, value, tb):

executor = SyncToAsync.context_to_thread_executor.pop(self, None)
if executor:
executor.shutdown()
if self.cancel_futures_on_exit:
executor.shutdown(wait=False, cancel_futures=True)
else:
executor.shutdown()
SyncToAsync.thread_sensitive_context.reset(self.token)


Expand Down
103 changes: 103 additions & 0 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from asgiref.sync import (
AsyncSingleThreadContext,
SyncToAsync,
ThreadSensitiveContext,
async_to_sync,
iscoroutinefunction,
Expand Down Expand Up @@ -690,6 +691,108 @@ async def test_thread_sensitive_context_without_sync_work():
pass


@pytest.mark.asyncio
async def test_thread_sensitive_context_default_drains_executor_gracefully(monkeypatch):
"""Default behaviour: ``executor.shutdown()`` called without ``cancel_futures``."""
monkeypatch.delenv("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", raising=False)

shutdown_calls = []

class _Recorder:
def shutdown(self, *args, **kwargs):
shutdown_calls.append({"args": args, "kwargs": kwargs})

ctx = ThreadSensitiveContext()
assert (
ctx.cancel_futures_on_exit is False
), "default must preserve historic graceful drain behaviour"
async with ctx:
SyncToAsync.context_to_thread_executor[ctx] = _Recorder()

assert shutdown_calls == [
{"args": (), "kwargs": {}}
], "default shutdown must remain wait=True with no cancel_futures"


@pytest.mark.asyncio
async def test_thread_sensitive_context_cancel_futures_on_exit_per_instance():
"""Per-instance opt-in cancels pending futures and skips the join."""
shutdown_calls = []

class _Recorder:
def shutdown(self, *args, **kwargs):
shutdown_calls.append({"args": args, "kwargs": kwargs})

ctx = ThreadSensitiveContext(cancel_futures_on_exit=True)
async with ctx:
SyncToAsync.context_to_thread_executor[ctx] = _Recorder()

assert shutdown_calls == [
{"args": (), "kwargs": {"wait": False, "cancel_futures": True}}
]


@pytest.mark.parametrize(
"value,expected",
[
("1", True),
("true", True),
("True", True),
("YES", True),
("0", False),
("false", False),
("", False),
("anything-else", False),
],
)
def test_cancel_futures_on_exit_default_from_env(monkeypatch, value, expected):
"""Parsing of ``ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT``."""
from asgiref.sync import _cancel_futures_on_exit_default_from_env

monkeypatch.setenv("ASGIREF_CANCEL_FUTURES_ON_THREADSENSITIVE_EXIT", value)
assert _cancel_futures_on_exit_default_from_env() is expected


@pytest.mark.asyncio
async def test_thread_sensitive_context_explicit_false_overrides_class_attribute():
"""Instance argument ``cancel_futures_on_exit=False`` wins over a True class attribute."""
shutdown_calls = []

class _Recorder:
def shutdown(self, *args, **kwargs):
shutdown_calls.append({"args": args, "kwargs": kwargs})

class _OptedIn(ThreadSensitiveContext):
cancel_futures_on_exit = True

ctx = _OptedIn(cancel_futures_on_exit=False)
async with ctx:
SyncToAsync.context_to_thread_executor[ctx] = _Recorder()

assert shutdown_calls == [{"args": (), "kwargs": {}}]


@pytest.mark.asyncio
async def test_thread_sensitive_context_cancel_futures_on_exit_via_subclass():
"""Subclass opt-in: override ``cancel_futures_on_exit`` on a subclass."""
shutdown_calls = []

class _Recorder:
def shutdown(self, *args, **kwargs):
shutdown_calls.append({"args": args, "kwargs": kwargs})

class _NoDrain(ThreadSensitiveContext):
cancel_futures_on_exit = True

ctx = _NoDrain()
async with ctx:
SyncToAsync.context_to_thread_executor[ctx] = _Recorder()

assert shutdown_calls == [
{"args": (), "kwargs": {"wait": False, "cancel_futures": True}}
]


def test_thread_sensitive_double_nested_sync():
"""
Tests that thread_sensitive SyncToAsync nests inside itself where the
Expand Down