mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-25 11:10:18 +00:00
Stage 323: PR #1895 — MCP Option A rewrite — canonical api.models/api.profiles imports by @samuelgudi
This commit is contained in:
@@ -170,6 +170,33 @@ def _is_root_profile(name: str) -> bool:
|
||||
return name in _root_profile_name_cache
|
||||
|
||||
|
||||
def _profiles_match(row_profile, active_profile) -> bool:
|
||||
"""Return True if a session/project row's profile matches the active profile.
|
||||
|
||||
Treats both the literal alias 'default' and any renamed-root display name
|
||||
(per _is_root_profile) as equivalent, so legacy rows tagged 'default'
|
||||
still surface when the user has renamed the root profile to e.g. 'kinni',
|
||||
and vice versa.
|
||||
|
||||
A row with no profile (`None` or empty string) is treated as belonging to
|
||||
the root profile — that's the convention used by the legacy backfill at
|
||||
api/models.py::all_sessions, and matches the default seen in
|
||||
`static/sessions.js` (`S.activeProfile||'default'`).
|
||||
|
||||
Originally lived in api/routes.py; relocated here so both routes.py and
|
||||
out-of-process consumers (mcp_server.py) can import the canonical helper
|
||||
instead of duplicating the body. See #1614 for the visibility model.
|
||||
"""
|
||||
row = row_profile or 'default'
|
||||
active = active_profile or 'default'
|
||||
if row == active:
|
||||
return True
|
||||
# Cross-alias the renamed root.
|
||||
if _is_root_profile(row) and _is_root_profile(active):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_active_profile_name() -> str:
|
||||
"""Return the currently active profile name.
|
||||
|
||||
|
||||
+5
-23
@@ -72,29 +72,11 @@ _STALE_MESSAGING_END_REASONS = {"session_reset", "session_switch"}
|
||||
# when the active profile is `'default'`. _is_root_profile() is the
|
||||
# canonical check.
|
||||
|
||||
def _profiles_match(row_profile, active_profile) -> bool:
|
||||
"""Return True if a session/project row's profile matches the active profile.
|
||||
|
||||
Treats both the literal alias 'default' and any renamed-root display name
|
||||
(per _is_root_profile) as equivalent, so legacy rows tagged 'default'
|
||||
still surface when the user has renamed the root profile to e.g. 'kinni',
|
||||
and vice versa.
|
||||
|
||||
A row with no profile (`None` or empty string) is treated as belonging to
|
||||
the root profile — that's the convention used by the legacy backfill at
|
||||
api/models.py::all_sessions, and matches the default seen in
|
||||
`static/sessions.js` (`S.activeProfile||'default'`).
|
||||
"""
|
||||
from api.profiles import _is_root_profile
|
||||
|
||||
row = row_profile or 'default'
|
||||
active = active_profile or 'default'
|
||||
if row == active:
|
||||
return True
|
||||
# Cross-alias the renamed root.
|
||||
if _is_root_profile(row) and _is_root_profile(active):
|
||||
return True
|
||||
return False
|
||||
# Canonical helper now lives in api.profiles so out-of-process consumers
|
||||
# (mcp_server.py) can import it without duplicating the visibility model.
|
||||
# Re-exported here so existing `_profiles_match(...)` call sites in this
|
||||
# module keep resolving without per-call-site refactors.
|
||||
from api.profiles import _profiles_match # noqa: F401, E402 (re-export)
|
||||
|
||||
|
||||
def _all_profiles_query_flag(parsed_url) -> bool:
|
||||
|
||||
+567
@@ -0,0 +1,567 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Hermes WebUI MCP Server — exposes project and session management
|
||||
as MCP tools for any MCP-compatible agent.
|
||||
|
||||
Option A rewrite (2026-05-08): imports api.models and api.profiles
|
||||
directly from the webui codebase, using canonical helpers for
|
||||
locking, profile scoping, index consistency, and validation.
|
||||
|
||||
pip install mcp # one-time setup
|
||||
python3 mcp_server.py # start via stdio
|
||||
|
||||
MCP config for Hermes Agent (add to config.yaml):
|
||||
mcp_servers:
|
||||
hermes-webui:
|
||||
command: /path/to/venv/bin/python3
|
||||
args: [/path/to/hermes-webui/mcp_server.py]
|
||||
env:
|
||||
HERMES_WEBUI_PASSWORD: your_password
|
||||
|
||||
Profile override (optional):
|
||||
args: [/path/to/hermes-webui/mcp_server.py, --profile, myprofile]
|
||||
|
||||
AI-authoring disclosure: this file was rewritten by MILO (Hermes Agent)
|
||||
under human direction, per maintainer guidelines for #1616.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
from mcp.server import Server
|
||||
from mcp.server.stdio import stdio_server
|
||||
from mcp.types import Tool, TextContent
|
||||
|
||||
# ── Ensure the repo root is on sys.path so api.* imports work ─────────────
|
||||
_REPO_ROOT = Path(__file__).parent.resolve()
|
||||
if str(_REPO_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(_REPO_ROOT))
|
||||
|
||||
# ── CLI: optional --profile override ──────────────────────────────────────
|
||||
_profile_arg: str | None = None
|
||||
_parser = argparse.ArgumentParser(add_help=False)
|
||||
_parser.add_argument("--profile", type=str, default=None)
|
||||
_args, _unknown = _parser.parse_known_args()
|
||||
_profile_arg = _args.profile
|
||||
|
||||
# ── Import webui canonical modules (after path setup) ─────────────────────
|
||||
import api.config as _cfg
|
||||
from api.config import (
|
||||
STATE_DIR, SESSION_DIR, SESSION_INDEX_FILE, PROJECTS_FILE, HOME,
|
||||
)
|
||||
from api.models import load_projects, save_projects
|
||||
from api.profiles import get_active_profile_name, _is_root_profile, _profiles_match
|
||||
|
||||
# ── Apply --profile override before any module uses get_active_profile_name
|
||||
if _profile_arg is not None:
|
||||
import api.profiles as _profiles
|
||||
_profiles._active_profile = _profile_arg
|
||||
|
||||
# ── API auth state ─────────────────────────────────────────────────────────
|
||||
# Mirror the env-var contract used by api/config.py:32-33 so a non-default
|
||||
# WebUI port/host (e.g. when 8787 is held by another service on the host)
|
||||
# Just Works without configuration drift between the WebUI process and MCP.
|
||||
WEBUI_HOST = os.environ.get("HERMES_WEBUI_HOST", "127.0.0.1")
|
||||
WEBUI_PORT = os.environ.get("HERMES_WEBUI_PORT", "8787")
|
||||
WEBUI_URL = f"http://{WEBUI_HOST}:{WEBUI_PORT}"
|
||||
_auth_cookie: str | None = None
|
||||
_auth_expires: float = 0 # unix timestamp after which we re-auth
|
||||
|
||||
server = Server("hermes-webui")
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Helpers — filesystem (project CRUD via canonical api.models)
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
def _active_profile() -> str:
|
||||
"""Shorthand for the current profile name (--profile or auto-detected)."""
|
||||
return get_active_profile_name() or 'default'
|
||||
|
||||
|
||||
def _validate_color(color: str | None) -> str | None:
|
||||
"""Return an error string if color is invalid, else None."""
|
||||
if color is not None and not re.match(r"^#[0-9a-fA-F]{3,8}$", color):
|
||||
return "Invalid color format (use #RGB, #RRGGBB, or #RRGGBBAA)"
|
||||
return None
|
||||
|
||||
|
||||
def _load_index() -> list:
|
||||
"""Read the session index. Falls back to empty list on failure."""
|
||||
if not SESSION_INDEX_FILE.exists():
|
||||
return []
|
||||
try:
|
||||
return json.loads(SESSION_INDEX_FILE.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def _session_compact(row: dict) -> dict:
|
||||
"""Lightweight compact representation of a session index entry."""
|
||||
return {
|
||||
"session_id": row.get("session_id"),
|
||||
"title": row.get("title"),
|
||||
"project_id": row.get("project_id"),
|
||||
"workspace": row.get("workspace"),
|
||||
"model": row.get("model"),
|
||||
"message_count": row.get("message_count", 0),
|
||||
"source_tag": row.get("source_tag"),
|
||||
"is_cli_session": row.get("is_cli_session", False),
|
||||
"profile": row.get("profile"),
|
||||
}
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Helpers — HTTP API (for mutations that need cache sync)
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
def _api_password() -> str | None:
|
||||
"""Return the plaintext webui password from HERMES_WEBUI_PASSWORD, or None.
|
||||
|
||||
settings.json stores only the bcrypt hash, which the login endpoint cannot
|
||||
accept — it calls verify_password(plaintext) against the stored hash. So
|
||||
there's no usable fallback when the env var is unset; the MCP simply runs
|
||||
in unauthenticated mode and any auth-protected mutation will fail clearly
|
||||
with the server's 401 instead of silently sending an unusable hash.
|
||||
"""
|
||||
pw = os.environ.get("HERMES_WEBUI_PASSWORD", "").strip()
|
||||
return pw or None
|
||||
|
||||
|
||||
def _api_auth() -> str | None:
|
||||
"""Authenticate and return cookie value, or None if auth disabled/fails."""
|
||||
global _auth_cookie, _auth_expires
|
||||
|
||||
pw = _api_password()
|
||||
if not pw:
|
||||
return None # auth not enabled — API calls will fail anyway
|
||||
|
||||
# Reuse cookie if still valid (25 days — server issues 30-day cookies)
|
||||
if _auth_cookie and time.time() < _auth_expires:
|
||||
return _auth_cookie
|
||||
|
||||
import urllib.request
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{WEBUI_URL}/api/auth/login",
|
||||
data=json.dumps({"password": pw}).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
resp = urllib.request.urlopen(req, timeout=5)
|
||||
cookie = resp.headers.get("Set-Cookie", "")
|
||||
if cookie:
|
||||
_auth_cookie = cookie.split(";")[0] # "hermes_session=VALUE; ..."
|
||||
_auth_expires = time.time() + 25 * 86400 # 25 days
|
||||
return _auth_cookie
|
||||
except Exception:
|
||||
_auth_cookie = None
|
||||
return None
|
||||
|
||||
|
||||
def _api_post(endpoint: str, body: dict) -> dict:
|
||||
"""POST to webui API with auth cookie. Returns parsed JSON response."""
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
cookie = _api_auth()
|
||||
headers = {"Content-Type": "application/json"}
|
||||
if cookie:
|
||||
headers["Cookie"] = cookie
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{WEBUI_URL}{endpoint}",
|
||||
data=json.dumps(body).encode(),
|
||||
headers=headers,
|
||||
method="POST",
|
||||
)
|
||||
resp = urllib.request.urlopen(req, timeout=5)
|
||||
return json.loads(resp.read())
|
||||
except urllib.error.HTTPError as e:
|
||||
err_body = json.loads(e.read())
|
||||
return {"error": f"API {e.code}: {err_body.get('error', 'unknown')}"}
|
||||
except Exception as e:
|
||||
return {"error": f"API unreachable: {e}"}
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Tool handlers — read-only (filesystem, profile-aware)
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
async def handle_list_projects(_arguments: dict) -> list[TextContent]:
|
||||
"""List all projects with session counts, scoped to active profile."""
|
||||
projects = load_projects()
|
||||
active = _active_profile()
|
||||
index = _load_index()
|
||||
|
||||
# Session counts per project (from index)
|
||||
counts: dict[str, int] = {}
|
||||
for s in index:
|
||||
pid = s.get("project_id")
|
||||
if pid:
|
||||
counts[pid] = counts.get(pid, 0) + 1
|
||||
|
||||
result = []
|
||||
for p in projects:
|
||||
# Profile filter: legacy untagged rows are treated as 'default' by
|
||||
# _profiles_match, so non-root profiles correctly hide them.
|
||||
if not _profiles_match(p.get("profile"), active):
|
||||
continue
|
||||
entry = dict(p)
|
||||
entry["session_count"] = counts.get(p["project_id"], 0)
|
||||
result.append(entry)
|
||||
|
||||
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
|
||||
|
||||
|
||||
async def handle_list_sessions(arguments: dict) -> list[TextContent]:
|
||||
"""List sessions, optionally filtered by project or unassigned status."""
|
||||
project_id = arguments.get("project_id")
|
||||
unassigned = arguments.get("unassigned", False)
|
||||
limit = max(1, min(500, arguments.get("limit", 50)))
|
||||
active = _active_profile()
|
||||
|
||||
index = _load_index()
|
||||
sessions = [_session_compact(s) for s in index if s.get("session_id")]
|
||||
|
||||
# Filter by profile: legacy untagged rows are treated as 'default' by
|
||||
# _profiles_match (canonical convention), so non-root profiles hide them.
|
||||
sessions = [s for s in sessions if _profiles_match(s.get("profile"), active)]
|
||||
|
||||
if unassigned:
|
||||
sessions = [s for s in sessions if not s["project_id"]]
|
||||
elif project_id:
|
||||
sessions = [s for s in sessions if s["project_id"] == project_id]
|
||||
|
||||
sessions = sessions[:limit]
|
||||
return [TextContent(type="text", text=json.dumps(sessions, ensure_ascii=False, indent=2))]
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Tool handlers — project CRUD (canonical helpers, profile-scoped)
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
async def handle_create_project(arguments: dict) -> list[TextContent]:
|
||||
"""Create a new project (profile-scoped, exact-match title collision)."""
|
||||
name = arguments.get("name", "").strip()[:128]
|
||||
if not name:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "name is required"}, ensure_ascii=False))]
|
||||
|
||||
color = arguments.get("color")
|
||||
color_err = _validate_color(color)
|
||||
if color_err:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": color_err}, ensure_ascii=False))]
|
||||
|
||||
active = _active_profile()
|
||||
projects = load_projects()
|
||||
|
||||
# Title collision: exact match (consistent with ensure_cron_project)
|
||||
if any(p.get("name") == name and _profiles_match(p.get("profile"), active)
|
||||
for p in projects):
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": f"Project '{name}' already exists"}, ensure_ascii=False))]
|
||||
|
||||
proj = {
|
||||
"project_id": uuid.uuid4().hex[:12],
|
||||
"name": name,
|
||||
"color": color,
|
||||
"profile": active,
|
||||
"created_at": time.time(),
|
||||
}
|
||||
projects.append(proj)
|
||||
save_projects(projects)
|
||||
|
||||
proj["session_count"] = 0
|
||||
return [TextContent(type="text", text=json.dumps(proj, ensure_ascii=False, indent=2))]
|
||||
|
||||
|
||||
async def handle_rename_project(arguments: dict) -> list[TextContent]:
|
||||
"""Rename a project and optionally change its color (profile-checked)."""
|
||||
project_id = arguments.get("project_id")
|
||||
name = arguments.get("name", "").strip()[:128]
|
||||
if not project_id or not name:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "project_id and name are required"}, ensure_ascii=False))]
|
||||
|
||||
color = arguments.get("color")
|
||||
color_err = _validate_color(color)
|
||||
if color_err:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": color_err}, ensure_ascii=False))]
|
||||
|
||||
active = _active_profile()
|
||||
projects = load_projects()
|
||||
proj = next((p for p in projects if p["project_id"] == project_id), None)
|
||||
if not proj:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "Project not found"}, ensure_ascii=False))]
|
||||
|
||||
# #1614: profile ownership check
|
||||
if not _profiles_match(proj.get("profile"), active):
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "Project not found"}, ensure_ascii=False))]
|
||||
|
||||
proj["name"] = name
|
||||
if color is not None:
|
||||
proj["color"] = color
|
||||
save_projects(projects)
|
||||
return [TextContent(type="text", text=json.dumps(proj, ensure_ascii=False, indent=2))]
|
||||
|
||||
|
||||
async def handle_delete_project(arguments: dict) -> list[TextContent]:
|
||||
"""Delete a project and unassign all its sessions (profile-checked)."""
|
||||
project_id = arguments.get("project_id")
|
||||
if not project_id:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "project_id is required"}, ensure_ascii=False))]
|
||||
|
||||
active = _active_profile()
|
||||
projects = load_projects()
|
||||
proj = next((p for p in projects if p["project_id"] == project_id), None)
|
||||
if not proj:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "Project not found"}, ensure_ascii=False))]
|
||||
|
||||
# #1614: profile ownership check
|
||||
if not _profiles_match(proj.get("profile"), active):
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "Project not found"}, ensure_ascii=False))]
|
||||
|
||||
projects = [p for p in projects if p["project_id"] != project_id]
|
||||
save_projects(projects)
|
||||
|
||||
# Unassign sessions only when we can do it cache-safely via the HTTP API.
|
||||
# The previous filesystem fallback wrote session_data directly with
|
||||
# os.replace(), which bypassed _write_session_index() in api/models.py
|
||||
# and left _index.json holding the stale project_id — a running WebUI
|
||||
# would still group those sessions under the deleted project until a
|
||||
# subsequent re-compact. Even calling Session.save() in-process would
|
||||
# not help because the WebUI's SESSIONS dict cache (a separate process)
|
||||
# still has the old project_id and overwrites our update on its next
|
||||
# save. The HTTP API is the only cache-safe path; without auth we
|
||||
# refuse and surface the limitation so the operator can act.
|
||||
has_auth = bool(_api_password())
|
||||
if not has_auth:
|
||||
return [TextContent(type="text", text=json.dumps({
|
||||
"ok": True,
|
||||
"deleted": proj["name"],
|
||||
"unassigned_sessions": 0,
|
||||
"warning": "Set HERMES_WEBUI_PASSWORD to unassign sessions; "
|
||||
"without auth the session index cannot be safely "
|
||||
"updated and direct filesystem writes would cause "
|
||||
"index drift in a running WebUI.",
|
||||
}, ensure_ascii=False))]
|
||||
|
||||
unassigned = 0
|
||||
if SESSION_DIR.exists():
|
||||
for p in SESSION_DIR.glob("*.json"):
|
||||
if p.name.startswith("_"):
|
||||
continue
|
||||
try:
|
||||
session_data = json.loads(p.read_text(encoding="utf-8"))
|
||||
if session_data.get("project_id") == project_id:
|
||||
sid = p.stem
|
||||
result = _api_post("/api/session/move",
|
||||
{"session_id": sid, "project_id": None})
|
||||
if "ok" in result or "session" in result:
|
||||
unassigned += 1
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return [TextContent(type="text", text=json.dumps({
|
||||
"ok": True,
|
||||
"deleted": proj["name"],
|
||||
"unassigned_sessions": unassigned,
|
||||
}, ensure_ascii=False))]
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Tool handlers — mutations (HTTP API with auth, cache-safe)
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
async def handle_rename_session(arguments: dict) -> list[TextContent]:
|
||||
"""Rename a session via the authenticated webui API (cache-safe)."""
|
||||
session_id = arguments.get("session_id")
|
||||
title = arguments.get("title", "").strip()[:80]
|
||||
if not session_id or not title:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "session_id and title are required"}, ensure_ascii=False))]
|
||||
|
||||
result = _api_post("/api/session/rename",
|
||||
{"session_id": session_id, "title": title})
|
||||
if "error" in result:
|
||||
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
|
||||
|
||||
session = result.get("session", {})
|
||||
return [TextContent(type="text", text=json.dumps({
|
||||
"ok": True,
|
||||
"session_id": session_id,
|
||||
"title": session.get("title", title),
|
||||
"method": "api",
|
||||
}, ensure_ascii=False, indent=2))]
|
||||
|
||||
|
||||
async def handle_move_session(arguments: dict) -> list[TextContent]:
|
||||
"""Assign a session to a project via the authenticated webui API (cache-safe)."""
|
||||
session_id = arguments.get("session_id")
|
||||
project_id = arguments.get("project_id") # None/null = unassign
|
||||
if not session_id:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "session_id is required"}, ensure_ascii=False))]
|
||||
|
||||
# If project_id is provided, verify it exists and is profile-accessible
|
||||
if project_id is not None:
|
||||
projects = load_projects()
|
||||
active = _active_profile()
|
||||
target = next((p for p in projects if p["project_id"] == project_id), None)
|
||||
if not target:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "Project not found"}, ensure_ascii=False))]
|
||||
# #1614: refuse moves into projects owned by another profile
|
||||
if not _profiles_match(target.get("profile"), active):
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": "Project not found"}, ensure_ascii=False))]
|
||||
|
||||
result = _api_post("/api/session/move",
|
||||
{"session_id": session_id, "project_id": project_id})
|
||||
if "error" in result:
|
||||
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
|
||||
|
||||
session = result.get("session", {})
|
||||
return [TextContent(type="text", text=json.dumps({
|
||||
"ok": True,
|
||||
"session_id": session_id,
|
||||
"project_id": project_id,
|
||||
"title": session.get("title"),
|
||||
"method": "api",
|
||||
}, ensure_ascii=False, indent=2))]
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# MCP Server wiring
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
TOOLS = [
|
||||
Tool(
|
||||
name="list_projects",
|
||||
description="List all session projects with their IDs, names, colors, and session counts (scoped to active profile).",
|
||||
inputSchema={"type": "object", "properties": {}, "required": []},
|
||||
),
|
||||
Tool(
|
||||
name="create_project",
|
||||
description="Create a new project for organizing sessions (profile-scoped).",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string", "description": "Project name (max 128 chars)"},
|
||||
"color": {"type": "string", "description": "Optional hex color (#RGB, #RRGGBB, or #RRGGBBAA)"},
|
||||
},
|
||||
"required": ["name"],
|
||||
},
|
||||
),
|
||||
Tool(
|
||||
name="rename_project",
|
||||
description="Rename a project and optionally change its color (profile-checked).",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"project_id": {"type": "string", "description": "12-char project ID"},
|
||||
"name": {"type": "string", "description": "New name (max 128 chars)"},
|
||||
"color": {"type": "string", "description": "Optional new hex color"},
|
||||
},
|
||||
"required": ["project_id", "name"],
|
||||
},
|
||||
),
|
||||
Tool(
|
||||
name="delete_project",
|
||||
description="Delete a project and unassign all its sessions (profile-checked).",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"project_id": {"type": "string", "description": "12-char project ID to delete"},
|
||||
},
|
||||
"required": ["project_id"],
|
||||
},
|
||||
),
|
||||
Tool(
|
||||
name="rename_session",
|
||||
description="Rename a session (updates sidebar via authenticated API, cache-safe).",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"session_id": {"type": "string", "description": "Session ID"},
|
||||
"title": {"type": "string", "description": "New title (max 80 chars)"},
|
||||
},
|
||||
"required": ["session_id", "title"],
|
||||
},
|
||||
),
|
||||
Tool(
|
||||
name="move_session",
|
||||
description="Assign a session to a project. Pass project_id=null to unassign. Uses authenticated API for cache safety (profile-checked).",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"session_id": {"type": "string", "description": "Session ID"},
|
||||
"project_id": {"type": ["string", "null"], "description": "Project ID (or null to unassign)"},
|
||||
},
|
||||
"required": ["session_id", "project_id"],
|
||||
},
|
||||
),
|
||||
Tool(
|
||||
name="list_sessions",
|
||||
description="List sessions, optionally filtered by project or unassigned status (profile-scoped).",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"project_id": {"type": "string", "description": "Filter sessions by project ID"},
|
||||
"unassigned": {"type": "boolean", "description": "Show only sessions with no project"},
|
||||
"limit": {"type": "integer", "description": "Max results (default: 50, max: 500)"},
|
||||
},
|
||||
"required": [],
|
||||
},
|
||||
),
|
||||
]
|
||||
|
||||
HANDLERS = {
|
||||
"list_projects": handle_list_projects,
|
||||
"create_project": handle_create_project,
|
||||
"rename_project": handle_rename_project,
|
||||
"delete_project": handle_delete_project,
|
||||
"rename_session": handle_rename_session,
|
||||
"move_session": handle_move_session,
|
||||
"list_sessions": handle_list_sessions,
|
||||
}
|
||||
|
||||
|
||||
@server.list_tools()
|
||||
async def list_tools() -> list[Tool]:
|
||||
return TOOLS
|
||||
|
||||
|
||||
@server.call_tool()
|
||||
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
||||
handler = HANDLERS.get(name)
|
||||
if not handler:
|
||||
return [TextContent(type="text", text=json.dumps(
|
||||
{"error": f"Unknown tool: {name}"}, ensure_ascii=False))]
|
||||
return await handler(arguments)
|
||||
|
||||
|
||||
async def main():
|
||||
async with stdio_server() as (read, write):
|
||||
await server.run(read, write, server.create_initialization_options())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
asyncio.run(main())
|
||||
@@ -0,0 +1,697 @@
|
||||
"""Tests for mcp_server.py — Option A rewrite (Issue #1616).
|
||||
|
||||
Covers: project CRUD, profile scoping, title collision, color validation,
|
||||
session listing, cross-profile isolation.
|
||||
|
||||
Uses HERMES_WEBUI_STATE_DIR env var to point to a temp directory,
|
||||
so tests don't touch the real webui state. Module is re-imported
|
||||
per test class to ensure clean state.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
# ── Ensure repo root on path ──────────────────────────────────────────────
|
||||
_REPO = Path(__file__).parent.parent.resolve()
|
||||
if str(_REPO) not in sys.path:
|
||||
sys.path.insert(0, str(_REPO))
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Helpers
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
def _fresh_state_dir():
|
||||
"""Create a clean temp state dir and set HERMES_WEBUI_STATE_DIR."""
|
||||
td = tempfile.mkdtemp()
|
||||
state_dir = Path(td)
|
||||
sessions_dir = state_dir / "sessions"
|
||||
sessions_dir.mkdir(parents=True)
|
||||
(state_dir / "projects.json").write_text("[]", encoding="utf-8")
|
||||
(sessions_dir / "_index.json").write_text("[]", encoding="utf-8")
|
||||
os.environ["HERMES_WEBUI_STATE_DIR"] = str(state_dir)
|
||||
return state_dir
|
||||
|
||||
|
||||
def _cleanup_state_dir(state_dir: Path):
|
||||
"""Remove temp state dir and clear env var."""
|
||||
import shutil
|
||||
shutil.rmtree(state_dir, ignore_errors=True)
|
||||
os.environ.pop("HERMES_WEBUI_STATE_DIR", None)
|
||||
|
||||
|
||||
def _reimport_mcp():
|
||||
"""Re-import mcp_server with current env vars and profile.
|
||||
|
||||
Returns (mcp_module, profiles_module) — profiles_module is the
|
||||
live api.profiles reference that the re-imported mcp_server uses.
|
||||
"""
|
||||
# Clear cached module and api submodules that cache paths
|
||||
for key in list(sys.modules.keys()):
|
||||
if key == 'mcp_server' or key.startswith('mcp_server.') or \
|
||||
key == 'api.config' or key == 'api.models' or key == 'api.profiles':
|
||||
del sys.modules[key]
|
||||
|
||||
import importlib
|
||||
import api.config as cfg
|
||||
importlib.reload(cfg)
|
||||
|
||||
# Re-acquire api.profiles reference (old one is stale after sys.modules clear)
|
||||
import api.profiles as fresh_profiles
|
||||
fresh_profiles._active_profile = 'default'
|
||||
|
||||
import mcp_server as mod
|
||||
return mod, fresh_profiles
|
||||
|
||||
|
||||
async def _call(mod, tool_name, **kwargs):
|
||||
"""Call a tool handler and return parsed JSON."""
|
||||
handler = mod.HANDLERS[tool_name]
|
||||
result = await handler(kwargs)
|
||||
return json.loads(result[0].text)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Project CRUD
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestCreateProject:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self):
|
||||
self.state_dir = _fresh_state_dir()
|
||||
self.mod, self.profiles = _reimport_mcp()
|
||||
yield
|
||||
_cleanup_state_dir(self.state_dir)
|
||||
|
||||
async def test_create_basic(self):
|
||||
result = await _call(self.mod, "create_project", name="Test Project")
|
||||
assert "project_id" in result
|
||||
assert result["name"] == "Test Project"
|
||||
assert result["profile"] == "default"
|
||||
assert result["session_count"] == 0
|
||||
|
||||
async def test_create_with_color(self):
|
||||
result = await _call(self.mod, "create_project",
|
||||
name="Colored", color="#ff6600")
|
||||
assert result["color"] == "#ff6600"
|
||||
|
||||
async def test_create_duplicate_exact_match(self):
|
||||
await _call(self.mod, "create_project", name="My Project")
|
||||
result = await _call(self.mod, "create_project", name="My Project")
|
||||
assert "error" in result
|
||||
assert "already exists" in result["error"]
|
||||
|
||||
async def test_create_case_sensitive_no_collision(self):
|
||||
"""Exact match: 'MY project' and 'My Project' are different."""
|
||||
await _call(self.mod, "create_project", name="My Project")
|
||||
result = await _call(self.mod, "create_project", name="MY project")
|
||||
assert "project_id" in result
|
||||
|
||||
async def test_create_empty_name(self):
|
||||
result = await _call(self.mod, "create_project", name="")
|
||||
assert "error" in result
|
||||
|
||||
async def test_create_invalid_color(self):
|
||||
result = await _call(self.mod, "create_project",
|
||||
name="Bad", color="not-a-color")
|
||||
assert "error" in result
|
||||
assert "Invalid color" in result["error"]
|
||||
|
||||
async def test_create_valid_color_formats(self):
|
||||
for color in ["#fff", "#ff6600", "#ff6600aa"]:
|
||||
result = await _call(self.mod, "create_project",
|
||||
name=f"Color-{color}", color=color)
|
||||
assert result["color"] == color
|
||||
|
||||
|
||||
class TestRenameProject:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self):
|
||||
self.state_dir = _fresh_state_dir()
|
||||
self.mod, self.profiles = _reimport_mcp()
|
||||
yield
|
||||
_cleanup_state_dir(self.state_dir)
|
||||
|
||||
async def test_rename_basic(self):
|
||||
created = await _call(self.mod, "create_project", name="Old")
|
||||
pid = created["project_id"]
|
||||
result = await _call(self.mod, "rename_project",
|
||||
project_id=pid, name="New")
|
||||
assert result["name"] == "New"
|
||||
assert result["project_id"] == pid
|
||||
|
||||
async def test_rename_with_color(self):
|
||||
created = await _call(self.mod, "create_project", name="X")
|
||||
result = await _call(self.mod, "rename_project",
|
||||
project_id=created["project_id"],
|
||||
name="X", color="#000")
|
||||
assert result["color"] == "#000"
|
||||
|
||||
async def test_rename_not_found(self):
|
||||
result = await _call(self.mod, "rename_project",
|
||||
project_id="nonexistent", name="Nope")
|
||||
assert "error" in result
|
||||
|
||||
async def test_rename_wrong_profile(self):
|
||||
created = await _call(self.mod, "create_project", name="DefaultOwned")
|
||||
pid = created["project_id"]
|
||||
self.profiles._active_profile = 'other'
|
||||
result = await _call(self.mod, "rename_project",
|
||||
project_id=pid, name="Stolen")
|
||||
assert "error" in result
|
||||
assert "not found" in result["error"].lower()
|
||||
|
||||
|
||||
class TestDeleteProject:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self):
|
||||
self.state_dir = _fresh_state_dir()
|
||||
self.mod, self.profiles = _reimport_mcp()
|
||||
yield
|
||||
_cleanup_state_dir(self.state_dir)
|
||||
|
||||
async def test_delete_basic(self):
|
||||
created = await _call(self.mod, "create_project", name="ToDelete")
|
||||
pid = created["project_id"]
|
||||
result = await _call(self.mod, "delete_project", project_id=pid)
|
||||
assert result["ok"] is True
|
||||
assert result["deleted"] == "ToDelete"
|
||||
|
||||
async def test_delete_not_found(self):
|
||||
result = await _call(self.mod, "delete_project",
|
||||
project_id="nonexistent")
|
||||
assert "error" in result
|
||||
|
||||
async def test_delete_wrong_profile(self):
|
||||
created = await _call(self.mod, "create_project", name="Owned")
|
||||
pid = created["project_id"]
|
||||
self.profiles._active_profile = 'other'
|
||||
result = await _call(self.mod, "delete_project", project_id=pid)
|
||||
assert "error" in result
|
||||
|
||||
async def test_delete_no_auth_refuses_unassign(self):
|
||||
"""Without HERMES_WEBUI_PASSWORD, delete_project must NOT touch
|
||||
session JSONs. Direct FS writes would bypass _write_session_index()
|
||||
and leave _index.json holding the stale project_id, causing a
|
||||
running WebUI to keep grouping sessions under the deleted project.
|
||||
|
||||
The handler should: delete the project from projects.json, leave
|
||||
every session JSON untouched, leave the index untouched, and
|
||||
surface a `warning` field telling the operator to set the env var.
|
||||
"""
|
||||
from api.config import SESSION_DIR, SESSION_INDEX_FILE
|
||||
os.environ.pop("HERMES_WEBUI_PASSWORD", None)
|
||||
|
||||
# Create project + a session JSON that points at it
|
||||
created = await _call(self.mod, "create_project", name="ToDelete")
|
||||
pid = created["project_id"]
|
||||
sid = "test_sess_001"
|
||||
session_path = SESSION_DIR / f"{sid}.json"
|
||||
session_payload = {
|
||||
"session_id": sid,
|
||||
"title": "T",
|
||||
"project_id": pid,
|
||||
"messages": [],
|
||||
}
|
||||
session_path.write_text(json.dumps(session_payload), encoding="utf-8")
|
||||
# Index references the session under the project
|
||||
SESSION_INDEX_FILE.write_text(
|
||||
json.dumps([{"session_id": sid, "project_id": pid, "title": "T"}]),
|
||||
encoding="utf-8")
|
||||
index_before = SESSION_INDEX_FILE.read_text(encoding="utf-8")
|
||||
session_before = session_path.read_text(encoding="utf-8")
|
||||
|
||||
result = await _call(self.mod, "delete_project", project_id=pid)
|
||||
|
||||
assert result["ok"] is True
|
||||
assert result["unassigned_sessions"] == 0
|
||||
assert "warning" in result
|
||||
assert "HERMES_WEBUI_PASSWORD" in result["warning"]
|
||||
# Session JSON untouched
|
||||
assert session_path.read_text(encoding="utf-8") == session_before
|
||||
# Index untouched
|
||||
assert SESSION_INDEX_FILE.read_text(encoding="utf-8") == index_before
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Profile Scoping
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestProfileScoping:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self):
|
||||
self.state_dir = _fresh_state_dir()
|
||||
self.mod, self.profiles = _reimport_mcp()
|
||||
yield
|
||||
_cleanup_state_dir(self.state_dir)
|
||||
|
||||
async def test_projects_tagged_with_profile(self):
|
||||
result = await _call(self.mod, "create_project", name="Tagged")
|
||||
assert result["profile"] == "default"
|
||||
|
||||
async def test_list_projects_respects_profile(self):
|
||||
# Create under default
|
||||
await _call(self.mod, "create_project", name="DefaultProject")
|
||||
|
||||
# Switch to other
|
||||
self.profiles._active_profile = 'other'
|
||||
await _call(self.mod, "create_project", name="OtherProject")
|
||||
|
||||
# List should only show current profile's projects
|
||||
projects = await _call(self.mod, "list_projects")
|
||||
names = [p["name"] for p in projects]
|
||||
assert "OtherProject" in names
|
||||
assert "DefaultProject" not in names
|
||||
|
||||
# Switch back
|
||||
self.profiles._active_profile = 'default'
|
||||
projects = await _call(self.mod, "list_projects")
|
||||
names = [p["name"] for p in projects]
|
||||
assert "DefaultProject" in names
|
||||
assert "OtherProject" not in names
|
||||
|
||||
async def test_cross_profile_isolation_create(self):
|
||||
"""Same name in different profiles should be allowed."""
|
||||
await _call(self.mod, "create_project", name="Shared")
|
||||
self.profiles._active_profile = 'other'
|
||||
result = await _call(self.mod, "create_project", name="Shared")
|
||||
assert "project_id" in result
|
||||
|
||||
async def test_legacy_untagged_hidden_from_non_root_profile(self):
|
||||
"""Untagged projects (no `profile` field) belong to the root profile.
|
||||
|
||||
Mirrors api/routes.py:_profiles_match where a missing profile coerces
|
||||
to 'default'. A non-root profile must NOT see legacy untagged rows.
|
||||
"""
|
||||
# Manually write a legacy untagged project (pre-#1614 schema)
|
||||
from api.config import PROJECTS_FILE
|
||||
legacy = [{
|
||||
"project_id": "legacy000001",
|
||||
"name": "LegacyUntagged",
|
||||
"color": None,
|
||||
"created_at": 1700000000.0,
|
||||
# No "profile" field on purpose
|
||||
}]
|
||||
PROJECTS_FILE.write_text(json.dumps(legacy), encoding="utf-8")
|
||||
|
||||
# Non-root profile must NOT see it
|
||||
self.profiles._active_profile = 'other'
|
||||
projects = await _call(self.mod, "list_projects")
|
||||
names = [p["name"] for p in projects]
|
||||
assert "LegacyUntagged" not in names
|
||||
|
||||
# Root profile still sees it (load_projects backfills `profile`
|
||||
# to 'default', so visibility is preserved for the root).
|
||||
self.profiles._active_profile = 'default'
|
||||
projects = await _call(self.mod, "list_projects")
|
||||
names = [p["name"] for p in projects]
|
||||
assert "LegacyUntagged" in names
|
||||
|
||||
async def test_legacy_untagged_rename_blocked_from_non_root(self):
|
||||
"""Non-root profile cannot rename a legacy untagged project."""
|
||||
from api.config import PROJECTS_FILE
|
||||
legacy = [{
|
||||
"project_id": "legacy000002",
|
||||
"name": "Legacy",
|
||||
"color": None,
|
||||
"created_at": 1700000000.0,
|
||||
}]
|
||||
PROJECTS_FILE.write_text(json.dumps(legacy), encoding="utf-8")
|
||||
self.profiles._active_profile = 'other'
|
||||
result = await _call(self.mod, "rename_project",
|
||||
project_id="legacy000002", name="Stolen")
|
||||
assert "error" in result
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Session listing
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestListSessions:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self):
|
||||
self.state_dir = _fresh_state_dir()
|
||||
self.mod, self.profiles = _reimport_mcp()
|
||||
yield
|
||||
_cleanup_state_dir(self.state_dir)
|
||||
|
||||
async def test_list_empty(self):
|
||||
result = await _call(self.mod, "list_sessions")
|
||||
assert result == []
|
||||
|
||||
async def test_list_with_limit(self):
|
||||
result = await _call(self.mod, "list_sessions", limit=10)
|
||||
assert isinstance(result, list)
|
||||
|
||||
async def test_list_unassigned(self):
|
||||
result = await _call(self.mod, "list_sessions", unassigned=True)
|
||||
assert isinstance(result, list)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Session mutations (HTTP API — basic validation only)
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestSessionMutations:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self):
|
||||
self.state_dir = _fresh_state_dir()
|
||||
self.mod, self.profiles = _reimport_mcp()
|
||||
yield
|
||||
_cleanup_state_dir(self.state_dir)
|
||||
|
||||
async def test_rename_missing_args(self):
|
||||
result = await _call(self.mod, "rename_session",
|
||||
session_id="", title="")
|
||||
assert "error" in result
|
||||
|
||||
async def test_move_missing_args(self):
|
||||
result = await _call(self.mod, "move_session",
|
||||
session_id="", project_id="x")
|
||||
assert "error" in result
|
||||
|
||||
async def test_move_project_not_found(self):
|
||||
result = await _call(self.mod, "move_session",
|
||||
session_id="s1", project_id="nonexistent")
|
||||
assert "error" in result
|
||||
|
||||
async def test_move_target_owned_by_other_profile_rejected(self):
|
||||
"""A project owned by profile A is invisible to profile B (#1614)."""
|
||||
created = await _call(self.mod, "create_project", name="ATarget")
|
||||
pid = created["project_id"]
|
||||
self.profiles._active_profile = 'other'
|
||||
result = await _call(self.mod, "move_session",
|
||||
session_id="any", project_id=pid)
|
||||
assert "error" in result
|
||||
assert "not found" in result["error"].lower()
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# Auth helper
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestApiPassword:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self):
|
||||
self.state_dir = _fresh_state_dir()
|
||||
# Ensure env var is unset for the test
|
||||
os.environ.pop("HERMES_WEBUI_PASSWORD", None)
|
||||
self.mod, self.profiles = _reimport_mcp()
|
||||
yield
|
||||
_cleanup_state_dir(self.state_dir)
|
||||
|
||||
async def test_no_env_no_settings_returns_none(self):
|
||||
assert self.mod._api_password() is None
|
||||
|
||||
async def test_password_hash_in_settings_is_ignored(self):
|
||||
"""settings.json holds a hash, not a plaintext password — must NOT
|
||||
be returned as if it were a usable password."""
|
||||
from api.config import STATE_DIR as _SD
|
||||
(_SD / "settings.json").write_text(
|
||||
json.dumps({"password_hash": "$2b$12$abcdefghijk"}),
|
||||
encoding="utf-8")
|
||||
assert self.mod._api_password() is None
|
||||
|
||||
async def test_env_var_returned(self):
|
||||
os.environ["HERMES_WEBUI_PASSWORD"] = "secret123"
|
||||
try:
|
||||
assert self.mod._api_password() == "secret123"
|
||||
finally:
|
||||
os.environ.pop("HERMES_WEBUI_PASSWORD", None)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# _profiles_match parity (mcp_server vs api.routes vs api.profiles)
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
#
|
||||
# Locks the canonical-helper relocation: mcp_server.py and api/routes.py both
|
||||
# now import _profiles_match from api/profiles.py. If anyone re-introduces a
|
||||
# local copy in either module, both the identity check and the input-matrix
|
||||
# parametrize trip immediately.
|
||||
|
||||
async def test_profiles_match_single_source_of_truth():
|
||||
"""All three module names resolve to the same canonical object.
|
||||
|
||||
This locks the relocation: mcp_server.py and api/routes.py both import
|
||||
_profiles_match from api/profiles.py rather than carrying a local copy.
|
||||
Re-introducing a local definition in either module trips this test
|
||||
immediately.
|
||||
|
||||
Imported here in a clean module-import context (not via _reimport_mcp,
|
||||
which would re-execute api/profiles.py and produce a distinct function
|
||||
object that's behaviorally identical but fails the `is` check).
|
||||
"""
|
||||
# Make sure no test fixture left a re-import side-effect on these modules.
|
||||
for k in ('mcp_server', 'api.routes', 'api.profiles'):
|
||||
sys.modules.pop(k, None)
|
||||
import api.profiles as _profiles_mod
|
||||
import api.routes as _routes_mod
|
||||
import mcp_server as _mcp_mod
|
||||
canonical = _profiles_mod._profiles_match
|
||||
assert _routes_mod._profiles_match is canonical
|
||||
assert _mcp_mod._profiles_match is canonical
|
||||
|
||||
|
||||
@pytest.mark.parametrize("a, b", [
|
||||
(None, None),
|
||||
(None, ''),
|
||||
('', None),
|
||||
('', ''),
|
||||
(None, 'default'),
|
||||
('default', None),
|
||||
('default', 'default'),
|
||||
('foo', 'foo'),
|
||||
('foo', 'bar'),
|
||||
('foo', None),
|
||||
(None, 'foo'),
|
||||
('default', 'foo'),
|
||||
('foo', 'default'),
|
||||
])
|
||||
async def test_profiles_match_input_matrix(a, b):
|
||||
"""mcp_server._profiles_match agrees with api.routes._profiles_match
|
||||
on every (row, active) pair across the visibility matrix.
|
||||
|
||||
Note: function-object identity is checked separately in
|
||||
test_profiles_match_single_source_of_truth — here we only assert
|
||||
behavioral parity, which is robust to test-fixture re-imports that
|
||||
clear and re-execute api.profiles."""
|
||||
from mcp_server import _profiles_match as mcp_match
|
||||
from api.routes import _profiles_match as routes_match
|
||||
assert mcp_match(a, b) == routes_match(a, b)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# --profile CLI ordering regression
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
#
|
||||
# Maintainer ask: verify that --profile is applied to _active_profile *before*
|
||||
# any api.models / api.profiles consumer reads the active profile. The risk
|
||||
# is that if the canonical helpers cached the profile on first read at import
|
||||
# time, a --profile foo flag passed at startup would bind too late.
|
||||
#
|
||||
# Today the helpers read _active_profile lazily (api/profiles.py:173 reads
|
||||
# the module global at every call) so the override is safe. This test locks
|
||||
# the behaviour: setting _active_profile = 'foo' before the first list call
|
||||
# produces results filtered to 'foo', not the default.
|
||||
|
||||
class TestProfileCliOrdering:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self):
|
||||
self.state_dir = _fresh_state_dir()
|
||||
self.mod, self.profiles = _reimport_mcp()
|
||||
yield
|
||||
_cleanup_state_dir(self.state_dir)
|
||||
|
||||
async def test_active_profile_override_takes_effect_before_first_read(self):
|
||||
"""--profile foo must filter list_projects to foo's rows immediately.
|
||||
|
||||
Simulates the CLI override path (mcp_server.py:62-64 sets
|
||||
_profiles._active_profile = _profile_arg right after import). If a
|
||||
helper had latched the profile at import time, the override here
|
||||
would be too late and the test would see 'default'-tagged rows."""
|
||||
from api.config import PROJECTS_FILE
|
||||
# Pre-seed two projects: one for default, one for foo.
|
||||
seeded = [
|
||||
{"project_id": "p_default_0001", "name": "DefaultRow",
|
||||
"color": None, "profile": "default", "created_at": 1.0},
|
||||
{"project_id": "p_foo_0001", "name": "FooRow",
|
||||
"color": None, "profile": "foo", "created_at": 2.0},
|
||||
]
|
||||
PROJECTS_FILE.write_text(json.dumps(seeded), encoding="utf-8")
|
||||
|
||||
# Apply the override BEFORE the first list call. This is what
|
||||
# mcp_server.py:62-64 does after argparse.
|
||||
self.profiles._active_profile = 'foo'
|
||||
|
||||
projects = await _call(self.mod, "list_projects")
|
||||
names = [p["name"] for p in projects]
|
||||
assert "FooRow" in names
|
||||
assert "DefaultRow" not in names
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# HTTP wire-format coverage for rename_session / move_session
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
#
|
||||
# Maintainer ask: exercise the actual HTTP path so a typo in WEBUI_URL or in
|
||||
# the request body shape can't slip through validation-only tests. We stand
|
||||
# up a tiny http.server stub on a free localhost port, point WEBUI_URL at it,
|
||||
# and capture (path, body) from the requests our handlers issue. This is
|
||||
# the thing that would have caught the original 8788 vs 8787 mismatch.
|
||||
|
||||
import http.server
|
||||
import socket
|
||||
import threading
|
||||
|
||||
|
||||
class _RecordingHandler(http.server.BaseHTTPRequestHandler):
|
||||
"""Captures POST path + body, returns canned JSON. Class-level state is
|
||||
set by the fixture before each test so handlers can cross-reference."""
|
||||
captured = None # populated per-test as a list of (path, body, headers)
|
||||
canned_response = None # populated per-test: dict to be JSON-encoded
|
||||
|
||||
def log_message(self, *args, **kwargs): # noqa: D401 — silence stderr
|
||||
pass
|
||||
|
||||
def do_POST(self):
|
||||
length = int(self.headers.get("Content-Length", "0"))
|
||||
raw = self.rfile.read(length) if length else b""
|
||||
try:
|
||||
body = json.loads(raw.decode("utf-8")) if raw else {}
|
||||
except Exception:
|
||||
body = {"_raw": raw.decode("utf-8", errors="replace")}
|
||||
type(self).captured.append({
|
||||
"path": self.path,
|
||||
"body": body,
|
||||
"cookie": self.headers.get("Cookie"),
|
||||
"content_type": self.headers.get("Content-Type"),
|
||||
})
|
||||
payload = json.dumps(type(self).canned_response or {}).encode("utf-8")
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.send_header("Content-Length", str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
|
||||
|
||||
def _free_port() -> int:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.bind(("127.0.0.1", 0))
|
||||
port = s.getsockname()[1]
|
||||
s.close()
|
||||
return port
|
||||
|
||||
|
||||
class TestApiWireFormat:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self):
|
||||
self.state_dir = _fresh_state_dir()
|
||||
# Stand up a recording HTTP server on a free port. We override
|
||||
# WEBUI_URL on the imported mcp_server module to point at it.
|
||||
self.port = _free_port()
|
||||
_RecordingHandler.captured = []
|
||||
_RecordingHandler.canned_response = {}
|
||||
self.httpd = http.server.HTTPServer(("127.0.0.1", self.port),
|
||||
_RecordingHandler)
|
||||
self.thread = threading.Thread(target=self.httpd.serve_forever,
|
||||
daemon=True)
|
||||
self.thread.start()
|
||||
|
||||
# Disable auth so _api_post() does not attempt a real /api/auth/login.
|
||||
os.environ.pop("HERMES_WEBUI_PASSWORD", None)
|
||||
|
||||
self.mod, self.profiles = _reimport_mcp()
|
||||
# Override AFTER import so the value sticks in the loaded module.
|
||||
self.mod.WEBUI_URL = f"http://127.0.0.1:{self.port}"
|
||||
yield
|
||||
self.httpd.shutdown()
|
||||
self.httpd.server_close()
|
||||
self.thread.join(timeout=2)
|
||||
_cleanup_state_dir(self.state_dir)
|
||||
|
||||
async def test_rename_session_posts_to_canonical_path(self):
|
||||
"""rename_session must POST {session_id, title} to /api/session/rename."""
|
||||
_RecordingHandler.canned_response = {
|
||||
"session": {"session_id": "abc123", "title": "Renamed"}
|
||||
}
|
||||
result = await _call(self.mod, "rename_session",
|
||||
session_id="abc123", title="Renamed")
|
||||
assert len(_RecordingHandler.captured) == 1
|
||||
req = _RecordingHandler.captured[0]
|
||||
assert req["path"] == "/api/session/rename"
|
||||
assert req["body"] == {"session_id": "abc123", "title": "Renamed"}
|
||||
assert req["content_type"] == "application/json"
|
||||
# Handler returns success-shaped result on 200.
|
||||
assert result["ok"] is True
|
||||
assert result["session_id"] == "abc123"
|
||||
assert result["title"] == "Renamed"
|
||||
assert result["method"] == "api"
|
||||
|
||||
async def test_move_session_posts_to_canonical_path(self):
|
||||
"""move_session (with a project_id) POSTs to /api/session/move
|
||||
after confirming the project exists locally."""
|
||||
# Need a real project so the pre-flight profile check passes.
|
||||
created = await _call(self.mod, "create_project", name="MoveTarget")
|
||||
pid = created["project_id"]
|
||||
_RecordingHandler.canned_response = {
|
||||
"ok": True,
|
||||
"session": {"session_id": "s1", "title": "T", "project_id": pid}
|
||||
}
|
||||
result = await _call(self.mod, "move_session",
|
||||
session_id="s1", project_id=pid)
|
||||
assert len(_RecordingHandler.captured) == 1
|
||||
req = _RecordingHandler.captured[0]
|
||||
assert req["path"] == "/api/session/move"
|
||||
assert req["body"] == {"session_id": "s1", "project_id": pid}
|
||||
assert result["ok"] is True
|
||||
assert result["session_id"] == "s1"
|
||||
assert result["project_id"] == pid
|
||||
assert result["method"] == "api"
|
||||
|
||||
async def test_move_session_unassign_sends_null_project_id(self):
|
||||
"""Passing project_id=None must serialize as JSON null (not omitted)."""
|
||||
_RecordingHandler.canned_response = {
|
||||
"ok": True, "session": {"session_id": "s1", "project_id": None}
|
||||
}
|
||||
result = await _call(self.mod, "move_session",
|
||||
session_id="s1", project_id=None)
|
||||
assert len(_RecordingHandler.captured) == 1
|
||||
req = _RecordingHandler.captured[0]
|
||||
assert req["path"] == "/api/session/move"
|
||||
assert req["body"] == {"session_id": "s1", "project_id": None}
|
||||
assert result["ok"] is True
|
||||
|
||||
async def test_url_built_from_env_vars(self):
|
||||
"""HERMES_WEBUI_HOST / HERMES_WEBUI_PORT govern WEBUI_URL.
|
||||
|
||||
Locks the maintainer-suggested env-var contract from #1895 review:
|
||||
the MCP must track the same env vars api/config.py:32-33 reads, so
|
||||
a non-default WebUI port (e.g. 8788 when 8787 is held by another
|
||||
service on the host) does not require a code edit."""
|
||||
os.environ["HERMES_WEBUI_HOST"] = "10.0.0.42"
|
||||
os.environ["HERMES_WEBUI_PORT"] = "9999"
|
||||
try:
|
||||
mod, _ = _reimport_mcp()
|
||||
assert mod.WEBUI_HOST == "10.0.0.42"
|
||||
assert mod.WEBUI_PORT == "9999"
|
||||
assert mod.WEBUI_URL == "http://10.0.0.42:9999"
|
||||
finally:
|
||||
os.environ.pop("HERMES_WEBUI_HOST", None)
|
||||
os.environ.pop("HERMES_WEBUI_PORT", None)
|
||||
|
||||
async def test_url_default_when_env_unset(self):
|
||||
"""Default upstream port is 8787, matching api/config.py:33."""
|
||||
os.environ.pop("HERMES_WEBUI_HOST", None)
|
||||
os.environ.pop("HERMES_WEBUI_PORT", None)
|
||||
mod, _ = _reimport_mcp()
|
||||
assert mod.WEBUI_HOST == "127.0.0.1"
|
||||
assert mod.WEBUI_PORT == "8787"
|
||||
assert mod.WEBUI_URL == "http://127.0.0.1:8787"
|
||||
Reference in New Issue
Block a user