Merge pull request #2153 from nesquena/stage-345

stage-345: 2-PR low-risk batch — stream-ownership guard against stale writebacks + Refresh-usage button on provider quota card
This commit is contained in:
nesquena-hermes
2026-05-12 16:56:15 -07:00
committed by GitHub
11 changed files with 232 additions and 8 deletions
+10
View File
@@ -2,6 +2,16 @@
## [Unreleased]
### Fixed
- **PR #2136** by @LumenYoung — Stale stream writebacks no longer poison the active session transcript. `cancel_stream()` intentionally clears `active_stream_id` early so the UI can accept a follow-up turn while an old worker is unwinding — but the old worker could still return later from `run_conversation()` and persist its stale result over the newer transcript, causing visible transcript / turn journal / `state.db` to disagree (especially around cancel+retry on compressed continuations). Adds a single-line ownership check `_stream_writeback_is_current(session, stream_id)` (token equality against `session.active_stream_id`) and short-circuits both finalize paths: the success path in `_run_agent_streaming` and the cancel-handler path in `cancel_stream()`. When the stream no longer owns the writeback, both paths log `Skipping stale stream/cancel writeback` and return cleanly without persisting. 89-line regression suite in `tests/test_stale_stream_writeback.py`; companion updates to `tests/test_issue1361_cancel_data_loss.py` and `tests/test_sprint42.py` for the new return-without-persist behavior.
### Added
- **PR #2150** by @Jordan-SkyLF — "Refresh usage" button on the Provider quota card in Settings → Providers. Calls `/api/provider/quota?refresh=1&ts=<now>` with `cache: 'no-store'` to bypass browser, service worker, and reverse-proxy caches that may have stamped a previous quota response, then re-renders just the quota card from the fresh response and shows a `Last checked ...` timestamp. Disabled `Refreshing…` state during the in-flight request; success toast on completion or failure toast if the refresh fails. Note: the `refresh=1` query param is a no-op at the server today (`get_provider_quota()` has no in-process cache layer), so the win is strictly browser-side cache-bust + the `no-store` fetch option. A future maintainer follow-up may add server-side TTL caching of OAuth account-limit fetches, at which point the `refresh=1` param becomes load-bearing on both sides.
## [v0.51.51] — 2026-05-12 — Release AA (stage-344 — 16-PR contributor batch — i18n + insights bucketing/mobile + manual-compress async + workspace recovery + iOS PWA scroll + Cloudflare login health + fr locale)
### Added
- **PR #2130** by @dso2ng — Lazy lineage-report fetch on sidebar segment-badge expand. The sidebar already showed `N segments` for collapsed compression lineage rows (refs #1906, #1943) and the backend report endpoint is now shipped (refs #2012), but some rows only had a backend `_compression_segment_count` from `/api/sessions` while the browser hadn't materialized the older segment rows — clicking the badge couldn't reveal the full bounded list. Adds a small per-sidebar-cache lineage-report cache/inflight map in `static/sessions.js`, invalidates it on each fresh `/api/sessions` refresh, and on expand fetches `GET /api/session/lineage/report?session_id=<tip>` only when `_sessionSegmentCount(s)` exceeds the locally-materialized `_lineage_segments` count. Merges returned report `segments` by `session_id` with existing client segments, skipping the visible tip and `child_session` rows. Leaves report `children` out of the compression-segment list so subagent/fork child semantics remain separate. 132-line regression suite covering fetch-needed detection, report-segment merging/dedup, endpoint construction, and inflight cache de-duping.
+26
View File
@@ -1705,6 +1705,16 @@ def _drop_checkpointed_current_user_from_context(messages, msg_text):
return history
def _stream_writeback_is_current(session, stream_id):
"""Return True only while a worker still owns the session writeback.
cancel_stream() intentionally clears ``active_stream_id`` early so the UI can
accept a follow-up turn while the old worker is unwinding. That old worker
must not later persist its stale result over the newer transcript.
"""
return bool(stream_id) and getattr(session, 'active_stream_id', None) == stream_id
def _merge_display_messages_after_agent_result(previous_display, previous_context, result_messages, msg_text):
"""Keep UI transcript durable while allowing model context to compact.
@@ -3123,6 +3133,14 @@ def _run_agent_streaming(
if _ckpt_thread is not None:
_ckpt_thread.join(timeout=15)
with _agent_lock:
if not ephemeral and not _stream_writeback_is_current(s, stream_id):
logger.info(
"Skipping stale stream writeback for session %s stream %s; active_stream_id=%s",
getattr(s, 'session_id', session_id),
stream_id,
getattr(s, 'active_stream_id', None),
)
return
_result_messages = result.get('messages') or _previous_context_messages
_next_context_messages = _restore_reasoning_metadata(
_previous_context_messages,
@@ -4241,6 +4259,14 @@ def cancel_stream(stream_id: str) -> bool:
with _get_session_agent_lock(_cancel_session_id):
try:
_cs = get_session(_cancel_session_id)
if not _stream_writeback_is_current(_cs, stream_id):
logger.info(
"Skipping stale cancel writeback for session %s stream %s; active_stream_id=%s",
_cancel_session_id,
stream_id,
getattr(_cs, 'active_stream_id', None),
)
return True
# ── Preserve the user's typed message before clearing pending state (#1298) ──
# The agent's internal messages list (where the user message was appended at
# the start of run_conversation()) may not have been merged back into
+58 -2
View File
@@ -5485,13 +5485,20 @@ function _buildPluginCard(plugin){
const _providerCardEls = new Map(); // providerId → {card, statusDot, input, saveBtn, removeBtn}
async function _fetchProviderQuotaStatus(force=false){
const endpoint=force?`/api/provider/quota?refresh=1&ts=${Date.now()}`:'/api/provider/quota';
const status=await api(endpoint,{cache:'no-store'});
if(status&&typeof status==='object') status.client_fetched_at=new Date().toISOString();
return status;
}
async function loadProvidersPanel(){
const list=$('providersList');
const empty=$('providersEmpty');
if(!list) return;
try{
const data=await api('/api/providers');
const quota=await api('/api/provider/quota').catch(e=>({ok:false,status:'unavailable',quota:null,message:e.message||'Quota status unavailable'}));
const quota=await _fetchProviderQuotaStatus(false).catch(e=>({ok:false,status:'unavailable',quota:null,message:e.message||'Quota status unavailable',client_fetched_at:new Date().toISOString()}));
const providers=(data.providers||[]).filter(p=>p.configurable||p.is_oauth);
list.innerHTML='';
_providerCardEls.clear();
@@ -5512,6 +5519,40 @@ async function loadProvidersPanel(){
}
}
async function _refreshProviderQuota(card,button){
if(!card) return;
if(button){
button.disabled=true;
button.textContent='Refreshing…';
button.setAttribute('aria-busy','true');
}
let failed=false;
let next;
try{
next=await _fetchProviderQuotaStatus(true);
failed=next&&next.ok===false;
}catch(e){
failed=true;
next={ok:false,status:'unavailable',quota:null,message:e.message||'Quota status unavailable',client_fetched_at:new Date().toISOString()};
}
try{
const fresh=_buildProviderQuotaCard(next);
if(fresh){
card.replaceWith(fresh);
if(typeof showToast==='function') showToast(failed?'Provider usage refresh failed':'Provider usage refreshed');
return;
}
}catch(e){
failed=true;
}
if(card.isConnected&&button){
button.disabled=false;
button.textContent='Refresh usage';
button.removeAttribute('aria-busy');
}
if(typeof showToast==='function') showToast('Provider usage refresh failed');
}
function _formatProviderQuotaMoney(value){
if(value===null||value===undefined||value==='') return '—';
const n=Number(value);
@@ -5543,6 +5584,15 @@ function _formatProviderQuotaWindowLabel(accountLimits,w){
return raw||'Window';
}
function _formatProviderQuotaLastChecked(status){
const accountLimits=status&&status.account_limits;
const value=(accountLimits&&accountLimits.fetched_at)||status&&status.client_fetched_at;
if(!value) return 'Last checked after refresh';
const d=new Date(value);
if(Number.isNaN(d.getTime())) return 'Last checked after refresh';
try{return 'Last checked '+d.toLocaleString();}catch(e){return 'Last checked '+value;}
}
function _buildProviderQuotaCard(status){
if(!status) return null;
const card=document.createElement('div');
@@ -5590,11 +5640,17 @@ function _buildProviderQuotaCard(status){
<div>
<div class="provider-quota-title">Active provider quota</div>
<div class="provider-quota-subtitle">${esc(provider)}</div>
<div class="provider-quota-checked">${esc(_formatProviderQuotaLastChecked(status))}</div>
</div>
<div class="provider-quota-actions">
<span class="provider-quota-badge">${esc(state.replace(/_/g,' '))}</span>
<button class="provider-quota-refresh" type="button" data-provider-quota-refresh title="Refresh provider usage limits now">Refresh usage</button>
</div>
<span class="provider-quota-badge">${esc(state.replace(/_/g,' '))}</span>
</div>
<div class="provider-quota-body">${body}</div>
`;
const refreshBtn=card.querySelector('[data-provider-quota-refresh]');
if(refreshBtn) refreshBtn.addEventListener('click',()=>_refreshProviderQuota(card,refreshBtn));
return card;
}
+5
View File
@@ -2494,7 +2494,12 @@ main.main.showing-logs > #mainLogs{display:flex;}
.provider-quota-header{display:flex;align-items:flex-start;justify-content:space-between;gap:12px;margin-bottom:10px;}
.provider-quota-title{font-size:13px;font-weight:650;color:var(--text);line-height:1.2;}
.provider-quota-subtitle{font-size:11px;color:var(--muted);line-height:1.3;margin-top:2px;}
.provider-quota-checked{font-size:10.5px;color:var(--muted);line-height:1.3;margin-top:2px;}
.provider-quota-actions{display:flex;align-items:center;gap:8px;flex-wrap:wrap;justify-content:flex-end;}
.provider-quota-badge{font-size:10.5px;font-weight:650;text-transform:capitalize;padding:2px 8px;border-radius:999px;background:var(--accent-bg);color:var(--accent-text);white-space:nowrap;}
.provider-quota-refresh{appearance:none;border:1px solid var(--border);border-radius:999px;background:var(--sidebar);color:var(--text);font-size:11px;font-weight:650;line-height:1;padding:5px 9px;cursor:pointer;white-space:nowrap;}
.provider-quota-refresh:hover:not(:disabled){border-color:var(--accent);color:var(--accent);}
.provider-quota-refresh:disabled{opacity:.65;cursor:wait;}
.provider-quota-body{display:flex;flex-wrap:wrap;gap:8px;}
.provider-quota-metric{flex:1;min-width:88px;border:1px solid var(--border);border-radius:8px;background:var(--sidebar);padding:8px 10px;}
.provider-quota-metric span{display:block;font-size:10.5px;color:var(--muted);margin-bottom:2px;}
+1
View File
@@ -95,6 +95,7 @@ def _make_session(session_id="cancel_sid_1361",
def _setup_cancel_state(session_id, stream_id="stream_1361"):
"""Wire up STREAMS/CANCEL_FLAGS/AGENT_INSTANCES for cancel_stream()."""
models.SESSIONS[session_id].active_stream_id = stream_id
config.STREAMS[stream_id] = queue.Queue()
config.CANCEL_FLAGS[stream_id] = threading.Event()
mock_agent = Mock()
+1
View File
@@ -110,6 +110,7 @@ def test_stream_completion_overwrites_session_usage_with_latest_turn(cleanup_tes
fake_session = FakeSession()
fake_stream_id = "stream_issue1857_usage_overwrite"
fake_session.active_stream_id = fake_stream_id
fake_queue = queue.Queue()
fake_runtime_module = types.ModuleType("hermes_cli.runtime_provider")
fake_runtime_module.resolve_runtime_provider = mock.Mock(
@@ -188,6 +188,7 @@ def test_same_session_profile_switch_rebuilds_agent_under_new_soul_home(tmp_path
def run_turn(profile_name: str, stream_id: str, text: str):
fake_session.profile = profile_name
fake_session.active_stream_id = stream_id
streaming.STREAMS[stream_id] = queue.Queue()
streaming._run_agent_streaming(
session_id=fake_session.session_id,
+8 -2
View File
@@ -333,7 +333,10 @@ class TestIssue765FollowupHardening:
)
stop_idx = src.find("if _checkpoint_stop is not None:\n _checkpoint_stop.set()")
join_idx = src.find("if _ckpt_thread is not None:\n _ckpt_thread.join(timeout=15)")
lock_idx = src.find("with _agent_lock:\n _result_messages =")
lock_idx = src.find(
"with _agent_lock:\n"
" if not ephemeral and not _stream_writeback_is_current(s, stream_id):"
)
save_idx = src.find("s.context_messages = _next_context_messages")
assert stop_idx != -1, "Success path must stop the checkpoint thread"
@@ -353,7 +356,10 @@ class TestIssue765FollowupHardening:
src = (Path(__file__).parent.parent / "api" / "streaming.py").read_text(
encoding="utf-8"
)
outer_lock_idx = src.find("with _agent_lock:\n _result_messages =")
outer_lock_idx = src.find(
"with _agent_lock:\n"
" if not ephemeral and not _stream_writeback_is_current(s, stream_id):"
)
silent_failure_idx = src.find("if not _assistant_added and not _token_sent:")
inner_lock_idx = src.find("with _agent_lock:", outer_lock_idx + 1)
compression_idx = src.find("# ── Handle context compression side effects ──")
+20 -1
View File
@@ -541,7 +541,8 @@ def test_provider_quota_route_is_registered():
def test_provider_quota_card_is_rendered_in_providers_panel():
"""The Providers panel should show active provider quota/status before cards."""
panels = (ROOT / "static" / "panels.js").read_text(encoding="utf-8")
assert "api('/api/provider/quota')" in panels
assert "_fetchProviderQuotaStatus(false)" in panels
assert "'/api/provider/quota'" in panels
assert "function _buildProviderQuotaCard" in panels
assert "Active provider quota" in panels
assert "provider-quota-card" in panels
@@ -551,6 +552,21 @@ def test_provider_quota_card_is_rendered_in_providers_panel():
assert "5-hour limit" in panels
def test_provider_quota_card_has_manual_refresh_control():
"""The quota card should let users force an immediate fresh usage lookup."""
panels = (ROOT / "static" / "panels.js").read_text(encoding="utf-8")
assert "function _refreshProviderQuota" in panels
assert "function _fetchProviderQuotaStatus" in panels
assert "refresh=1" in panels
assert "cache:'no-store'" in panels
assert "data-provider-quota-refresh" in panels
assert "Refresh usage" in panels
assert "Provider usage refreshed" in panels
assert "Provider usage refresh failed" in panels
assert "card.isConnected&&button" in panels
assert "Last checked" in panels
def test_provider_quota_styles_exist():
"""Quota UI should have visible supported/unavailable/invalid states."""
css = (ROOT / "static" / "style.css").read_text(encoding="utf-8")
@@ -562,6 +578,9 @@ def test_provider_quota_styles_exist():
".provider-quota-card-invalid_key",
".provider-quota-details",
".provider-quota-window",
".provider-quota-actions",
".provider-quota-refresh",
".provider-quota-checked",
):
assert token in css
+13 -3
View File
@@ -213,6 +213,7 @@ class TestRuntimeRouteInjection(unittest.TestCase):
fake_session = FakeSession()
fake_stream_id = "stream-runtime-route"
fake_session.active_stream_id = fake_stream_id
fake_queue = queue.Queue()
fake_runtime_module = types.ModuleType("hermes_cli.runtime_provider")
fake_runtime_module.resolve_runtime_provider = resolve_runtime_provider
@@ -362,7 +363,10 @@ class TestRuntimeRouteInjection(unittest.TestCase):
fake_hermes_state = types.ModuleType("hermes_state")
fake_hermes_state.SessionDB = mock.Mock(return_value=object())
with mock.patch.object(streaming, "get_session", return_value=FakeSession()), \
fake_session = FakeSession()
fake_session.active_stream_id = fake_stream_id
with mock.patch.object(streaming, "get_session", return_value=fake_session), \
mock.patch.object(streaming, "_get_ai_agent", return_value=CapturingAgent), \
mock.patch.object(streaming, "resolve_model_provider", return_value=("gpt-4o", "openai-codex", None)), \
mock.patch("api.config.get_config", return_value={}), \
@@ -506,7 +510,10 @@ class TestRuntimeRouteInjection(unittest.TestCase):
fake_hermes_state = types.ModuleType("hermes_state")
fake_hermes_state.SessionDB = mock.Mock(return_value=object())
with mock.patch.object(streaming, "get_session", return_value=FakeSession()), \
fake_session = FakeSession()
fake_session.active_stream_id = fake_stream_id
with mock.patch.object(streaming, "get_session", return_value=fake_session), \
mock.patch.object(streaming, "_get_ai_agent", return_value=CapturingAgent), \
mock.patch.object(streaming, "resolve_model_provider", return_value=("gpt-5.4", "openai-codex", None)), \
mock.patch.object(streaming, "get_config", return_value={"clarify": {"timeout": 300}}), \
@@ -841,7 +848,10 @@ class TestCredentialPoolBackwardCompat(unittest.TestCase):
fake_hermes_state = types.ModuleType("hermes_state")
fake_hermes_state.SessionDB = mock.Mock(return_value=None)
with mock.patch.object(streaming, "get_session", return_value=FakeSession()), \
fake_session = FakeSession()
fake_session.active_stream_id = fake_stream_id
with mock.patch.object(streaming, "get_session", return_value=fake_session), \
mock.patch.object(streaming, "_get_ai_agent", return_value=OlderAgent), \
mock.patch.object(streaming, "resolve_model_provider", return_value=("gpt-4o", "openai", None)), \
mock.patch("api.config.get_config", return_value={}), \
+89
View File
@@ -0,0 +1,89 @@
import queue
import threading
from pathlib import Path
from unittest.mock import Mock
import pytest
import api.config as config
import api.models as models
import api.streaming as streaming
from api.models import Session
@pytest.fixture(autouse=True)
def _isolate_sessions(tmp_path, monkeypatch):
session_dir = tmp_path / "sessions"
session_dir.mkdir()
index_file = session_dir / "_index.json"
monkeypatch.setattr(models, "SESSION_DIR", session_dir)
monkeypatch.setattr(models, "SESSION_INDEX_FILE", index_file)
monkeypatch.setattr(streaming, "SESSION_DIR", session_dir)
monkeypatch.setattr(config, "SESSION_INDEX_FILE", index_file, raising=False)
models.SESSIONS.clear()
config.STREAMS.clear()
config.CANCEL_FLAGS.clear()
config.AGENT_INSTANCES.clear()
config.SESSION_AGENT_LOCKS.clear()
yield
models.SESSIONS.clear()
config.STREAMS.clear()
config.CANCEL_FLAGS.clear()
config.AGENT_INSTANCES.clear()
config.SESSION_AGENT_LOCKS.clear()
def test_stream_writeback_requires_active_stream_ownership():
s = Session(session_id="ownership", messages=[])
s.active_stream_id = "current-stream"
assert streaming._stream_writeback_is_current(s, "current-stream") is True
s.active_stream_id = None
assert streaming._stream_writeback_is_current(s, "current-stream") is False
s.active_stream_id = "newer-stream"
assert streaming._stream_writeback_is_current(s, "current-stream") is False
def test_cancel_stream_does_not_append_marker_after_stream_ownership_rotated():
sid = "rotated_cancel_sid"
old_stream = "old-stream"
s = Session(
session_id=sid,
title="Rotated stream",
messages=[{"role": "user", "content": "newer prompt"}],
)
s.active_stream_id = "newer-stream"
s.pending_user_message = "newer prompt"
s.pending_started_at = 456.0
s.save()
models.SESSIONS[sid] = s
config.STREAMS[old_stream] = queue.Queue()
config.CANCEL_FLAGS[old_stream] = threading.Event()
mock_agent = Mock()
mock_agent.session_id = sid
mock_agent.interrupt = Mock()
config.AGENT_INSTANCES[old_stream] = mock_agent
assert streaming.cancel_stream(old_stream) is True
assert s.active_stream_id == "newer-stream"
assert s.pending_user_message == "newer prompt"
assert [m["content"] for m in s.messages] == ["newer prompt"]
assert all(m.get("content") != "*Task cancelled.*" for m in s.messages)
def test_success_path_checks_stream_ownership_before_persisting_result():
src = Path("api/streaming.py").read_text(encoding="utf-8")
guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):"
guard_pos = src.find(guard)
result_merge_pos = src.find("_result_messages = result.get('messages') or _previous_context_messages")
compression_pos = src.find("Handle context compression side effects")
assert guard_pos != -1
assert result_merge_pos != -1
assert compression_pos != -1
assert guard_pos < result_merge_pos
assert guard_pos < compression_pos