From 1cf0ff01b5ebb701a0db69fb64e4e66e67b860ce Mon Sep 17 00:00:00 2001 From: dobby-d-elf Date: Sun, 10 May 2026 06:51:46 -0600 Subject: [PATCH] feat: live context window status tracking during streaming --- api/streaming.py | 153 +++++++++++++++++++++++++++++++++++++++++++++ static/messages.js | 12 ++++ static/sessions.js | 11 ++++ 3 files changed, 176 insertions(+) diff --git a/api/streaming.py b/api/streaming.py index 5fcd1ae9..0b93fd9c 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -1937,6 +1937,97 @@ def _run_agent_streaming( STREAM_REASONING_TEXT[stream_id] = '' # start accumulating reasoning trace (#1361 §A) STREAM_LIVE_TOOL_CALLS[stream_id] = [] # start accumulating tool calls (#1361 §B) + agent = None + _live_prompt_estimate_tokens = [0] + _live_prompt_estimate_seen_ids = set() + + def _seed_live_prompt_estimate() -> int: + """Capture the latest exact prompt size before adding live tool deltas.""" + if _live_prompt_estimate_tokens[0] > 0: + return _live_prompt_estimate_tokens[0] + _base = 0 + _agent = agent + if _agent is not None: + try: + _cc = getattr(_agent, 'context_compressor', None) + if _cc: + _base = getattr(_cc, 'last_prompt_tokens', 0) or 0 + except Exception: + _base = 0 + if not _base: + try: + _session_obj = get_session(session_id) + _base = getattr(_session_obj, 'last_prompt_tokens', 0) or 0 + except Exception: + _base = 0 + _live_prompt_estimate_tokens[0] = int(_base or 0) + return _live_prompt_estimate_tokens[0] + + def _bump_live_prompt_estimate(messages) -> int: + """Increment a rough next-prompt estimate from live tool activity.""" + if not messages: + return _live_prompt_estimate_tokens[0] + try: + from agent.model_metadata import estimate_messages_tokens_rough + _delta = int(estimate_messages_tokens_rough(messages) or 0) + except Exception: + _delta = 0 + if _delta > 0: + _seed_live_prompt_estimate() + _live_prompt_estimate_tokens[0] += _delta + return _live_prompt_estimate_tokens[0] + + def _live_usage_snapshot(): + """Best-effort live usage payload for mid-stream UI updates. + + During tool execution the final `done` event has not fired yet, but the + frontend still benefits from seeing the latest known token / context + values. These are exact for the most recent model call and a truthful + lower bound for the pending next call after a tool result is appended. + """ + _usage = { + 'input_tokens': 0, + 'output_tokens': 0, + 'estimated_cost': 0, + 'context_length': 0, + 'threshold_tokens': 0, + 'last_prompt_tokens': 0, + } + try: + _session_obj = get_session(session_id) + except Exception: + _session_obj = None + + _agent = agent + if _agent is not None: + try: + _usage['input_tokens'] = getattr(_agent, 'session_prompt_tokens', 0) or 0 + _usage['output_tokens'] = getattr(_agent, 'session_completion_tokens', 0) or 0 + _usage['estimated_cost'] = getattr(_agent, 'session_estimated_cost_usd', 0) or 0 + except Exception: + pass + try: + _cc = getattr(_agent, 'context_compressor', None) + if _cc: + _usage['context_length'] = getattr(_cc, 'context_length', 0) or 0 + _usage['threshold_tokens'] = getattr(_cc, 'threshold_tokens', 0) or 0 + _usage['last_prompt_tokens'] = getattr(_cc, 'last_prompt_tokens', 0) or 0 + except Exception: + pass + + if _session_obj is not None: + for _field in ('input_tokens', 'output_tokens', 'estimated_cost', 'context_length', 'threshold_tokens', 'last_prompt_tokens'): + if not _usage.get(_field): + try: + _usage[_field] = getattr(_session_obj, _field, 0) or 0 + except Exception: + pass + + if _live_prompt_estimate_tokens[0] > (_usage.get('last_prompt_tokens') or 0): + _usage['last_prompt_tokens'] = _live_prompt_estimate_tokens[0] + + return _usage + # Register this stream with the global streaming meter meter().begin_session(stream_id) @@ -1954,6 +2045,7 @@ def _run_agent_streaming( break # stream was cancelled or ended — exit stats = meter().get_stats() stats['session_id'] = stream_id + stats['usage'] = _live_usage_snapshot() put('metering', stats) _metering_thread = threading.Thread(target=_metering_ticker, daemon=True) @@ -2200,6 +2292,35 @@ def _run_agent_streaming( # block is reordered later (Issue #765). _checkpoint_activity = [0] + def _record_live_tool_start(tool_call_id, name, args): + if not tool_call_id or tool_call_id in _live_prompt_estimate_seen_ids: + return + _live_prompt_estimate_seen_ids.add(tool_call_id) + _tool_call = { + 'id': tool_call_id, + 'type': 'function', + 'function': { + 'name': str(name or ''), + 'arguments': json.dumps(args if isinstance(args, dict) else {}, ensure_ascii=False, sort_keys=True), + }, + } + _bump_live_prompt_estimate([{ + 'role': 'assistant', + 'content': '', + 'tool_calls': [_tool_call], + }]) + + def _record_live_tool_complete(tool_call_id, name, function_result): + if not tool_call_id: + return + _result_text = _tool_result_snippet(function_result) + _bump_live_prompt_estimate([{ + 'role': 'tool', + 'name': str(name or ''), + 'tool_call_id': tool_call_id, + 'content': _result_text, + }]) + def on_tool(*cb_args, **cb_kwargs): nonlocal _reasoning_text event_type = None @@ -2255,6 +2376,10 @@ def _run_agent_streaming( 'preview': preview, 'args': args_snap, }) + _tool_stats = meter().get_stats() + _tool_stats['session_id'] = stream_id + _tool_stats['usage'] = _live_usage_snapshot() + put('metering', _tool_stats) # Fallback: poll for pending approval in case notify_cb wasn't # registered (e.g. older approval module without gateway support). try: @@ -2298,8 +2423,32 @@ def _run_agent_streaming( 'duration': cb_kwargs.get('duration'), 'is_error': bool(cb_kwargs.get('is_error', False)), }) + _tool_stats = meter().get_stats() + _tool_stats['session_id'] = stream_id + _tool_stats['usage'] = _live_usage_snapshot() + put('metering', _tool_stats) return + def on_tool_start(tool_call_id, name, args): + try: + _record_live_tool_start(tool_call_id, name, args) + _tool_stats = meter().get_stats() + _tool_stats['session_id'] = stream_id + _tool_stats['usage'] = _live_usage_snapshot() + put('metering', _tool_stats) + except Exception: + logger.debug('Failed to update live prompt estimate on tool start', exc_info=True) + + def on_tool_complete(tool_call_id, name, args, function_result): + try: + _record_live_tool_complete(tool_call_id, name, function_result) + _tool_stats = meter().get_stats() + _tool_stats['session_id'] = stream_id + _tool_stats['usage'] = _live_usage_snapshot() + put('metering', _tool_stats) + except Exception: + logger.debug('Failed to update live prompt estimate on tool completion', exc_info=True) + _AIAgent = _get_ai_agent() if _AIAgent is None: raise ImportError(_aiagent_import_error_detail()) @@ -2481,6 +2630,10 @@ def _run_agent_streaming( _agent_kwargs['reasoning_config'] = _reasoning_config if 'interim_assistant_callback' in _agent_params: _agent_kwargs['interim_assistant_callback'] = on_interim_assistant + if 'tool_start_callback' in _agent_params: + _agent_kwargs['tool_start_callback'] = on_tool_start + if 'tool_complete_callback' in _agent_params: + _agent_kwargs['tool_complete_callback'] = on_tool_complete if 'status_callback' in _agent_params: _agent_kwargs['status_callback'] = _agent_status_callback if 'max_iterations' in _agent_params and _max_iterations_cfg is not None: diff --git a/static/messages.js b/static/messages.js index 75758f7c..fd5cd036 100644 --- a/static/messages.js +++ b/static/messages.js @@ -1159,6 +1159,18 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ try{ const d=JSON.parse(e.data||'{}'); if((d.session_id||activeSid)!==activeSid) return; + if(d.usage&&typeof _syncCtxIndicator==='function'){ + S.lastUsage={...(S.lastUsage||{}),...d.usage}; + if(S.session&&S.session.session_id===activeSid){ + S.session.input_tokens=d.usage.input_tokens??S.session.input_tokens; + S.session.output_tokens=d.usage.output_tokens??S.session.output_tokens; + S.session.estimated_cost=d.usage.estimated_cost??S.session.estimated_cost; + S.session.context_length=d.usage.context_length??S.session.context_length; + S.session.threshold_tokens=d.usage.threshold_tokens??S.session.threshold_tokens; + S.session.last_prompt_tokens=d.usage.last_prompt_tokens??S.session.last_prompt_tokens; + } + _syncCtxIndicator(S.lastUsage); + } if(d.estimated===true||d.tps_available!==true||typeof d.tps!=='number'||d.tps<=0){ if(typeof _setLiveAssistantTps==='function') _setLiveAssistantTps(null); return; diff --git a/static/sessions.js b/static/sessions.js index b243e3e8..525c646e 100644 --- a/static/sessions.js +++ b/static/sessions.js @@ -392,6 +392,17 @@ async function newSession(flash){ updateSendBtn(); setStatus(''); setComposerStatus(''); + if(typeof _setLiveAssistantTps==='function') _setLiveAssistantTps(null); + if(typeof _syncCtxIndicator==='function'){ + _syncCtxIndicator({ + input_tokens:data.session.input_tokens||0, + output_tokens:data.session.output_tokens||0, + estimated_cost:data.session.estimated_cost||0, + context_length:data.session.context_length||0, + last_prompt_tokens:data.session.last_prompt_tokens||0, + threshold_tokens:data.session.threshold_tokens||0, + }); + } updateQueueBadge(S.session.session_id); syncTopbar();renderMessages();loadDir('.'); // don't call renderSessionList here - callers do it when needed