diff --git a/CHANGELOG.md b/CHANGELOG.md index 2153acdb..9d737b24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,16 @@ ## [Unreleased] +### Fixed + +- **PR #2136** by @LumenYoung — Stale stream writebacks no longer poison the active session transcript. `cancel_stream()` intentionally clears `active_stream_id` early so the UI can accept a follow-up turn while an old worker is unwinding — but the old worker could still return later from `run_conversation()` and persist its stale result over the newer transcript, causing visible transcript / turn journal / `state.db` to disagree (especially around cancel+retry on compressed continuations). Adds a single-line ownership check `_stream_writeback_is_current(session, stream_id)` (token equality against `session.active_stream_id`) and short-circuits both finalize paths: the success path in `_run_agent_streaming` and the cancel-handler path in `cancel_stream()`. When the stream no longer owns the writeback, both paths log `Skipping stale stream/cancel writeback` and return cleanly without persisting. 89-line regression suite in `tests/test_stale_stream_writeback.py`; companion updates to `tests/test_issue1361_cancel_data_loss.py` and `tests/test_sprint42.py` for the new return-without-persist behavior. + +### Added + +- **PR #2150** by @Jordan-SkyLF — "Refresh usage" button on the Provider quota card in Settings → Providers. Calls `/api/provider/quota?refresh=1&ts=` with `cache: 'no-store'` to bypass browser, service worker, and reverse-proxy caches that may have stamped a previous quota response, then re-renders just the quota card from the fresh response and shows a `Last checked ...` timestamp. Disabled `Refreshing…` state during the in-flight request; success toast on completion or failure toast if the refresh fails. Note: the `refresh=1` query param is a no-op at the server today (`get_provider_quota()` has no in-process cache layer), so the win is strictly browser-side cache-bust + the `no-store` fetch option. A future maintainer follow-up may add server-side TTL caching of OAuth account-limit fetches, at which point the `refresh=1` param becomes load-bearing on both sides. + +## [v0.51.51] — 2026-05-12 — Release AA (stage-344 — 16-PR contributor batch — i18n + insights bucketing/mobile + manual-compress async + workspace recovery + iOS PWA scroll + Cloudflare login health + fr locale) + ### Added - **PR #2130** by @dso2ng — Lazy lineage-report fetch on sidebar segment-badge expand. The sidebar already showed `N segments` for collapsed compression lineage rows (refs #1906, #1943) and the backend report endpoint is now shipped (refs #2012), but some rows only had a backend `_compression_segment_count` from `/api/sessions` while the browser hadn't materialized the older segment rows — clicking the badge couldn't reveal the full bounded list. Adds a small per-sidebar-cache lineage-report cache/inflight map in `static/sessions.js`, invalidates it on each fresh `/api/sessions` refresh, and on expand fetches `GET /api/session/lineage/report?session_id=` only when `_sessionSegmentCount(s)` exceeds the locally-materialized `_lineage_segments` count. Merges returned report `segments` by `session_id` with existing client segments, skipping the visible tip and `child_session` rows. Leaves report `children` out of the compression-segment list so subagent/fork child semantics remain separate. 132-line regression suite covering fetch-needed detection, report-segment merging/dedup, endpoint construction, and inflight cache de-duping. diff --git a/api/streaming.py b/api/streaming.py index 8db29e25..09022ff2 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -1705,6 +1705,16 @@ def _drop_checkpointed_current_user_from_context(messages, msg_text): return history +def _stream_writeback_is_current(session, stream_id): + """Return True only while a worker still owns the session writeback. + + cancel_stream() intentionally clears ``active_stream_id`` early so the UI can + accept a follow-up turn while the old worker is unwinding. That old worker + must not later persist its stale result over the newer transcript. + """ + return bool(stream_id) and getattr(session, 'active_stream_id', None) == stream_id + + def _merge_display_messages_after_agent_result(previous_display, previous_context, result_messages, msg_text): """Keep UI transcript durable while allowing model context to compact. @@ -3123,6 +3133,14 @@ def _run_agent_streaming( if _ckpt_thread is not None: _ckpt_thread.join(timeout=15) with _agent_lock: + if not ephemeral and not _stream_writeback_is_current(s, stream_id): + logger.info( + "Skipping stale stream writeback for session %s stream %s; active_stream_id=%s", + getattr(s, 'session_id', session_id), + stream_id, + getattr(s, 'active_stream_id', None), + ) + return _result_messages = result.get('messages') or _previous_context_messages _next_context_messages = _restore_reasoning_metadata( _previous_context_messages, @@ -4241,6 +4259,14 @@ def cancel_stream(stream_id: str) -> bool: with _get_session_agent_lock(_cancel_session_id): try: _cs = get_session(_cancel_session_id) + if not _stream_writeback_is_current(_cs, stream_id): + logger.info( + "Skipping stale cancel writeback for session %s stream %s; active_stream_id=%s", + _cancel_session_id, + stream_id, + getattr(_cs, 'active_stream_id', None), + ) + return True # ── Preserve the user's typed message before clearing pending state (#1298) ── # The agent's internal messages list (where the user message was appended at # the start of run_conversation()) may not have been merged back into diff --git a/static/panels.js b/static/panels.js index 680d58ee..d942fac0 100644 --- a/static/panels.js +++ b/static/panels.js @@ -5485,13 +5485,20 @@ function _buildPluginCard(plugin){ const _providerCardEls = new Map(); // providerId → {card, statusDot, input, saveBtn, removeBtn} +async function _fetchProviderQuotaStatus(force=false){ + const endpoint=force?`/api/provider/quota?refresh=1&ts=${Date.now()}`:'/api/provider/quota'; + const status=await api(endpoint,{cache:'no-store'}); + if(status&&typeof status==='object') status.client_fetched_at=new Date().toISOString(); + return status; +} + async function loadProvidersPanel(){ const list=$('providersList'); const empty=$('providersEmpty'); if(!list) return; try{ const data=await api('/api/providers'); - const quota=await api('/api/provider/quota').catch(e=>({ok:false,status:'unavailable',quota:null,message:e.message||'Quota status unavailable'})); + const quota=await _fetchProviderQuotaStatus(false).catch(e=>({ok:false,status:'unavailable',quota:null,message:e.message||'Quota status unavailable',client_fetched_at:new Date().toISOString()})); const providers=(data.providers||[]).filter(p=>p.configurable||p.is_oauth); list.innerHTML=''; _providerCardEls.clear(); @@ -5512,6 +5519,40 @@ async function loadProvidersPanel(){ } } +async function _refreshProviderQuota(card,button){ + if(!card) return; + if(button){ + button.disabled=true; + button.textContent='Refreshing…'; + button.setAttribute('aria-busy','true'); + } + let failed=false; + let next; + try{ + next=await _fetchProviderQuotaStatus(true); + failed=next&&next.ok===false; + }catch(e){ + failed=true; + next={ok:false,status:'unavailable',quota:null,message:e.message||'Quota status unavailable',client_fetched_at:new Date().toISOString()}; + } + try{ + const fresh=_buildProviderQuotaCard(next); + if(fresh){ + card.replaceWith(fresh); + if(typeof showToast==='function') showToast(failed?'Provider usage refresh failed':'Provider usage refreshed'); + return; + } + }catch(e){ + failed=true; + } + if(card.isConnected&&button){ + button.disabled=false; + button.textContent='Refresh usage'; + button.removeAttribute('aria-busy'); + } + if(typeof showToast==='function') showToast('Provider usage refresh failed'); +} + function _formatProviderQuotaMoney(value){ if(value===null||value===undefined||value==='') return '—'; const n=Number(value); @@ -5543,6 +5584,15 @@ function _formatProviderQuotaWindowLabel(accountLimits,w){ return raw||'Window'; } +function _formatProviderQuotaLastChecked(status){ + const accountLimits=status&&status.account_limits; + const value=(accountLimits&&accountLimits.fetched_at)||status&&status.client_fetched_at; + if(!value) return 'Last checked after refresh'; + const d=new Date(value); + if(Number.isNaN(d.getTime())) return 'Last checked after refresh'; + try{return 'Last checked '+d.toLocaleString();}catch(e){return 'Last checked '+value;} +} + function _buildProviderQuotaCard(status){ if(!status) return null; const card=document.createElement('div'); @@ -5590,11 +5640,17 @@ function _buildProviderQuotaCard(status){
Active provider quota
${esc(provider)}
+
${esc(_formatProviderQuotaLastChecked(status))}
+
+
+ ${esc(state.replace(/_/g,' '))} +
- ${esc(state.replace(/_/g,' '))}
${body}
`; + const refreshBtn=card.querySelector('[data-provider-quota-refresh]'); + if(refreshBtn) refreshBtn.addEventListener('click',()=>_refreshProviderQuota(card,refreshBtn)); return card; } diff --git a/static/style.css b/static/style.css index 84dd4649..98661718 100644 --- a/static/style.css +++ b/static/style.css @@ -2494,7 +2494,12 @@ main.main.showing-logs > #mainLogs{display:flex;} .provider-quota-header{display:flex;align-items:flex-start;justify-content:space-between;gap:12px;margin-bottom:10px;} .provider-quota-title{font-size:13px;font-weight:650;color:var(--text);line-height:1.2;} .provider-quota-subtitle{font-size:11px;color:var(--muted);line-height:1.3;margin-top:2px;} +.provider-quota-checked{font-size:10.5px;color:var(--muted);line-height:1.3;margin-top:2px;} +.provider-quota-actions{display:flex;align-items:center;gap:8px;flex-wrap:wrap;justify-content:flex-end;} .provider-quota-badge{font-size:10.5px;font-weight:650;text-transform:capitalize;padding:2px 8px;border-radius:999px;background:var(--accent-bg);color:var(--accent-text);white-space:nowrap;} +.provider-quota-refresh{appearance:none;border:1px solid var(--border);border-radius:999px;background:var(--sidebar);color:var(--text);font-size:11px;font-weight:650;line-height:1;padding:5px 9px;cursor:pointer;white-space:nowrap;} +.provider-quota-refresh:hover:not(:disabled){border-color:var(--accent);color:var(--accent);} +.provider-quota-refresh:disabled{opacity:.65;cursor:wait;} .provider-quota-body{display:flex;flex-wrap:wrap;gap:8px;} .provider-quota-metric{flex:1;min-width:88px;border:1px solid var(--border);border-radius:8px;background:var(--sidebar);padding:8px 10px;} .provider-quota-metric span{display:block;font-size:10.5px;color:var(--muted);margin-bottom:2px;} diff --git a/tests/test_issue1361_cancel_data_loss.py b/tests/test_issue1361_cancel_data_loss.py index 09fc77f0..7fd082c4 100644 --- a/tests/test_issue1361_cancel_data_loss.py +++ b/tests/test_issue1361_cancel_data_loss.py @@ -95,6 +95,7 @@ def _make_session(session_id="cancel_sid_1361", def _setup_cancel_state(session_id, stream_id="stream_1361"): """Wire up STREAMS/CANCEL_FLAGS/AGENT_INSTANCES for cancel_stream().""" + models.SESSIONS[session_id].active_stream_id = stream_id config.STREAMS[stream_id] = queue.Queue() config.CANCEL_FLAGS[stream_id] = threading.Event() mock_agent = Mock() diff --git a/tests/test_issue1857_usage_overwrite.py b/tests/test_issue1857_usage_overwrite.py index 948fd332..affbd72b 100644 --- a/tests/test_issue1857_usage_overwrite.py +++ b/tests/test_issue1857_usage_overwrite.py @@ -110,6 +110,7 @@ def test_stream_completion_overwrites_session_usage_with_latest_turn(cleanup_tes fake_session = FakeSession() fake_stream_id = "stream_issue1857_usage_overwrite" + fake_session.active_stream_id = fake_stream_id fake_queue = queue.Queue() fake_runtime_module = types.ModuleType("hermes_cli.runtime_provider") fake_runtime_module.resolve_runtime_provider = mock.Mock( diff --git a/tests/test_issue1897_profile_switch_agent_cache.py b/tests/test_issue1897_profile_switch_agent_cache.py index 0b82a3a6..97190f66 100644 --- a/tests/test_issue1897_profile_switch_agent_cache.py +++ b/tests/test_issue1897_profile_switch_agent_cache.py @@ -188,6 +188,7 @@ def test_same_session_profile_switch_rebuilds_agent_under_new_soul_home(tmp_path def run_turn(profile_name: str, stream_id: str, text: str): fake_session.profile = profile_name + fake_session.active_stream_id = stream_id streaming.STREAMS[stream_id] = queue.Queue() streaming._run_agent_streaming( session_id=fake_session.session_id, diff --git a/tests/test_issue765_streaming_persistence.py b/tests/test_issue765_streaming_persistence.py index f55bb188..cc693429 100644 --- a/tests/test_issue765_streaming_persistence.py +++ b/tests/test_issue765_streaming_persistence.py @@ -333,7 +333,10 @@ class TestIssue765FollowupHardening: ) stop_idx = src.find("if _checkpoint_stop is not None:\n _checkpoint_stop.set()") join_idx = src.find("if _ckpt_thread is not None:\n _ckpt_thread.join(timeout=15)") - lock_idx = src.find("with _agent_lock:\n _result_messages =") + lock_idx = src.find( + "with _agent_lock:\n" + " if not ephemeral and not _stream_writeback_is_current(s, stream_id):" + ) save_idx = src.find("s.context_messages = _next_context_messages") assert stop_idx != -1, "Success path must stop the checkpoint thread" @@ -353,7 +356,10 @@ class TestIssue765FollowupHardening: src = (Path(__file__).parent.parent / "api" / "streaming.py").read_text( encoding="utf-8" ) - outer_lock_idx = src.find("with _agent_lock:\n _result_messages =") + outer_lock_idx = src.find( + "with _agent_lock:\n" + " if not ephemeral and not _stream_writeback_is_current(s, stream_id):" + ) silent_failure_idx = src.find("if not _assistant_added and not _token_sent:") inner_lock_idx = src.find("with _agent_lock:", outer_lock_idx + 1) compression_idx = src.find("# ── Handle context compression side effects ──") diff --git a/tests/test_provider_quota_status.py b/tests/test_provider_quota_status.py index 8da72e98..d50c40b6 100644 --- a/tests/test_provider_quota_status.py +++ b/tests/test_provider_quota_status.py @@ -541,7 +541,8 @@ def test_provider_quota_route_is_registered(): def test_provider_quota_card_is_rendered_in_providers_panel(): """The Providers panel should show active provider quota/status before cards.""" panels = (ROOT / "static" / "panels.js").read_text(encoding="utf-8") - assert "api('/api/provider/quota')" in panels + assert "_fetchProviderQuotaStatus(false)" in panels + assert "'/api/provider/quota'" in panels assert "function _buildProviderQuotaCard" in panels assert "Active provider quota" in panels assert "provider-quota-card" in panels @@ -551,6 +552,21 @@ def test_provider_quota_card_is_rendered_in_providers_panel(): assert "5-hour limit" in panels +def test_provider_quota_card_has_manual_refresh_control(): + """The quota card should let users force an immediate fresh usage lookup.""" + panels = (ROOT / "static" / "panels.js").read_text(encoding="utf-8") + assert "function _refreshProviderQuota" in panels + assert "function _fetchProviderQuotaStatus" in panels + assert "refresh=1" in panels + assert "cache:'no-store'" in panels + assert "data-provider-quota-refresh" in panels + assert "Refresh usage" in panels + assert "Provider usage refreshed" in panels + assert "Provider usage refresh failed" in panels + assert "card.isConnected&&button" in panels + assert "Last checked" in panels + + def test_provider_quota_styles_exist(): """Quota UI should have visible supported/unavailable/invalid states.""" css = (ROOT / "static" / "style.css").read_text(encoding="utf-8") @@ -562,6 +578,9 @@ def test_provider_quota_styles_exist(): ".provider-quota-card-invalid_key", ".provider-quota-details", ".provider-quota-window", + ".provider-quota-actions", + ".provider-quota-refresh", + ".provider-quota-checked", ): assert token in css diff --git a/tests/test_sprint42.py b/tests/test_sprint42.py index 79a3314b..995a6614 100644 --- a/tests/test_sprint42.py +++ b/tests/test_sprint42.py @@ -213,6 +213,7 @@ class TestRuntimeRouteInjection(unittest.TestCase): fake_session = FakeSession() fake_stream_id = "stream-runtime-route" + fake_session.active_stream_id = fake_stream_id fake_queue = queue.Queue() fake_runtime_module = types.ModuleType("hermes_cli.runtime_provider") fake_runtime_module.resolve_runtime_provider = resolve_runtime_provider @@ -362,7 +363,10 @@ class TestRuntimeRouteInjection(unittest.TestCase): fake_hermes_state = types.ModuleType("hermes_state") fake_hermes_state.SessionDB = mock.Mock(return_value=object()) - with mock.patch.object(streaming, "get_session", return_value=FakeSession()), \ + fake_session = FakeSession() + fake_session.active_stream_id = fake_stream_id + + with mock.patch.object(streaming, "get_session", return_value=fake_session), \ mock.patch.object(streaming, "_get_ai_agent", return_value=CapturingAgent), \ mock.patch.object(streaming, "resolve_model_provider", return_value=("gpt-4o", "openai-codex", None)), \ mock.patch("api.config.get_config", return_value={}), \ @@ -506,7 +510,10 @@ class TestRuntimeRouteInjection(unittest.TestCase): fake_hermes_state = types.ModuleType("hermes_state") fake_hermes_state.SessionDB = mock.Mock(return_value=object()) - with mock.patch.object(streaming, "get_session", return_value=FakeSession()), \ + fake_session = FakeSession() + fake_session.active_stream_id = fake_stream_id + + with mock.patch.object(streaming, "get_session", return_value=fake_session), \ mock.patch.object(streaming, "_get_ai_agent", return_value=CapturingAgent), \ mock.patch.object(streaming, "resolve_model_provider", return_value=("gpt-5.4", "openai-codex", None)), \ mock.patch.object(streaming, "get_config", return_value={"clarify": {"timeout": 300}}), \ @@ -841,7 +848,10 @@ class TestCredentialPoolBackwardCompat(unittest.TestCase): fake_hermes_state = types.ModuleType("hermes_state") fake_hermes_state.SessionDB = mock.Mock(return_value=None) - with mock.patch.object(streaming, "get_session", return_value=FakeSession()), \ + fake_session = FakeSession() + fake_session.active_stream_id = fake_stream_id + + with mock.patch.object(streaming, "get_session", return_value=fake_session), \ mock.patch.object(streaming, "_get_ai_agent", return_value=OlderAgent), \ mock.patch.object(streaming, "resolve_model_provider", return_value=("gpt-4o", "openai", None)), \ mock.patch("api.config.get_config", return_value={}), \ diff --git a/tests/test_stale_stream_writeback.py b/tests/test_stale_stream_writeback.py new file mode 100644 index 00000000..03aa55da --- /dev/null +++ b/tests/test_stale_stream_writeback.py @@ -0,0 +1,89 @@ +import queue +import threading +from pathlib import Path +from unittest.mock import Mock + +import pytest + +import api.config as config +import api.models as models +import api.streaming as streaming +from api.models import Session + + +@pytest.fixture(autouse=True) +def _isolate_sessions(tmp_path, monkeypatch): + session_dir = tmp_path / "sessions" + session_dir.mkdir() + index_file = session_dir / "_index.json" + monkeypatch.setattr(models, "SESSION_DIR", session_dir) + monkeypatch.setattr(models, "SESSION_INDEX_FILE", index_file) + monkeypatch.setattr(streaming, "SESSION_DIR", session_dir) + monkeypatch.setattr(config, "SESSION_INDEX_FILE", index_file, raising=False) + models.SESSIONS.clear() + config.STREAMS.clear() + config.CANCEL_FLAGS.clear() + config.AGENT_INSTANCES.clear() + config.SESSION_AGENT_LOCKS.clear() + yield + models.SESSIONS.clear() + config.STREAMS.clear() + config.CANCEL_FLAGS.clear() + config.AGENT_INSTANCES.clear() + config.SESSION_AGENT_LOCKS.clear() + + +def test_stream_writeback_requires_active_stream_ownership(): + s = Session(session_id="ownership", messages=[]) + s.active_stream_id = "current-stream" + + assert streaming._stream_writeback_is_current(s, "current-stream") is True + + s.active_stream_id = None + assert streaming._stream_writeback_is_current(s, "current-stream") is False + + s.active_stream_id = "newer-stream" + assert streaming._stream_writeback_is_current(s, "current-stream") is False + + +def test_cancel_stream_does_not_append_marker_after_stream_ownership_rotated(): + sid = "rotated_cancel_sid" + old_stream = "old-stream" + s = Session( + session_id=sid, + title="Rotated stream", + messages=[{"role": "user", "content": "newer prompt"}], + ) + s.active_stream_id = "newer-stream" + s.pending_user_message = "newer prompt" + s.pending_started_at = 456.0 + s.save() + models.SESSIONS[sid] = s + + config.STREAMS[old_stream] = queue.Queue() + config.CANCEL_FLAGS[old_stream] = threading.Event() + mock_agent = Mock() + mock_agent.session_id = sid + mock_agent.interrupt = Mock() + config.AGENT_INSTANCES[old_stream] = mock_agent + + assert streaming.cancel_stream(old_stream) is True + + assert s.active_stream_id == "newer-stream" + assert s.pending_user_message == "newer prompt" + assert [m["content"] for m in s.messages] == ["newer prompt"] + assert all(m.get("content") != "*Task cancelled.*" for m in s.messages) + + +def test_success_path_checks_stream_ownership_before_persisting_result(): + src = Path("api/streaming.py").read_text(encoding="utf-8") + guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):" + guard_pos = src.find(guard) + result_merge_pos = src.find("_result_messages = result.get('messages') or _previous_context_messages") + compression_pos = src.find("Handle context compression side effects") + + assert guard_pos != -1 + assert result_merge_pos != -1 + assert compression_pos != -1 + assert guard_pos < result_merge_pos + assert guard_pos < compression_pos