v0.50.215: real /steer via agent.steer() — mid-turn correction without interrupt (#1069)

Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com>
Co-authored-by: nesquena <nesquena@users.noreply.github.com>
This commit is contained in:
nesquena-hermes
2026-04-25 19:21:00 -07:00
committed by GitHub
parent 520034c071
commit 3d96dc1498
11 changed files with 603 additions and 36 deletions
+9
View File
@@ -2,6 +2,15 @@
## [Unreleased]
## v0.50.215 — 2026-04-26
### Added
- **Real `/steer` command** — wires `/steer <text>` through the agent's thread-safe `agent.steer()` method rather than falling back to interrupt. Steer text is stashed in `_pending_steer` and injected into the next tool-result boundary without interrupting the current run, giving the agent a mid-turn course correction. New `/api/chat/steer` POST endpoint with five graceful fallback reasons (`no_cached_agent`, `agent_lacks_steer`, `session_not_found`, `not_running`, `stream_dead`) — any fallback transparently falls back to the existing interrupt+queue mechanism. (`api/routes.py`, `api/streaming.py`, `static/commands.js`, `static/messages.js`, `static/i18n.js`) Closes #720 follow-up [#1066 @nesquena]
- **Steer leftover delivery** — if the agent finishes its turn before hitting a tool boundary, the stashed steer text is drained and emitted as a `pending_steer_leftover` SSE event; the frontend queues it as a next-turn message, mirroring the CLI's existing leftover path. (`api/streaming.py`, `static/messages.js`) [#1066]
### Fixed
- **Pending files preserved on steer→interrupt fallback** — the busy-mode steer path in `send()` now defers `S.pendingFiles=[]` until after `_trySteer()` returns, so staged file attachments are not lost when the steer endpoint falls back to interrupt+queue. (`static/messages.js`)
## v0.50.214 — 2026-04-26
### Added
+1 -1
View File
@@ -3,7 +3,7 @@
> Goal: Full 1:1 parity with the Hermes CLI experience via a clean dark web UI.
> Everything you can do from the CLI terminal, you can do from this UI.
>
> Last updated: v0.50.214 (April 26, 2026) — 2299 tests collected
> Last updated: v0.50.215 (April 26, 2026) — 2319 tests collected
> Tests: 2107 collected (`pytest tests/ --collect-only -q`)
> Source: <repo>/
+1 -1
View File
@@ -8,7 +8,7 @@
> Prerequisites: SSH tunnel is active on port 8787. Open http://localhost:8787 in browser.
> Server health check: curl http://127.0.0.1:8787/health should return {"status":"ok"}.
>
> Automated coverage: 2276 tests collected via `pytest tests/ --collect-only -q`. Includes onboarding coverage for bootstrap/static wizard presence, real provider config persistence (`config.yaml` + `.env`), the `/api/onboarding/*` backend, the onboarding skip/existing-config guard, and CSS regression coverage for smooth thinking/tool card disclosure animation.
> Automated coverage: 2319 tests collected via `pytest tests/ --collect-only -q`. Includes onboarding coverage for bootstrap/static wizard presence, real provider config persistence (`config.yaml` + `.env`), the `/api/onboarding/*` backend, the onboarding skip/existing-config guard, and CSS regression coverage for smooth thinking/tool card disclosure animation.
> Run: `pytest tests/ -v --timeout=60`
>
> Local regression focus: verify that a previously closed workspace panel stays visually closed from first paint through boot completion on desktop refresh; there should be no brief open-then-close flash.
+4
View File
@@ -1345,6 +1345,10 @@ def handle_post(handler, parsed) -> bool:
if parsed.path == "/api/chat":
return _handle_chat_sync(handler, body)
if parsed.path == "/api/chat/steer":
from api.streaming import _handle_chat_steer
return _handle_chat_steer(handler, body)
# ── Cron API (POST) ──
if parsed.path == "/api/crons/create":
return _handle_cron_create(handler, body)
+93
View File
@@ -1936,6 +1936,23 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
usage['threshold_tokens'] = getattr(_cc, 'threshold_tokens', 0) or 0
usage['last_prompt_tokens'] = getattr(_cc, 'last_prompt_tokens', 0) or 0
# (reasoning trace already attached + saved above, before s.save())
# Leftover-steer delivery: if a /steer was queued (via
# api/chat/steer) but the agent finished its turn before
# reaching a tool-result boundary that would consume it,
# the text is still stashed in agent._pending_steer. Drain
# it now and emit a pending_steer_leftover SSE event so the
# frontend can queue it for the next turn — same fallback
# path as the CLI in cli.py:8788-8794.
try:
_drain_pending_steer = getattr(agent, '_drain_pending_steer', None)
_leftover = _drain_pending_steer() if _drain_pending_steer else None
if _leftover:
put('pending_steer_leftover', {
'session_id': session_id,
'text': str(_leftover),
})
except Exception:
logger.debug("Failed to drain pending steer for session %s", session_id)
raw_session = s.compact() | {'messages': s.messages, 'tool_calls': tool_calls}
put('done', {'session': redact_session_data(raw_session), 'usage': usage})
# Emit metering stats for the header TPS label
@@ -2098,6 +2115,82 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
# ============================================================
def _handle_chat_steer(handler, body: dict) -> bool:
"""Inject a /steer payload into the active agent for a session.
Mirrors the CLI's `/steer <text>` command (cli.py:6140-6155):
- Look up the cached AIAgent for the session (PR #1051's
SESSION_AGENT_CACHE).
- Verify a stream is currently active for this session.
- Call agent.steer(text) thread-safe, stashes text in
_pending_steer for application at the next tool-result boundary.
The agent's loop calls _apply_pending_steer_to_tool_results() at the
end of every tool batch and appends the steer text to the last tool
result's content with a marker, so the model sees the steer as part
of the tool output on its next iteration. The user's stream is NOT
interrupted.
If no agent is cached, the agent is too old to support steer, or no
stream is active, return {"accepted": False, "fallback": "<reason>"}
so the frontend can fall back to interrupt or queue mode. The
fallback path is the existing behaviour from PR #1062.
Returns 200 with {"accepted": bool, "fallback": str|None,
"stream_id": str|None}.
"""
from api.helpers import j, bad
from api.config import SESSION_AGENT_CACHE, SESSION_AGENT_CACHE_LOCK
sid = str((body or {}).get("session_id", "") or "").strip()
text = str((body or {}).get("text", "") or "").strip()
if not sid:
return bad(handler, "session_id required")
if not text:
return bad(handler, "text required")
with SESSION_AGENT_CACHE_LOCK:
cached = SESSION_AGENT_CACHE.get(sid)
if not cached:
# No active agent for this session — caller falls back to interrupt
return j(handler, {"accepted": False, "fallback": "no_cached_agent",
"stream_id": None})
agent = cached[0]
if not hasattr(agent, "steer"):
# Older hermes-agent that pre-dates the steer() method
return j(handler, {"accepted": False, "fallback": "agent_lacks_steer",
"stream_id": None})
# Verify the agent is currently running. Use the session's
# active_stream_id rather than calling load_session_locked() which
# would block on the streaming thread's lock.
try:
s = get_session(sid)
except KeyError:
return j(handler, {"accepted": False, "fallback": "session_not_found",
"stream_id": None})
active_stream_id = getattr(s, "active_stream_id", None) or None
if not active_stream_id:
return j(handler, {"accepted": False, "fallback": "not_running",
"stream_id": None})
with STREAMS_LOCK:
stream_alive = active_stream_id in STREAMS
if not stream_alive:
# Active stream id is stale — stream has ended; caller falls back
return j(handler, {"accepted": False, "fallback": "stream_dead",
"stream_id": None})
try:
accepted = bool(agent.steer(text))
except Exception as exc:
logger.debug("agent.steer() raised for session=%s: %s", sid, exc)
return j(handler, {"accepted": False, "fallback": "steer_error",
"stream_id": active_stream_id})
return j(handler, {"accepted": accepted, "fallback": None,
"stream_id": active_stream_id})
def cancel_stream(stream_id: str) -> bool:
"""Signal an in-flight stream to cancel. Returns True if the stream existed.
+52 -7
View File
@@ -573,23 +573,68 @@ async function cmdInterrupt(args){
}
/**
* /steer <message> Inject a steering hint mid-task.
* Currently falls back to interrupt behaviour because the WebUI cannot
* inject messages into an in-flight agent thread. Shows a toast to
* inform the user that true steering is not yet available.
* /steer <message> Inject a steering hint mid-task without interrupting.
*
* Calls POST /api/chat/steer which looks up the cached AIAgent for this
* session and calls agent.steer(text). The agent's run loop appends the
* steer text to the next tool-result message so the model sees it on its
* next iteration same pathway as the CLI's /steer command.
*
* Falls back to interrupt mode when the agent isn't running, isn't cached,
* or doesn't support steer (older hermes-agent versions).
*/
async function cmdSteer(args){
const msg=(args||'').trim();
if(!msg){showToast(t('cmd_steer_no_msg'));return;}
if(!S.busy||!S.activeStreamId){showToast(t('no_active_task'));return;}
if(!S.session){showToast(t('no_active_session'));return;}
// True steer (inject without cancelling) requires agent-side support
// that is not yet available in the WebUI. Fall back to interrupt.
await _trySteer(msg, /*explicitSteer=*/true);
}
/**
* Shared implementation for /steer and the busy_input_mode='steer' path.
*
* Tries the real steer endpoint first. On any non-accept response (no cached
* agent, agent lacks steer, stream dead, etc.) falls back to interrupt mode:
* queue the message + cancel the stream so the existing drain re-sends.
*
* @param {string} msg - The steer text.
* @param {boolean} explicitSteer - True if the user explicitly invoked /steer
* (vs the busy-mode auto-fallback). Affects toast wording only.
*/
async function _trySteer(msg, explicitSteer){
let result=null;
try{
result=await api('/api/chat/steer',{
method:'POST',
body:JSON.stringify({session_id:S.session.session_id,text:msg}),
});
}catch(e){
// Network or server error — fall back to interrupt
result={accepted:false, fallback:'network_error'};
}
if(result&&result.accepted){
showToast(t('cmd_steer_delivered'),2500);
return;
}
// Fall back to interrupt: queue the message + cancel the stream so the
// drain in setBusy(false) re-sends it as a fresh turn.
queueSessionMessage(S.session.session_id,{text:msg,files:[...S.pendingFiles],model:S.session&&S.session.model||($('modelSelect')&&$('modelSelect').value)||'',profile:S.activeProfile||'default'});
updateQueueBadge(S.session.session_id);
S.pendingFiles=[];renderTray();
if(typeof cancelStream==='function'){await cancelStream();}
showToast(t('cmd_steer_fallback'),2500);
// Toast wording differs based on why we're falling back so the user
// understands what just happened.
const reason=(result&&result.fallback)||'unknown';
if(explicitSteer){
showToast(t('cmd_steer_fallback'),2500);
} else if(reason==='no_cached_agent'||reason==='not_running'||reason==='stream_dead'){
// Busy mode hit the steer path before the agent was ready —
// interrupt is the natural fallback, no need to call out steer.
showToast(t('busy_interrupt_confirm'),2000);
} else {
showToast(t('busy_steer_fallback'),2500);
}
}
async function cmdTitle(args){
+18 -6
View File
@@ -108,7 +108,9 @@ const LOCALES = {
cmd_interrupt_no_msg: 'Usage: /interrupt <message>',
cmd_interrupt_confirm: 'Interrupted — sending new message',
cmd_steer_no_msg: 'Usage: /steer <message>',
cmd_steer_fallback: 'Steer is not yet available — interrupted and queued instead',
cmd_steer_fallback: 'Steer unavailable — interrupted and queued instead',
cmd_steer_delivered: 'Steer delivered — agent will see it on its next tool result',
steer_leftover_queued: 'Steer queued for next turn',
busy_steer_fallback: 'Steer not available — interrupted instead',
busy_interrupt_confirm: 'Interrupted — sending new message',
settings_label_busy_input_mode: 'Busy input mode',
@@ -734,7 +736,9 @@ const LOCALES = {
cmd_interrupt_no_msg: 'Использование: /interrupt <сообщение>',
cmd_interrupt_confirm: 'Прервано — отправка нового сообщения',
cmd_steer_no_msg: 'Использование: /steer <сообщение>',
cmd_steer_fallback: 'Steer пока недоступен — прервано и поставлено в очередь',
cmd_steer_fallback: 'Steer недоступен — прервано и поставлено в очередь',
cmd_steer_delivered: 'Steer доставлен — агент увидит его в следующем ответе инструмента',
steer_leftover_queued: 'Steer поставлен в очередь на следующий ход',
busy_steer_fallback: 'Steer недоступен — прервано',
busy_interrupt_confirm: 'Прервано — отправка нового сообщения',
settings_label_busy_input_mode: 'Режим ввода при занятости',
@@ -1343,7 +1347,9 @@ const LOCALES = {
cmd_interrupt_no_msg: 'Uso: /interrupt <mensaje>',
cmd_interrupt_confirm: 'Interrumpido \u2014 enviando nuevo mensaje',
cmd_steer_no_msg: 'Uso: /steer <mensaje>',
cmd_steer_fallback: 'Steer no disponible a\u00fan \u2014 interrumpido y encolado',
cmd_steer_fallback: 'Steer no disponible \u2014 interrumpido y encolado',
cmd_steer_delivered: 'Steer entregado \u2014 el agente lo ver\u00e1 en su pr\u00f3ximo resultado de herramienta',
steer_leftover_queued: 'Steer en cola para el pr\u00f3ximo turno',
busy_steer_fallback: 'Steer no disponible \u2014 interrumpido',
busy_interrupt_confirm: 'Interrumpido \u2014 enviando nuevo mensaje',
settings_label_busy_input_mode: 'Modo de entrada ocupada',
@@ -1921,7 +1927,9 @@ const LOCALES = {
cmd_interrupt_no_msg: 'Verwendung: /interrupt <Nachricht>',
cmd_interrupt_confirm: 'Unterbrochen \u2014 neue Nachricht wird gesendet',
cmd_steer_no_msg: 'Verwendung: /steer <Nachricht>',
cmd_steer_fallback: 'Steer noch nicht verf\u00fcgbar \u2014 unterbrochen und eingereiht',
cmd_steer_fallback: 'Steer nicht verf\u00fcgbar \u2014 unterbrochen und eingereiht',
cmd_steer_delivered: 'Steer geliefert \u2014 der Agent sieht es bei seinem n\u00e4chsten Tool-Ergebnis',
steer_leftover_queued: 'Steer f\u00fcr n\u00e4chsten Durchgang eingereiht',
busy_steer_fallback: 'Steer nicht verf\u00fcgbar \u2014 unterbrochen',
busy_interrupt_confirm: 'Unterbrochen \u2014 neue Nachricht wird gesendet',
settings_label_busy_input_mode: 'Eingabemodus bei Besch\u00e4ftigung',
@@ -2289,7 +2297,9 @@ const LOCALES = {
cmd_interrupt_no_msg: '\u7528\u6cd5\uff1a/interrupt <\u6d88\u606f>',
cmd_interrupt_confirm: '\u5df2\u4e2d\u65ad \u2014 \u6b63\u5728\u53d1\u9001\u65b0\u6d88\u606f',
cmd_steer_no_msg: '\u7528\u6cd5\uff1a/steer <\u6d88\u606f>',
cmd_steer_fallback: 'Steer \u5c1a\u4e0d\u53ef\u7528 \u2014 \u5df2\u4e2d\u65ad\u5e76\u52a0\u5165\u961f\u5217',
cmd_steer_fallback: 'Steer \u4e0d\u53ef\u7528 \u2014 \u5df2\u4e2d\u65ad\u5e76\u52a0\u5165\u961f\u5217',
cmd_steer_delivered: 'Steer \u5df2\u4ea4\u4ed8 \u2014 \u4ee3\u7406\u5c06\u5728\u4e0b\u4e00\u4e2a\u5de5\u5177\u7ed3\u679c\u4e2d\u770b\u5230',
steer_leftover_queued: 'Steer \u5df2\u52a0\u5165\u4e0b\u8f6e\u961f\u5217',
busy_steer_fallback: 'Steer \u4e0d\u53ef\u7528 \u2014 \u5df2\u4e2d\u65ad',
busy_interrupt_confirm: '\u5df2\u4e2d\u65ad \u2014 \u6b63\u5728\u53d1\u9001\u65b0\u6d88\u606f',
settings_label_busy_input_mode: '\u5fd9\u788c\u8f93\u5165\u6a21\u5f0f',
@@ -3212,7 +3222,9 @@ const LOCALES = {
cmd_interrupt_no_msg: '\u7528\u6cd5\uff1a/interrupt <\u8a0a\u606f>',
cmd_interrupt_confirm: '\u5df2\u4e2d\u65ad \u2014 \u6b63\u5728\u767c\u9001\u65b0\u8a0a\u606f',
cmd_steer_no_msg: '\u7528\u6cd5\uff1a/steer <\u8a0a\u606f>',
cmd_steer_fallback: 'Steer \u5c1a\u4e0d\u53ef\u7528 \u2014 \u5df2\u4e2d\u65ad\u4e26\u52a0\u5165\u4f47\u5217',
cmd_steer_fallback: 'Steer \u4e0d\u53ef\u7528 \u2014 \u5df2\u4e2d\u65ad\u4e26\u52a0\u5165\u4f47\u5217',
cmd_steer_delivered: 'Steer \u5df2\u9001\u9054 \u2014 \u4ee3\u7406\u5c07\u5728\u4e0b\u4e00\u500b\u5de5\u5177\u7d50\u679c\u4e2d\u770b\u5230',
steer_leftover_queued: 'Steer \u5df2\u52a0\u5165\u4e0b\u4e00\u8f2a\u4f47\u5217',
busy_steer_fallback: 'Steer \u4e0d\u53ef\u7528 \u2014 \u5df2\u4e2d\u65ad',
busy_interrupt_confirm: '\u5df2\u4e2d\u65ad \u2014 \u6b63\u5728\u767c\u9001\u65b0\u8a0a\u606f',
settings_label_busy_input_mode: '\u5fd9\u788c\u8f38\u5165\u6a21\u5f0f',
+1 -1
View File
@@ -669,7 +669,7 @@
<option value="interrupt" data-i18n="settings_busy_input_mode_interrupt">Interrupt current turn</option>
<option value="steer" data-i18n="settings_busy_input_mode_steer">Steer (interrupt + send)</option>
</select>
<div style="font-size:11px;color:var(--muted);margin-top:4px" data-i18n="settings_desc_busy_input_mode">Controls what happens when you send a message while the agent is running. Queue waits for the current task; Interrupt cancels and starts fresh; Steer sends a correction (currently falls back to interrupt).</div>
<div style="font-size:11px;color:var(--muted);margin-top:4px" data-i18n="settings_desc_busy_input_mode">Controls what happens when you send a message while the agent is running. Queue waits for the current task; Interrupt cancels and starts fresh; Steer injects a mid-turn correction without interrupting (falls back to interrupt when the agent is not yet cached or the stream has ended).</div>
</div>
<div class="settings-field">
<label style="display:flex;align-items:center;gap:8px;cursor:pointer">
+40 -8
View File
@@ -15,24 +15,34 @@ async function send(){
if(text){
if(!S.session){await newSession();await renderSessionList();}
const busyMode=window._busyInputMode||'queue';
if(busyMode==='interrupt'||busyMode==='steer'){
// Cancel the current stream, then queue so drain sends it after cleanup
if(busyMode==='steer'&&S.activeStreamId&&typeof _trySteer==='function'){
// Real steer: clear the input first so the user gets immediate
// feedback, then ship the steer payload via /api/chat/steer.
// _trySteer falls back to queue+cancel internally if the agent
// isn't running / cached / steer-capable.
$('msg').value='';autoResize();
// Do NOT clear pendingFiles yet — _trySteer may fall back to
// interrupt+queue and needs the files for queueSessionMessage.
// _trySteer clears pendingFiles itself in the fallback path, and
// the server returns accepted:true (no files sent) on success.
await _trySteer(text, /*explicitSteer=*/false);
// After _trySteer: clear any remaining files (success path).
S.pendingFiles=[];renderTray();
} else if(busyMode==='interrupt'){
// Queue the message, then cancel so drain re-sends it.
queueSessionMessage(S.session.session_id,{text,files:[...S.pendingFiles],model:S.session&&S.session.model||($('modelSelect')&&$('modelSelect').value)||'',profile:S.activeProfile||'default'});
updateQueueBadge(S.session.session_id);
$('msg').value='';autoResize();
S.pendingFiles=[];renderTray();
if(S.activeStreamId&&typeof cancelStream==='function'){
if(busyMode==='steer'){
showToast(t('busy_steer_fallback'),2500);
} else {
showToast(t('busy_interrupt_confirm'),2000);
}
showToast(t('busy_interrupt_confirm'),2000);
await cancelStream();
} else {
showToast(`Queued: "${text.slice(0,40)}${text.length>40?'…':''}"`,2000);
}
} else {
// Default: queue mode (current behavior)
// Default: queue mode (current behavior). Also the fallback for
// 'steer' mode when no stream is active or _trySteer is unavailable.
queueSessionMessage(S.session.session_id,{text,files:[...S.pendingFiles],model:S.session&&S.session.model||($('modelSelect')&&$('modelSelect').value)||'',profile:S.activeProfile||'default'});
$('msg').value='';autoResize();
S.pendingFiles=[];renderTray();
@@ -764,6 +774,28 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
source.close();
});
source.addEventListener('pending_steer_leftover',e=>{
// The agent finished its turn with steer text still stashed (no
// tool-result boundary fired). Match the CLI's leftover-delivery
// behaviour: queue the leftover text as a next-turn user message
// so the existing drain in setBusy(false) ships it.
try{
const d=JSON.parse(e.data||'{}');
const sid=d.session_id||activeSid;
const txt=String(d.text||'').trim();
if(!txt||sid!==activeSid) return;
if(typeof queueSessionMessage==='function'){
queueSessionMessage(sid,{
text:txt,files:[],
model:S.session&&S.session.model||'',
profile:S.activeProfile||'default',
});
if(typeof updateQueueBadge==='function') updateQueueBadge(sid);
showToast(t('steer_leftover_queued'),3000);
}
}catch(_){}
});
source.addEventListener('compressed',e=>{
// Context was auto-compressed during this turn -- show a system message
if(!S.session||S.session.session_id!==activeSid) return;
+37 -12
View File
@@ -90,35 +90,60 @@ class TestSlashCommandHandlers:
assert "queueSessionMessage" in body, "/interrupt must queue the new message before cancelling"
assert "cancelStream" in body, "/interrupt must call cancelStream() so the drain re-sends"
def test_cmd_steer_falls_back_to_interrupt(self):
"""Steer is a placeholder — currently uses interrupt semantics with a different toast."""
def test_cmd_steer_delegates_to_try_steer(self):
"""/steer delegates to _trySteer which calls /api/chat/steer with
a queue+cancel fallback. The fallback path is exercised by tests
in test_real_steer.py this test just pins the delegation."""
idx = COMMANDS_JS.find("async function cmdSteer(")
assert idx >= 0
body = COMMANDS_JS[idx:idx + 800]
assert "queueSessionMessage" in body
assert "cancelStream" in body
# Toast should differ from interrupt to signal the placeholder
assert "cmd_steer_fallback" in body or "steer_fallback" in body
# cmdSteer now delegates to _trySteer; the fallback (queueSessionMessage
# + cancelStream) lives inside _trySteer.
assert "_trySteer" in body, "cmdSteer must call _trySteer to use the real /api/chat/steer endpoint"
# The shared helper must contain the fallback path
helper_idx = COMMANDS_JS.find("async function _trySteer(")
assert helper_idx >= 0, "_trySteer helper must exist"
helper_body = COMMANDS_JS[helper_idx:helper_idx + 1500]
assert "queueSessionMessage" in helper_body
assert "cancelStream" in helper_body
# Toast should differ from interrupt to signal it's the steer path
assert "cmd_steer_fallback" in helper_body or "steer_fallback" in helper_body
# ── send() busy branch ───────────────────────────────────────────────────
def test_slash_commands_clear_pending_files(self):
"""All three slash command handlers must clear S.pendingFiles and call
renderTray() after enqueuing, so staged files are not duplicated when
the next send() fires.
"""All three busy command handlers must clear S.pendingFiles (directly
or via _trySteer) after enqueuing, so staged files are not duplicated.
cmdQueue and cmdInterrupt call queueSessionMessage themselves and clear
S.pendingFiles directly. cmdSteer delegates to _trySteer. The fallback/interrupt path clears
S.pendingFiles inside _trySteer; the success path returns early and
send() handles the post-await clear. Either way files are not
duplicated we verify by checking _trySteer body for the clearing.
"""
for fn_name in ("cmdQueue", "cmdInterrupt", "cmdSteer"):
# cmdQueue and cmdInterrupt clear pendingFiles directly
for fn_name in ("cmdQueue", "cmdInterrupt"):
idx = COMMANDS_JS.find(f"function {fn_name}(")
assert idx >= 0, f"{fn_name} not found"
body = COMMANDS_JS[idx:idx + 800]
assert "S.pendingFiles=[]" in body, (
f"{fn_name} must clear S.pendingFiles after queueSessionMessage"
"otherwise staged files are re-attached on the next send()"
f"{fn_name} must clear S.pendingFiles after queueSessionMessage"
)
assert "renderTray()" in body, (
f"{fn_name} must call renderTray() after clearing pendingFiles"
)
# cmdSteer delegates to _trySteer; that helper clears pendingFiles
idx_try = COMMANDS_JS.find("function _trySteer(")
assert idx_try >= 0, "_trySteer not found"
try_body = COMMANDS_JS[idx_try:idx_try + 1200]
assert "S.pendingFiles=[]" in try_body, (
"_trySteer must clear S.pendingFiles in its fallback path — "
"without this, files are lost on steer→interrupt fallback"
)
assert "renderTray()" in try_body, (
"_trySteer must call renderTray() after clearing pendingFiles"
)
class TestSendBusyBranchDispatch:
+347
View File
@@ -0,0 +1,347 @@
"""Tests for real /steer functionality (follow-up to PR #1062).
Covers the new POST /api/chat/steer endpoint which mirrors the CLI's /steer
command (cli.py:6140-6155): the endpoint looks up the cached AIAgent for the
session, calls agent.steer(text), and the agent's run loop appends the steer
text to the next tool-result message no interruption.
Falls back to {"accepted": false, "fallback": "<reason>"} when the agent
isn't running, isn't cached, or doesn't support steer (older agent versions).
The frontend uses the fallback signal to drop back to interrupt mode.
Plus a leftover-delivery flow: if the agent finishes its turn before the
steer is consumed (no tool-call boundary), _drain_pending_steer is called
after run_conversation returns and a `pending_steer_leftover` SSE event is
emitted so the frontend can queue the leftover text as a next-turn message.
"""
import sys
import os
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
@pytest.fixture(autouse=True)
def _restore_auth_sessions():
"""Snapshot and restore api.auth._sessions — see test_1058 for the rationale."""
import api.auth as _auth
snapshot = dict(_auth._sessions)
yield
_auth._sessions.clear()
_auth._sessions.update(snapshot)
@pytest.fixture
def _clear_caches():
"""Snapshot SESSION_AGENT_CACHE and STREAMS so tests don't bleed."""
from api.config import SESSION_AGENT_CACHE, SESSION_AGENT_CACHE_LOCK, STREAMS, STREAMS_LOCK
with SESSION_AGENT_CACHE_LOCK:
cache_snap = dict(SESSION_AGENT_CACHE)
SESSION_AGENT_CACHE.clear()
with STREAMS_LOCK:
streams_snap = dict(STREAMS)
STREAMS.clear()
yield
with SESSION_AGENT_CACHE_LOCK:
SESSION_AGENT_CACHE.clear()
SESSION_AGENT_CACHE.update(cache_snap)
with STREAMS_LOCK:
STREAMS.clear()
STREAMS.update(streams_snap)
def _make_handler():
"""Minimal handler stub matching the methods api.helpers.j() touches."""
h = MagicMock()
h.wfile = MagicMock()
h.headers = MagicMock()
h.headers.get = MagicMock(return_value="")
return h
def _captured_response(handler):
"""Pull the JSON body that j() wrote to handler.wfile."""
import json as _json
# j() calls handler.wfile.write(body)
write_calls = handler.wfile.write.call_args_list
assert write_calls, "no body was written to handler.wfile"
body = write_calls[-1][0][0]
return _json.loads(body.decode("utf-8"))
def _captured_status(handler):
"""Pull the HTTP status passed to handler.send_response()."""
calls = handler.send_response.call_args_list
assert calls, "no status was sent"
return calls[-1][0][0]
# ── Backend: the /api/chat/steer endpoint ─────────────────────────────────
class TestHandleChatSteerHappyPath:
"""Endpoint accepts text and calls agent.steer() when all gates pass."""
def test_accepts_when_agent_cached_and_running(self, _clear_caches):
from api.streaming import _handle_chat_steer
from api.config import SESSION_AGENT_CACHE, SESSION_AGENT_CACHE_LOCK, STREAMS, STREAMS_LOCK
sid, stream_id = "sid_happy", "stream_happy"
agent = MagicMock()
agent.steer = MagicMock(return_value=True)
with SESSION_AGENT_CACHE_LOCK:
SESSION_AGENT_CACHE[sid] = (agent, "sig")
with STREAMS_LOCK:
import queue as _q
STREAMS[stream_id] = _q.Queue()
sess = MagicMock()
sess.active_stream_id = stream_id
with patch("api.streaming.get_session", return_value=sess):
handler = _make_handler()
_handle_chat_steer(handler, {"session_id": sid, "text": "Use Python instead"})
agent.steer.assert_called_once_with("Use Python instead")
body = _captured_response(handler)
assert body == {"accepted": True, "fallback": None, "stream_id": stream_id}
class TestHandleChatSteerFallbacks:
"""Each gate that fails returns a structured fallback the frontend can branch on."""
def test_no_cached_agent(self, _clear_caches):
from api.streaming import _handle_chat_steer
handler = _make_handler()
_handle_chat_steer(handler, {"session_id": "sid_x", "text": "hint"})
body = _captured_response(handler)
assert body["accepted"] is False
assert body["fallback"] == "no_cached_agent"
def test_agent_lacks_steer_method(self, _clear_caches):
from api.streaming import _handle_chat_steer
from api.config import SESSION_AGENT_CACHE, SESSION_AGENT_CACHE_LOCK
sid = "sid_old"
# Older agent without steer() — use spec to suppress MagicMock auto-create
agent = MagicMock(spec=["interrupt", "run_conversation"])
with SESSION_AGENT_CACHE_LOCK:
SESSION_AGENT_CACHE[sid] = (agent, "sig")
handler = _make_handler()
_handle_chat_steer(handler, {"session_id": sid, "text": "hint"})
body = _captured_response(handler)
assert body["accepted"] is False
assert body["fallback"] == "agent_lacks_steer"
def test_session_not_found(self, _clear_caches):
from api.streaming import _handle_chat_steer
from api.config import SESSION_AGENT_CACHE, SESSION_AGENT_CACHE_LOCK
sid = "sid_missing"
agent = MagicMock()
agent.steer = MagicMock(return_value=True)
with SESSION_AGENT_CACHE_LOCK:
SESSION_AGENT_CACHE[sid] = (agent, "sig")
with patch("api.streaming.get_session", side_effect=KeyError(sid)):
handler = _make_handler()
_handle_chat_steer(handler, {"session_id": sid, "text": "hint"})
body = _captured_response(handler)
assert body["accepted"] is False
assert body["fallback"] == "session_not_found"
agent.steer.assert_not_called() # never reached the steer call
def test_session_not_running(self, _clear_caches):
from api.streaming import _handle_chat_steer
from api.config import SESSION_AGENT_CACHE, SESSION_AGENT_CACHE_LOCK
sid = "sid_idle"
agent = MagicMock()
agent.steer = MagicMock(return_value=True)
with SESSION_AGENT_CACHE_LOCK:
SESSION_AGENT_CACHE[sid] = (agent, "sig")
sess = MagicMock()
sess.active_stream_id = None # idle session
with patch("api.streaming.get_session", return_value=sess):
handler = _make_handler()
_handle_chat_steer(handler, {"session_id": sid, "text": "hint"})
body = _captured_response(handler)
assert body["accepted"] is False
assert body["fallback"] == "not_running"
agent.steer.assert_not_called()
def test_stream_dead(self, _clear_caches):
"""Session has active_stream_id but the stream is gone from STREAMS (e.g. crashed)."""
from api.streaming import _handle_chat_steer
from api.config import SESSION_AGENT_CACHE, SESSION_AGENT_CACHE_LOCK
sid = "sid_zombie"
agent = MagicMock()
agent.steer = MagicMock(return_value=True)
with SESSION_AGENT_CACHE_LOCK:
SESSION_AGENT_CACHE[sid] = (agent, "sig")
sess = MagicMock()
sess.active_stream_id = "stream_zombie"
with patch("api.streaming.get_session", return_value=sess):
handler = _make_handler()
_handle_chat_steer(handler, {"session_id": sid, "text": "hint"})
body = _captured_response(handler)
assert body["accepted"] is False
assert body["fallback"] == "stream_dead"
agent.steer.assert_not_called()
def test_steer_raises(self, _clear_caches):
"""If agent.steer() raises, return steer_error rather than 500."""
from api.streaming import _handle_chat_steer
from api.config import SESSION_AGENT_CACHE, SESSION_AGENT_CACHE_LOCK, STREAMS, STREAMS_LOCK
sid, stream_id = "sid_throws", "stream_throws"
agent = MagicMock()
agent.steer = MagicMock(side_effect=RuntimeError("boom"))
with SESSION_AGENT_CACHE_LOCK:
SESSION_AGENT_CACHE[sid] = (agent, "sig")
with STREAMS_LOCK:
import queue as _q
STREAMS[stream_id] = _q.Queue()
sess = MagicMock()
sess.active_stream_id = stream_id
with patch("api.streaming.get_session", return_value=sess):
handler = _make_handler()
_handle_chat_steer(handler, {"session_id": sid, "text": "hint"})
body = _captured_response(handler)
assert body["accepted"] is False
assert body["fallback"] == "steer_error"
class TestHandleChatSteerInputValidation:
"""Bad input → 400 Bad Request, not silent acceptance."""
def test_missing_session_id(self, _clear_caches):
from api.streaming import _handle_chat_steer
handler = _make_handler()
_handle_chat_steer(handler, {"text": "hint"})
assert _captured_status(handler) == 400
def test_missing_text(self, _clear_caches):
from api.streaming import _handle_chat_steer
handler = _make_handler()
_handle_chat_steer(handler, {"session_id": "sid"})
assert _captured_status(handler) == 400
def test_empty_text_after_strip(self, _clear_caches):
from api.streaming import _handle_chat_steer
handler = _make_handler()
_handle_chat_steer(handler, {"session_id": "sid", "text": " \n\t "})
assert _captured_status(handler) == 400
# ── Routing ───────────────────────────────────────────────────────────────
class TestRouting:
"""The POST handler must dispatch /api/chat/steer to _handle_chat_steer."""
def test_route_registered(self):
src = (Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
assert '/api/chat/steer' in src
assert '_handle_chat_steer' in src
# ── Frontend: cmdSteer + busy-mode steer use the new endpoint ────────────
class TestFrontendWiring:
"""The slash command and busy-mode steer paths must call /api/chat/steer."""
@classmethod
def setup_class(cls):
cls.cmds = (Path(__file__).parent.parent / "static" / "commands.js").read_text(encoding="utf-8")
cls.msgs = (Path(__file__).parent.parent / "static" / "messages.js").read_text(encoding="utf-8")
cls.i18n = (Path(__file__).parent.parent / "static" / "i18n.js").read_text(encoding="utf-8")
def test_cmd_steer_calls_endpoint(self):
idx = self.cmds.find("async function cmdSteer(")
assert idx >= 0
body = self.cmds[idx:idx + 600]
# Should call _trySteer (which calls the endpoint), not directly cancelStream
assert "_trySteer" in body, "cmdSteer must delegate to _trySteer"
def test_try_steer_calls_endpoint(self):
idx = self.cmds.find("async function _trySteer(")
assert idx >= 0
body = self.cmds[idx:idx + 1500]
assert "/api/chat/steer" in body, "_trySteer must POST to /api/chat/steer"
assert "method:'POST'" in body or 'method:"POST"' in body
def test_try_steer_handles_fallback(self):
idx = self.cmds.find("async function _trySteer(")
body = self.cmds[idx:idx + 1500]
# Must check result.accepted and fall back via queueSessionMessage + cancelStream
assert "result&&result.accepted" in body or "result.accepted" in body
assert "queueSessionMessage" in body
assert "cancelStream" in body, "fallback path must cancel the stream"
def test_send_busy_steer_uses_try_steer(self):
# send() in messages.js: when busyMode === 'steer', should call _trySteer
idx = self.msgs.find("busyMode==='steer'")
assert idx >= 0
block = self.msgs[idx:idx + 800]
assert "_trySteer" in block, "send()'s steer branch must delegate to _trySteer"
def test_pending_steer_leftover_listener(self):
"""Frontend must listen for pending_steer_leftover SSE events and queue them."""
idx = self.msgs.find("addEventListener('pending_steer_leftover'")
assert idx >= 0, "messages.js must add a listener for pending_steer_leftover"
block = self.msgs[idx:idx + 600]
assert "queueSessionMessage" in block, (
"pending_steer_leftover handler must queue the leftover text for the next turn"
)
# ── i18n keys ─────────────────────────────────────────────────────────────
class TestI18nKeys:
"""The two new keys (cmd_steer_delivered, steer_leftover_queued) must be in all 6 locales."""
@classmethod
def setup_class(cls):
cls.i18n = (Path(__file__).parent.parent / "static" / "i18n.js").read_text(encoding="utf-8")
def test_cmd_steer_delivered_in_all_locales(self):
assert self.i18n.count("cmd_steer_delivered:") >= 6, (
f"cmd_steer_delivered appears {self.i18n.count('cmd_steer_delivered:')} times; "
f"expected ≥6 (one per locale)"
)
def test_steer_leftover_queued_in_all_locales(self):
assert self.i18n.count("steer_leftover_queued:") >= 6, (
f"steer_leftover_queued appears {self.i18n.count('steer_leftover_queued:')} times; "
f"expected ≥6 (one per locale)"
)
# ── Leftover SSE delivery: streaming.py emits pending_steer_leftover ─────
class TestLeftoverDelivery:
"""After run_conversation returns, _drain_pending_steer is called and a
pending_steer_leftover SSE event is emitted if there's still text stashed."""
def test_leftover_drain_call_in_streaming(self):
"""Verify the streaming.py source contains the drain call before put('done', ...)."""
src = (Path(__file__).parent.parent / "api" / "streaming.py").read_text(encoding="utf-8")
assert "_drain_pending_steer" in src, (
"_run_agent_streaming must call agent._drain_pending_steer() to deliver leftovers"
)
assert "pending_steer_leftover" in src, (
"_run_agent_streaming must emit a pending_steer_leftover SSE event"
)
def test_leftover_drain_runs_before_done_event(self):
"""The drain must happen BEFORE put('done', ...) so frontend gets both events
on the same turn."""
src = (Path(__file__).parent.parent / "api" / "streaming.py").read_text(encoding="utf-8")
# Find the drain invocation and the next put('done', ...) AFTER it
drain_idx = src.find("_drain_pending_steer()")
assert drain_idx >= 0
done_idx = src.find("put('done'", drain_idx)
assert done_idx >= 0
# No put('done', ...) should appear BEFORE the drain in the same code block
# (we already check the drain is in the file; ordering matters within the
# non-ephemeral success path)
assert drain_idx < done_idx, (
"_drain_pending_steer must run before put('done', ...) so the SSE listener "
"sees the leftover before stream_end fires"
)