Squash merge PR #1294: Normalize agent session source metadata

Adds shared source normalization contract for imported Hermes Agent sessions.
Exposes raw_source, session_source, and source_label through both /api/sessions
and gateway watcher snapshots. Existing source_tag / is_cli_session compatibility
fields unchanged — sidebar display behavior is preserved.

Refs #1013.

Co-authored-by: Frank Song <franksong2702@users.noreply.github.com>
This commit is contained in:
Hermes Agent
2026-04-30 02:39:12 +00:00
parent 607f9c58a9
commit a8c7c54f61
4 changed files with 123 additions and 1 deletions
+61
View File
@@ -6,6 +6,66 @@ from pathlib import Path
logger = logging.getLogger(__name__)
MESSAGING_SOURCES = {
'discord',
'slack',
'telegram',
'weixin',
}
SOURCE_LABELS = {
'api_server': 'API',
'cli': 'CLI',
'cron': 'Cron',
'discord': 'Discord',
'slack': 'Slack',
'telegram': 'Telegram',
'tool': 'Tool',
'webui': 'WebUI',
'weixin': 'Weixin',
}
def normalize_agent_session_source(raw_source: str | None) -> dict:
"""Return stable source metadata for Hermes Agent session rows.
``sessions.source`` is an Agent-level raw value. WebUI needs a smaller,
durable contract so routes, SSE snapshots, and future sidebar policies do
not each reimplement raw-source checks.
"""
raw = str(raw_source or '').strip().lower() or 'unknown'
if raw == 'webui':
session_source = 'webui'
elif raw == 'cli':
session_source = 'cli'
elif raw in MESSAGING_SOURCES:
session_source = 'messaging'
elif raw == 'cron':
session_source = 'cron'
elif raw == 'tool':
session_source = 'tool'
elif raw == 'api_server':
session_source = 'api'
else:
session_source = 'other'
label = SOURCE_LABELS.get(raw)
if not label:
label = raw.replace('_', ' ').title() if raw != 'unknown' else 'Agent'
return {
'raw_source': None if raw == 'unknown' else raw,
'session_source': session_source,
'source_label': label,
}
def _with_normalized_source(row: dict) -> dict:
normalized = normalize_agent_session_source(row.get('source'))
return {**row, **normalized}
def _optional_col(name: str, columns: set[str], fallback: str = "NULL") -> str:
return f"s.{name}" if name in columns else f"{fallback} AS {name}"
@@ -191,6 +251,7 @@ def read_importable_agent_session_rows(
params,
)
projected = _project_agent_session_rows([dict(row) for row in cur.fetchall()])
projected = [_with_normalized_source(row) for row in projected]
if limit is None:
return projected
return projected[:max(0, int(limit))]
+3
View File
@@ -65,6 +65,9 @@ def _get_agent_sessions_from_db() -> list:
'created_at': row['started_at'],
'updated_at': row['last_activity'] or row['started_at'],
'source': row['source'] or 'cli',
'raw_source': row.get('raw_source'),
'session_source': row.get('session_source'),
'source_label': row.get('source_label'),
})
return sessions
except Exception:
+3
View File
@@ -942,6 +942,9 @@ def get_cli_sessions() -> list:
'project_id': None,
'profile': profile,
'source_tag': _source,
'raw_source': row.get('raw_source'),
'session_source': row.get('session_source'),
'source_label': row.get('source_label'),
'is_cli_session': True,
})
except Exception as _cli_err:
+56 -1
View File
@@ -645,7 +645,7 @@ def test_gateway_sessions_excluded_when_disabled():
def test_gateway_session_has_correct_metadata():
"""Gateway sessions include source_tag and is_cli_session fields."""
"""Gateway sessions include legacy source fields and normalized source metadata."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_meta_001', source='telegram', title='Meta Test')
@@ -658,6 +658,9 @@ def test_gateway_session_has_correct_metadata():
gw = next((s for s in sessions if s['session_id'] == 'gw_meta_001'), None)
assert gw is not None, "Gateway session not found"
assert gw.get('source_tag') == 'telegram', f"Expected source_tag=telegram, got {gw.get('source_tag')}"
assert gw.get('raw_source') == 'telegram'
assert gw.get('session_source') == 'messaging'
assert gw.get('source_label') == 'Telegram'
assert gw.get('is_cli_session') is True, "is_cli_session should be True for agent sessions"
assert gw.get('title') == 'Meta Test'
finally:
@@ -669,6 +672,58 @@ def test_gateway_session_has_correct_metadata():
post('/api/settings', {'show_cli_sessions': False})
def test_agent_session_source_normalization_contract():
"""Raw Hermes Agent sources map to stable WebUI source categories."""
from api.agent_sessions import normalize_agent_session_source
cases = {
'cli': ('cli', 'CLI'),
'weixin': ('messaging', 'Weixin'),
'telegram': ('messaging', 'Telegram'),
'discord': ('messaging', 'Discord'),
'slack': ('messaging', 'Slack'),
'cron': ('cron', 'Cron'),
'tool': ('tool', 'Tool'),
'api_server': ('api', 'API'),
'something_new': ('other', 'Something New'),
None: ('other', 'Agent'),
}
for raw_source, (session_source, source_label) in cases.items():
normalized = normalize_agent_session_source(raw_source)
assert normalized['session_source'] == session_source
assert normalized['source_label'] == source_label
if raw_source:
assert normalized['raw_source'] == raw_source
else:
assert normalized['raw_source'] is None
def test_gateway_watcher_uses_normalized_source_metadata(monkeypatch):
"""SSE snapshots use the same normalized source contract as /api/sessions."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_watcher_source_001', source='weixin', title='Weixin Chat')
import api.gateway_watcher as gateway_watcher
monkeypatch.setattr(gateway_watcher, '_get_state_db_path', _get_state_db_path)
sessions = gateway_watcher._get_agent_sessions_from_db()
gw = next((s for s in sessions if s['session_id'] == 'gw_watcher_source_001'), None)
assert gw is not None
assert gw.get('source') == 'weixin'
assert gw.get('raw_source') == 'weixin'
assert gw.get('session_source') == 'messaging'
assert gw.get('source_label') == 'Weixin'
finally:
try:
_remove_test_sessions(conn, 'gw_watcher_source_001')
conn.close()
except Exception:
pass
def test_imported_cron_sessions_hidden_from_sidebar_by_default(cleanup_test_sessions):
"""Cron sessions already imported into the WebUI store should stay hidden from the sidebar."""
from api.models import Session