test: stabilize flaky checkpoint test + add regression for #1339 fallback list

- tests/test_issue765_streaming_persistence.py — replace timing-based polling
  in test_checkpoint_fires_on_activity_counter_increment with deterministic
  threading.Event-driven sync. The old version used time.sleep(0.15)+(0.25)+(0.25)
  with a 0.1s polling thread, which under CI scheduling jitter could miss the
  second increment and complete with only 1 save instead of 2. Now waits up
  to 3.0s for save_count to advance to the target after each increment.
  Locally observed flake on Python 3.11 in CI run 25175204451.

- tests/test_pr1339_fallback_providers_list.py — new structural test that
  asserts streaming.py handles both legacy fallback_model (single dict) and
  new fallback_providers (list form) without calling .get() on a list. Three
  assertions: both keys consulted, list-form has explicit isinstance check,
  _fallback_resolved defaults to None.
This commit is contained in:
nesquena-hermes
2026-04-30 16:20:05 +00:00
parent d4b055c30b
commit 50418cd47b
2 changed files with 93 additions and 6 deletions
+21 -6
View File
@@ -112,7 +112,12 @@ class TestPeriodicCheckpoint:
"""
def test_checkpoint_fires_on_activity_counter_increment(self):
"""Checkpoint saves when _checkpoint_activity counter grows."""
"""Checkpoint saves when _checkpoint_activity counter grows.
Deterministic: instead of relying on time-based polling windows, we
wait for the checkpoint thread's save_count to advance after each
increment. Generous timeout guards against CI scheduling jitter.
"""
s = _make_session("ckpt1")
s.pending_user_message = "do a long task"
s.save() # initial save (like routes.py does before streaming starts)
@@ -120,28 +125,38 @@ class TestPeriodicCheckpoint:
stop_event = threading.Event()
_checkpoint_activity = [0]
save_count = [0]
save_event = threading.Event()
def periodic_checkpoint():
last = 0
while not stop_event.wait(0.1): # fast interval for test
while not stop_event.wait(0.02): # fast poll for low-jitter test
try:
cur = _checkpoint_activity[0]
if cur > last:
s.save(skip_index=True)
last = cur
save_count[0] += 1
save_event.set()
except Exception:
pass
t = threading.Thread(target=periodic_checkpoint, daemon=True)
t.start()
# Simulate on_tool() completing twice (as would happen during a real agent run)
time.sleep(0.15)
def _wait_for_save(target_count, timeout=3.0):
"""Wait until save_count[0] >= target_count, or timeout."""
deadline = time.monotonic() + timeout
while save_count[0] < target_count and time.monotonic() < deadline:
save_event.wait(timeout=0.05)
save_event.clear()
return save_count[0] >= target_count
# Simulate on_tool() completing twice
_checkpoint_activity[0] += 1 # first tool completes
time.sleep(0.25)
assert _wait_for_save(1), f"Expected 1 save after first increment; got {save_count[0]}"
_checkpoint_activity[0] += 1 # second tool completes
time.sleep(0.25)
assert _wait_for_save(2), f"Expected 2 saves after second increment; got {save_count[0]}"
stop_event.set()
t.join(timeout=2)
@@ -0,0 +1,72 @@
"""Test for PR #1339 — streaming.py must support both single-dict `fallback_model`
and list-form `fallback_providers` config without crashing on `.get()`.
Before the fix, when config had `fallback_providers: [{provider, model, ...}, ...]`,
streaming.py read it as if it were a dict and called `.get('model', '')` on a list,
which would raise `AttributeError: 'list' object has no attribute 'get'`.
The fix makes streaming.py handle both legacy dict form and new list form, picking
the first entry from the list when given a list.
"""
import re
from pathlib import Path
STREAMING_PY = Path(__file__).resolve().parent.parent / "api" / "streaming.py"
def _extract_fallback_block():
"""Return the source range that handles fallback_model/fallback_providers."""
src = STREAMING_PY.read_text(encoding="utf-8")
# Locate the resolved-fallback region
idx = src.find("# Fallback model from profile config")
assert idx != -1, "Fallback block marker not found in streaming.py"
end = src.find("# Build kwargs defensively", idx)
assert end != -1, "End-of-block marker not found"
return src[idx:end]
def test_fallback_handles_both_dict_and_list_config():
"""Block must read either fallback_model (dict) or fallback_providers (list)."""
block = _extract_fallback_block()
# Both keys must be consulted
assert "fallback_model" in block, "Must still support legacy single-dict fallback_model"
assert "fallback_providers" in block, (
"Must support new list-form fallback_providers (PR #1339)"
)
def test_fallback_list_iteration_picks_first_valid_entry():
"""When given a list, code must pick the first valid dict entry, not call .get on the list."""
block = _extract_fallback_block()
# Must isinstance-check before calling .get
assert "isinstance(_fallback, list)" in block, (
"Must detect list-form fallback_providers explicitly to avoid AttributeError"
)
assert "isinstance(_fallback, dict)" in block or "isinstance(_fallback,dict)" in block, (
"Must keep legacy single-dict path explicitly"
)
# No bare _fallback.get() — every .get() on _fallback must be guarded by an isinstance(_fallback, dict) check.
# We verify this structurally: every line containing `_fallback.get(` must be inside or preceded by an isinstance(_fallback, dict) gate.
lines = block.split("\n")
in_dict_block = False
for i, line in enumerate(lines):
if "isinstance(_fallback, dict)" in line:
in_dict_block = True
if "_fallback.get(" in line and not in_dict_block:
# Look back up to 3 lines for the isinstance gate on the same elif/if
window = "\n".join(lines[max(0, i - 3): i + 1])
assert "isinstance(_fallback, dict)" in window, (
f"Line {i} calls _fallback.get() without a nearby isinstance(_fallback, dict) gate:\n{line}"
)
def test_fallback_resolved_initialized_to_none():
"""_fallback_resolved must default to None so AIAgent gets an explicit None when no fallback."""
block = _extract_fallback_block()
# The variable must be assignable to None at the top of the block
assert "_fallback_resolved = None" in block, (
"_fallback_resolved must be initialized to None so callers can rely on its presence"
)