Stage 393: PR #2637

# Conflicts:
#	static/sessions.js
This commit is contained in:
nesquena-hermes
2026-05-20 22:24:40 +00:00
8 changed files with 285 additions and 3 deletions
+7 -2
View File
@@ -19,6 +19,8 @@ from contextlib import contextmanager
from pathlib import Path
from typing import Optional
from api.session_events import publish_session_list_changed
logger = logging.getLogger(__name__)
# ── Constants (match hermes_cli.profiles upstream) ─────────────────────────
@@ -411,8 +413,11 @@ def install_cron_scheduler_profile_isolation() -> None:
# the explicitly selected manual execution profile.
if _cron_profile_context_depth() > 0:
return original(job, *args, **kwargs)
with cron_profile_context_for_home(_home_for_scheduled_cron_job(job)):
return original(job, *args, **kwargs)
try:
with cron_profile_context_for_home(_home_for_scheduled_cron_job(job)):
return original(job, *args, **kwargs)
finally:
publish_session_list_changed("cron_complete")
_webui_profile_isolated_run_job._webui_profile_isolated = True
_webui_profile_isolated_run_job._webui_original_run_job = original
+60
View File
@@ -30,6 +30,11 @@ from api.agent_sessions import (
read_session_lineage_report,
)
from api.compression_anchor import visible_messages_for_anchor
from api.session_events import (
publish_session_list_changed,
subscribe_session_events,
unsubscribe_session_events,
)
logger = logging.getLogger(__name__)
@@ -827,6 +832,7 @@ def _run_cron_tracked(job, profile_home=None, execution_profile_home=None):
logger.debug("Failed to mark manual cron run failure for %s", job_id)
finally:
_mark_cron_done(job_id)
publish_session_list_changed("cron_complete")
_PROVIDER_ALIASES = {
"claude": "anthropic",
@@ -4255,6 +4261,9 @@ def handle_get(handler, parsed) -> bool:
if parsed.path == '/api/sessions/gateway/stream':
return _handle_gateway_sse_stream(handler, parsed)
if parsed.path == '/api/sessions/events':
return _handle_session_events_stream(handler)
if parsed.path == "/api/media":
return _handle_media(handler, parsed)
@@ -4633,6 +4642,8 @@ def handle_post(handler, parsed) -> bool:
project_id=body.get("project_id") or None,
worktree_info=worktree_info,
)
if worktree_info:
publish_session_list_changed("session_new")
return j(handler, {"session": s.compact() | {"messages": s.messages}})
if parsed.path == "/api/session/duplicate":
@@ -4694,6 +4705,7 @@ def handle_post(handler, parsed) -> bool:
# Without this explicit save, the duplicate is in-memory only — if the user
# refreshes before sending a turn, the duplicate vanishes.
copied_session.save()
publish_session_list_changed("session_duplicate")
return j(handler, {"session": copied_session.compact() | {"messages": copied_session.messages}})
except Exception as e:
@@ -4787,6 +4799,7 @@ def handle_post(handler, parsed) -> bool:
with _get_session_agent_lock(body["session_id"]):
s.title = str(body["title"]).strip()[:80] or "Untitled"
s.save()
publish_session_list_changed("session_rename")
return j(handler, {"session": s.compact()})
if parsed.path == "/api/personality/set":
@@ -5041,6 +5054,7 @@ def handle_post(handler, parsed) -> bool:
delete_cli_session(sid)
except Exception:
logger.debug("Failed to delete CLI session %s", sid)
publish_session_list_changed("session_delete")
return j(handler, {"ok": True, **worktree_retained})
if parsed.path == "/api/session/clear":
@@ -5166,6 +5180,7 @@ def handle_post(handler, parsed) -> bool:
# Persist only if there are messages (matches new_session pattern)
if forked_messages:
branch.save()
publish_session_list_changed("session_branch")
return j(handler, {
"session_id": branch.session_id,
@@ -5673,6 +5688,7 @@ def handle_post(handler, parsed) -> bool:
with _get_session_agent_lock(body["session_id"]):
s.pinned = pin_requested
s.save()
publish_session_list_changed("session_pin")
return j(handler, {"ok": True, "session": s.compact()})
# ── Session archive (POST) ──
@@ -5749,6 +5765,7 @@ def handle_post(handler, parsed) -> bool:
with _get_session_agent_lock(sid):
s.archived = bool(body.get("archived", True))
s.save(touch_updated_at=False)
publish_session_list_changed("session_archive")
return j(handler, {"ok": True, "session": s.compact(), **_worktree_retained_payload(s)})
# ── Session move to project (POST) ──
@@ -5777,6 +5794,7 @@ def handle_post(handler, parsed) -> bool:
with _get_session_agent_lock(body["session_id"]):
s.project_id = target_pid
s.save()
publish_session_list_changed("session_move")
return j(handler, {"ok": True, "session": s.compact()})
# ── Project CRUD (POST) ──
@@ -6541,6 +6559,32 @@ def _handle_gateway_sse_stream(handler, parsed):
return True
def _handle_session_events_stream(handler):
"""SSE endpoint for lightweight session-list invalidation events."""
handler.send_response(200)
handler.send_header('Content-Type', 'text/event-stream; charset=utf-8')
handler.send_header('Cache-Control', 'no-cache')
handler.send_header('X-Accel-Buffering', 'no')
handler.send_header('Connection', 'keep-alive')
handler.end_headers()
q = subscribe_session_events()
try:
while True:
try:
event_data = q.get(timeout=_SSE_HEARTBEAT_INTERVAL_SECONDS)
except queue.Empty:
handler.wfile.write(b': keepalive\n\n')
handler.wfile.flush()
continue
_sse(handler, event_data.get('type', 'sessions_changed'), event_data)
except _CLIENT_DISCONNECT_ERRORS:
pass
finally:
unsubscribe_session_events(q)
return True
def _content_disposition_value(disposition: str, filename: str) -> str:
"""Build a latin-1-safe Content-Disposition value with RFC 5987 filename*."""
import urllib.parse as _up
@@ -7987,6 +8031,16 @@ def _prepare_chat_start_session_for_stream(
s.save()
def _is_hidden_empty_session(s) -> bool:
return (
getattr(s, "title", "Untitled") == "Untitled"
and not getattr(s, "messages", None)
and not getattr(s, "active_stream_id", None)
and not getattr(s, "pending_user_message", None)
and not getattr(s, "worktree_path", None)
)
def _start_chat_stream_for_session(
s,
*,
@@ -8033,6 +8087,7 @@ def _start_chat_stream_for_session(
diag.stage("session_lock_wait") if diag else None
with session_lock:
diag.stage("save_pending_state") if diag else None
was_hidden_empty_session = _is_hidden_empty_session(s)
_prepare_chat_start_session_for_stream(
s,
msg=msg,
@@ -8042,6 +8097,8 @@ def _start_chat_stream_for_session(
model_provider=model_provider,
stream_id=stream_id,
)
if was_hidden_empty_session:
publish_session_list_changed("session_new")
diag.stage("turn_journal_submitted") if diag else None
journal_event = {}
try:
@@ -10467,6 +10524,7 @@ def _handle_session_import_cli(handler, body):
changed = True
if changed:
existing.save(touch_updated_at=False)
publish_session_list_changed("session_import_cli")
return j(
handler,
{
@@ -10583,6 +10641,7 @@ def _handle_session_import_cli(handler, body):
s.platform = cli_platform
s._cli_origin = sid
s.save(touch_updated_at=False)
publish_session_list_changed("session_import_cli")
return j(
handler,
{
@@ -10623,6 +10682,7 @@ def _handle_session_import(handler, body):
while len(SESSIONS) > SESSIONS_MAX:
SESSIONS.popitem(last=False)
s.save()
publish_session_list_changed("session_import")
return j(handler, {"ok": True, "session": s.compact() | {"messages": s.messages}})
+45
View File
@@ -0,0 +1,45 @@
"""Lightweight in-process invalidation events for session sidebar state."""
import queue
import threading
_SESSION_EVENTS_LOCK = threading.Lock()
_SESSION_EVENTS_SUBSCRIBERS: set[queue.Queue] = set()
_SESSION_EVENTS_VERSION = 0
def publish_session_list_changed(reason: str = "session_changed") -> None:
"""Notify connected browsers that the session sidebar may be stale."""
global _SESSION_EVENTS_VERSION
with _SESSION_EVENTS_LOCK:
_SESSION_EVENTS_VERSION += 1
payload = {
"type": "sessions_changed",
"version": _SESSION_EVENTS_VERSION,
"reason": reason,
}
subscribers = list(_SESSION_EVENTS_SUBSCRIBERS)
for q in subscribers:
try:
q.put_nowait(payload)
except queue.Full:
try:
q.get_nowait()
except queue.Empty:
pass
try:
q.put_nowait(payload)
except queue.Full:
pass
def subscribe_session_events() -> queue.Queue:
q: queue.Queue = queue.Queue(maxsize=1)
with _SESSION_EVENTS_LOCK:
_SESSION_EVENTS_SUBSCRIBERS.add(q)
return q
def unsubscribe_session_events(q: queue.Queue) -> None:
with _SESSION_EVENTS_LOCK:
_SESSION_EVENTS_SUBSCRIBERS.discard(q)
+84
View File
@@ -2069,6 +2069,7 @@ function _applySessionListPayload(sessData, projData){
animateNextSessionListRefresh({enterAll:true});
_sessionListFirstRenderAnimated=true;
}
ensureSessionEventsSSE();
renderSessionListFromCache(); // no-ops if rename is in progress
}
@@ -2107,6 +2108,12 @@ let _streamingPollTimer = null;
let _sessionTimeRefreshTimer = null;
let _activeSessionExternalRefreshTimer = null;
let _activeSessionExternalRefreshInFlight = false;
let _sessionEventsSSE = null;
let _sessionEventsRefreshTimer = 0;
let _sessionEventsReconnectTimer = 0;
let _sessionEventsNeedsRefreshOnOpen = false;
let _sessionListRefreshInFlight = false;
let _sessionListRefreshPendingReason = '';
function startStreamingPoll(){
if(_streamingPollTimer) return;
@@ -2172,6 +2179,83 @@ function ensureActiveSessionExternalRefreshPoll(){
}
}
async function refreshSessionList(reason='manual', opts={}){
const force = !!(opts && opts.force);
const refreshActive = !!(opts && opts.refreshActive);
if(!force && typeof document !== 'undefined' && document.hidden) return;
if(_sessionListRefreshInFlight){
_sessionListRefreshPendingReason = reason || 'session-list';
return;
}
_sessionListRefreshInFlight = true;
try{
await renderSessionList({deferWhileInteracting:!force});
if(refreshActive) await refreshActiveSessionIfExternallyUpdated(reason||'session-list');
}finally{
_sessionListRefreshInFlight = false;
const pendingReason = _sessionListRefreshPendingReason;
_sessionListRefreshPendingReason = '';
if(pendingReason) _scheduleSessionEventsRefresh(pendingReason);
}
}
function _scheduleSessionEventsRefresh(reason){
if(_sessionEventsRefreshTimer) return;
_sessionEventsRefreshTimer = setTimeout(() => {
_sessionEventsRefreshTimer = 0;
void refreshSessionList(reason||'event');
}, 300);
}
function _closeSessionEventsSSE(){
if(_sessionEventsSSE){
_sessionEventsSSE.close();
_sessionEventsSSE = null;
}
}
function ensureSessionEventsSSE(){
if(typeof document !== 'undefined' && !document._hermesSessionEventsVisibilityHook){
document.addEventListener('visibilitychange', () => {
if(document.hidden){
_closeSessionEventsSSE();
}else{
ensureSessionEventsSSE();
void refreshSessionList('visible');
}
});
document._hermesSessionEventsVisibilityHook = true;
}
if(typeof EventSource==='undefined') return;
if(typeof document !== 'undefined' && document.hidden) return;
if(_sessionEventsSSE) return;
try{
// Same-origin relative URL preserves subpath mounts and normal WebUI cookies.
_sessionEventsSSE = new EventSource('api/sessions/events');
_sessionEventsSSE.onopen = () => {
if(!_sessionEventsNeedsRefreshOnOpen) return;
_sessionEventsNeedsRefreshOnOpen = false;
void refreshSessionList('reconnect');
};
_sessionEventsSSE.addEventListener('sessions_changed', () => {
_scheduleSessionEventsRefresh('event');
});
_sessionEventsSSE.onerror = () => {
_sessionEventsNeedsRefreshOnOpen = true;
_closeSessionEventsSSE();
if(_sessionEventsReconnectTimer) return;
_sessionEventsReconnectTimer = setTimeout(() => {
_sessionEventsReconnectTimer = 0;
ensureSessionEventsSSE();
}, 5000);
};
}catch(e){
_closeSessionEventsSSE();
}
}
if(typeof window!=='undefined') window.refreshSessionList = refreshSessionList;
function startGatewayPollFallback(ms){
const intervalMs = Math.max(5000, Number(ms) || _gatewayFallbackPollMs);
if(_gatewayPollTimer) clearInterval(_gatewayPollTimer);
+8 -1
View File
@@ -2012,7 +2012,14 @@ if(typeof window!=='undefined') window._resetScrollDirectionTracker=_resetScroll
if(progress>0.3) e.preventDefault();
},{passive:false});
el.addEventListener('touchend',function(){
if(_ptrState===2){ window.location.reload(); return; }
if(_ptrState===2){
if(typeof window.refreshSessionList==='function'){
Promise.resolve(window.refreshSessionList('pull', {force:true, refreshActive:true})).catch(()=>{}).finally(_ptrReset);
}else{
window.location.reload();
}
return;
}
_ptrReset();
},{passive:true});
el.addEventListener('touchcancel',_ptrReset,{passive:true});
@@ -234,6 +234,7 @@ def test_webui_installs_profile_context_on_in_process_scheduler_run_job(tmp_path
monkeypatch.setitem(sys.modules, "cron.scheduler", cron_scheduler)
monkeypatch.setattr(p, "_DEFAULT_HERMES_HOME", default_home)
monkeypatch.setattr(p, "cron_profile_context_for_home", Ctx)
monkeypatch.setattr(p, "publish_session_list_changed", lambda reason: events.append(("publish", reason)))
p.install_cron_scheduler_profile_isolation()
@@ -242,6 +243,7 @@ def test_webui_installs_profile_context_on_in_process_scheduler_run_job(tmp_path
("enter", str(research_home)),
("run", "job1575"),
("exit", str(research_home)),
("publish", "cron_complete"),
]
@@ -279,6 +281,7 @@ def test_scheduler_run_job_wrapper_does_not_reenter_manual_cron_context(tmp_path
monkeypatch.setitem(sys.modules, "cron.scheduler", cron_scheduler)
monkeypatch.setattr(p, "_DEFAULT_HERMES_HOME", tmp_path / "home")
monkeypatch.setattr(p, "cron_profile_context_for_home", Ctx)
monkeypatch.setattr(p, "publish_session_list_changed", lambda reason: events.append(("unexpected-publish", reason)))
monkeypatch.setattr(p._tls, "cron_profile_depth", 1, raising=False)
p.install_cron_scheduler_profile_isolation()
+51
View File
@@ -0,0 +1,51 @@
from pathlib import Path
ROUTES = Path("api/routes.py").read_text(encoding="utf-8")
SESSION_EVENTS = Path("api/session_events.py").read_text(encoding="utf-8")
PROFILES = Path("api/profiles.py").read_text(encoding="utf-8")
def test_session_events_endpoint_and_bus_are_defined():
assert "_SESSION_EVENTS_SUBSCRIBERS" in SESSION_EVENTS
assert "def publish_session_list_changed" in SESSION_EVENTS
assert "def _handle_session_events_stream" in ROUTES
assert "parsed.path == '/api/sessions/events'" in ROUTES
assert "Content-Type', 'text/event-stream; charset=utf-8'" in ROUTES
def test_session_events_publish_for_minimal_sidebar_mutations():
for reason in (
"session_new",
"session_delete",
"session_duplicate",
"session_import",
"session_import_cli",
"session_archive",
"session_move",
"session_pin",
"session_rename",
):
assert f'publish_session_list_changed("{reason}")' in ROUTES
assert 'if worktree_info:\n publish_session_list_changed("session_new")' in ROUTES
assert "was_hidden_empty_session = _is_hidden_empty_session(s)" in ROUTES
assert 'if was_hidden_empty_session:\n publish_session_list_changed("session_new")' in ROUTES
assert 'publish_session_list_changed("chat_start")' not in ROUTES
assert 'publish_session_list_changed("cron_complete")' in ROUTES
assert 'publish_session_list_changed("cron_complete")' in PROFILES
def test_session_event_queue_is_bounded_and_latest_wins():
from api import session_events
q = session_events.subscribe_session_events()
try:
session_events.publish_session_list_changed("first")
session_events.publish_session_list_changed("second")
payload = q.get_nowait()
assert payload["type"] == "sessions_changed"
assert payload["reason"] == "second"
assert q.empty()
finally:
session_events.unsubscribe_session_events(q)
@@ -2,6 +2,7 @@ from pathlib import Path
SESSIONS_JS = Path("static/sessions.js").read_text(encoding="utf-8")
UI_JS = Path("static/ui.js").read_text(encoding="utf-8")
def test_load_session_supports_force_reload_for_external_refresh():
@@ -27,6 +28,32 @@ def test_active_session_external_refresh_has_focus_and_visibility_hooks():
assert "ensureActiveSessionExternalRefreshPoll();" in SESSIONS_JS
def test_session_list_external_refresh_uses_sse_invalidation_not_polling():
"""New sessions should refresh the sidebar from server invalidation events."""
assert "async function refreshSessionList(reason='manual', opts={})" in SESSIONS_JS
assert "function ensureSessionEventsSSE()" in SESSIONS_JS
assert "new EventSource('api/sessions/events')" in SESSIONS_JS
assert "addEventListener('sessions_changed'" in SESSIONS_JS
assert "function _scheduleSessionEventsRefresh(reason)" in SESSIONS_JS
assert "_sessionEventsNeedsRefreshOnOpen = true" in SESSIONS_JS
assert "void refreshSessionList('reconnect')" in SESSIONS_JS
assert "renderSessionList({deferWhileInteracting:!force})" in SESSIONS_JS
assert "const refreshActive = !!(opts && opts.refreshActive)" in SESSIONS_JS
assert "if(refreshActive) await refreshActiveSessionIfExternallyUpdated(reason||'session-list')" in SESSIONS_JS
assert "_sessionListRefreshPendingReason = reason || 'session-list'" in SESSIONS_JS
assert "if(pendingReason) _scheduleSessionEventsRefresh(pendingReason)" in SESSIONS_JS
assert "ensureSessionEventsSSE();" in SESSIONS_JS
assert "document._hermesSessionEventsVisibilityHook" in SESSIONS_JS
ensure_fn = SESSIONS_JS[SESSIONS_JS.find("function ensureSessionEventsSSE()") :]
assert ensure_fn.find("document._hermesSessionEventsVisibilityHook") < ensure_fn.find("document.hidden) return")
assert "_sessionListExternalRefreshMs" not in SESSIONS_JS
def test_pwa_pull_to_refresh_refreshes_session_list_not_page_when_available():
assert "window.refreshSessionList('pull', {force:true, refreshActive:true})" in UI_JS
assert "Promise.resolve(window.refreshSessionList('pull', {force:true, refreshActive:true})).catch(()=>{}).finally(_ptrReset)" in UI_JS
def test_force_reload_clears_stale_blocking_prompts_immediately():
"""External refresh should not leave old approval/clarify modals blocking the composer.