Merge pull request #2109 from franksong2702/issue-2057-worktree-status

Add read-only worktree status endpoint (refs #2057)
This commit is contained in:
Hermes Agent
2026-05-12 05:10:18 +00:00
4 changed files with 430 additions and 0 deletions
+2
View File
@@ -4,6 +4,8 @@
### Added
- Read-only worktree status endpoint for the #2057 lifecycle tracker. `GET /api/session/worktree/status?session_id=...` now returns the session-owned worktree path, filesystem existence, dirty/untracked state, ahead/behind counts when an upstream is configured, and live stream/embedded-terminal lock flags without mutating git state. This is the non-destructive status surface Nathan requested before any future explicit remove-worktree action.
- **PR #2105** by @Michaelyklam — Hermes run adapter contract RFC at `docs/rfcs/hermes-run-adapter-contract.md` (refs #1925). 315-line spec/gap matrix that defines the event/control compatibility contract WebUI needs before browser-originated chat turns can be routed to Hermes-owned runtime execution. Documents the ownership boundary (Hermes Agent owns run creation, lifecycle, event ordering, replay, terminal state, approvals, clarify, cancellation; WebUI owns browser auth, transcript rendering, tool cards, approval/clarify widgets, workspace UX), the minimum `start_run`/`observe_run`/`get_run`/`cancel_run`/`queue_or_continue`/`respond_approval`/`respond_clarify` IPC surface, and a gap matrix mapping current `STREAMS`/`CANCEL_FLAGS`/`AGENT_INSTANCES`/callback queues to Hermes-owned targets with explicit "no private callback queue" / "no runtime surrogate" non-goals. First success criterion is restart/reattach (start a non-trivial run, restart hermes-webui, browser reconnects, replays from last cursor, cancels with Hermes-emitted terminal state) — not "basic chat streamed once." Status: Proposed.
### Fixed
+19
View File
@@ -3053,6 +3053,25 @@ def handle_get(handler, parsed) -> bool:
if parsed.path.startswith("/static/"):
return _serve_static(handler, parsed)
if parsed.path == "/api/session/worktree/status":
query = parse_qs(parsed.query)
sid = query.get("session_id", [""])[0]
if not sid:
return bad(handler, "session_id is required", status=400)
try:
s = get_session(sid, metadata_only=True)
except KeyError:
return bad(handler, "Session not found", status=404)
try:
from api.worktrees import worktree_status_for_session
return j(handler, {"status": worktree_status_for_session(s)})
except ValueError as exc:
return bad(handler, str(exc), status=400)
except Exception as exc:
logger.exception("failed to read worktree status for session %s", sid)
return bad(handler, _sanitize_error(exc), status=500)
if parsed.path == "/api/session":
import time as _time
_t0 = _time.monotonic()
+188
View File
@@ -13,6 +13,194 @@ import logging
logger = logging.getLogger(__name__)
def _run_git(args: list[str], cwd: str | Path, timeout: float = 5) -> subprocess.CompletedProcess:
return subprocess.run(
["git", *args],
cwd=str(cwd),
text=True,
capture_output=True,
timeout=timeout,
check=False,
)
def _resolve_path(path: str | Path | None) -> Path | None:
if not path:
return None
try:
return Path(path).expanduser().resolve(strict=False)
except (OSError, RuntimeError):
return Path(path).expanduser()
def _worktree_list_cwd(worktree_path: Path, repo_root: str | Path | None) -> Path | None:
repo = _resolve_path(repo_root)
if repo and repo.is_dir():
return repo
if worktree_path.is_dir():
return worktree_path
return None
def _parse_worktree_list_porcelain(output: str) -> set[str]:
paths: set[str] = set()
for line in str(output or "").splitlines():
if not line.startswith("worktree "):
continue
path = line[len("worktree "):].strip()
if not path:
continue
resolved = _resolve_path(path)
paths.add(str(resolved or Path(path).expanduser()))
return paths
def _worktree_listed(worktree_path: Path, repo_root: str | Path | None) -> bool:
"""Return whether git currently lists the worktree.
False is a safe fallback for probe failures, not definitive orphan proof.
Future cleanup UI must combine this with the rest of the status payload.
"""
cwd = _worktree_list_cwd(worktree_path, repo_root)
if cwd is None:
return False
try:
result = _run_git(["worktree", "list", "--porcelain"], cwd)
except (OSError, subprocess.TimeoutExpired):
return False
if result.returncode != 0:
return False
return str(worktree_path) in _parse_worktree_list_porcelain(result.stdout)
def _status_porcelain(worktree_path: Path) -> tuple[bool, int]:
try:
result = _run_git(
["status", "--porcelain", "--untracked-files=normal"],
worktree_path,
)
except (OSError, subprocess.TimeoutExpired):
return False, 0
if result.returncode != 0:
return False, 0
lines = [line for line in result.stdout.splitlines() if line]
return bool(lines), sum(1 for line in lines if line.startswith("??"))
def _ahead_behind(worktree_path: Path) -> dict:
payload = {
"ahead": 0,
"behind": 0,
"available": False,
"upstream": None,
}
try:
upstream = _run_git(
["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
worktree_path,
)
except (OSError, subprocess.TimeoutExpired):
return payload
if upstream.returncode != 0:
return payload
upstream_ref = upstream.stdout.strip()
if not upstream_ref:
return payload
payload["upstream"] = upstream_ref
try:
counts = _run_git(
["rev-list", "--left-right", "--count", "HEAD...@{u}"],
worktree_path,
)
except (OSError, subprocess.TimeoutExpired):
return payload
if counts.returncode != 0:
return payload
parts = counts.stdout.strip().split()
if len(parts) != 2:
return payload
try:
payload["ahead"] = max(0, int(parts[0]))
payload["behind"] = max(0, int(parts[1]))
payload["available"] = True
except ValueError:
pass
return payload
def _locked_by_stream(session) -> bool:
stream_id = getattr(session, "active_stream_id", None)
if not stream_id:
return False
try:
from api.config import STREAMS, STREAMS_LOCK
with STREAMS_LOCK:
return stream_id in STREAMS
except Exception:
return False
def _locked_by_terminal(session_id: str, worktree_path: Path) -> bool:
try:
from api.terminal import get_terminal
term = get_terminal(session_id)
except Exception:
return False
if not term:
return False
try:
if not term.is_alive():
return False
terminal_workspace = _resolve_path(getattr(term, "workspace", None))
return terminal_workspace == worktree_path
except Exception:
return False
def worktree_status_for_session(session) -> dict:
"""Return a read-only worktree status snapshot for a WebUI session."""
raw_path = getattr(session, "worktree_path", None)
if not raw_path:
raise ValueError("Session is not worktree-backed")
worktree_path = _resolve_path(raw_path)
if worktree_path is None:
raise ValueError("Session is not worktree-backed")
exists = worktree_path.is_dir()
status = {
"path": str(worktree_path),
"exists": bool(exists),
"dirty": False,
"untracked_count": 0,
"ahead_behind": {
"ahead": 0,
"behind": 0,
"available": False,
"upstream": None,
},
"locked_by_stream": _locked_by_stream(session),
"locked_by_terminal": _locked_by_terminal(
getattr(session, "session_id", ""),
worktree_path,
),
"listed": _worktree_listed(
worktree_path,
getattr(session, "worktree_repo_root", None),
),
}
if not exists:
return status
dirty, untracked_count = _status_porcelain(worktree_path)
status["dirty"] = dirty
status["untracked_count"] = untracked_count
status["ahead_behind"] = _ahead_behind(worktree_path)
return status
def find_git_repo_root(workspace: str | Path) -> Path:
"""Return the enclosing git repo root for *workspace*.
+221
View File
@@ -0,0 +1,221 @@
import subprocess
from types import SimpleNamespace
from urllib.parse import urlparse
import pytest
import api.models as models
from api.models import SESSIONS, Session
def _git(cwd, *args):
return subprocess.run(
["git", *args],
cwd=cwd,
text=True,
capture_output=True,
check=True,
)
@pytest.fixture(autouse=True)
def _isolate_sessions(tmp_path, monkeypatch):
session_dir = tmp_path / "sessions"
session_dir.mkdir()
monkeypatch.setattr(models, "SESSION_DIR", session_dir)
monkeypatch.setattr(models, "SESSION_INDEX_FILE", session_dir / "_index.json")
SESSIONS.clear()
yield session_dir
SESSIONS.clear()
@pytest.fixture
def git_worktree(tmp_path):
repo = tmp_path / "repo"
remote = tmp_path / "remote.git"
worktree = tmp_path / "hermes-status"
repo.mkdir()
_git(repo, "init")
_git(repo, "config", "user.email", "test@example.com")
_git(repo, "config", "user.name", "Hermes Test")
_git(repo, "branch", "-M", "main")
(repo / "README.md").write_text("hello\n", encoding="utf-8")
_git(repo, "add", "README.md")
_git(repo, "commit", "-m", "initial")
_git(remote.parent, "init", "--bare", remote.name)
_git(repo, "remote", "add", "origin", str(remote))
_git(repo, "push", "-u", "origin", "main")
_git(repo, "worktree", "add", "-b", "hermes/status", str(worktree), "main")
_git(worktree, "push", "-u", "origin", "hermes/status")
return repo, worktree
def _session_for_worktree(repo, worktree, **kwargs):
return Session(
session_id=kwargs.pop("session_id", "wtstatus001"),
workspace=str(worktree),
worktree_path=str(worktree),
worktree_branch="hermes/status",
worktree_repo_root=str(repo),
worktree_created_at=123.0,
**kwargs,
)
def test_worktree_status_reports_clean_existing_worktree(git_worktree):
from api.worktrees import worktree_status_for_session
repo, worktree = git_worktree
status = worktree_status_for_session(_session_for_worktree(repo, worktree))
assert status["path"] == str(worktree.resolve())
assert status["exists"] is True
assert status["listed"] is True
assert status["dirty"] is False
assert status["untracked_count"] == 0
assert status["ahead_behind"]["available"] is True
assert status["ahead_behind"]["ahead"] == 0
assert status["ahead_behind"]["behind"] == 0
assert status["locked_by_stream"] is False
assert status["locked_by_terminal"] is False
def test_worktree_status_reports_dirty_untracked_and_ahead(git_worktree):
from api.worktrees import worktree_status_for_session
repo, worktree = git_worktree
(worktree / "README.md").write_text("hello\nedited\n", encoding="utf-8")
(worktree / "scratch.txt").write_text("local-only\n", encoding="utf-8")
status = worktree_status_for_session(_session_for_worktree(repo, worktree))
assert status["dirty"] is True
assert status["untracked_count"] == 1
assert status["ahead_behind"]["ahead"] == 0
_git(worktree, "add", "README.md")
_git(worktree, "commit", "-m", "local change")
status = worktree_status_for_session(_session_for_worktree(repo, worktree))
assert status["dirty"] is True
assert status["untracked_count"] == 1
assert status["ahead_behind"]["available"] is True
assert status["ahead_behind"]["ahead"] == 1
assert status["ahead_behind"]["behind"] == 0
def test_worktree_status_handles_missing_path_without_git_mutation(tmp_path):
from api.worktrees import worktree_status_for_session
missing = tmp_path / "missing-worktree"
status = worktree_status_for_session(
SimpleNamespace(
session_id="missing",
worktree_path=str(missing),
worktree_repo_root=str(tmp_path / "repo"),
active_stream_id=None,
)
)
assert status["path"] == str(missing.resolve())
assert status["exists"] is False
assert status["dirty"] is False
assert status["untracked_count"] == 0
assert status["ahead_behind"]["ahead"] == 0
assert status["ahead_behind"]["behind"] == 0
def test_worktree_status_uses_live_stream_registry(git_worktree):
from api.config import STREAMS, STREAMS_LOCK
from api.worktrees import worktree_status_for_session
repo, worktree = git_worktree
session = _session_for_worktree(
repo,
worktree,
active_stream_id="live-stream",
)
with STREAMS_LOCK:
STREAMS["live-stream"] = object()
try:
assert worktree_status_for_session(session)["locked_by_stream"] is True
finally:
with STREAMS_LOCK:
STREAMS.pop("live-stream", None)
assert worktree_status_for_session(session)["locked_by_stream"] is False
def test_worktree_status_reports_live_terminal_lock(git_worktree, monkeypatch):
import api.terminal as terminal
from api.worktrees import worktree_status_for_session
repo, worktree = git_worktree
class FakeTerminal:
workspace = str(worktree.resolve())
def is_alive(self):
return True
monkeypatch.setattr(terminal, "get_terminal", lambda session_id: FakeTerminal())
status = worktree_status_for_session(_session_for_worktree(repo, worktree))
assert status["locked_by_terminal"] is True
def test_worktree_status_endpoint_returns_session_owned_status(git_worktree, monkeypatch):
import api.routes as routes
repo, worktree = git_worktree
session = _session_for_worktree(repo, worktree, session_id="route_wt")
session.save()
captured = {}
monkeypatch.setattr(
routes,
"j",
lambda handler, payload, status=200, extra_headers=None: captured.update(
payload=payload,
status=status,
) or True,
)
handled = routes.handle_get(
object(),
urlparse("/api/session/worktree/status?session_id=route_wt"),
)
assert handled is True
assert captured["status"] == 200
assert captured["payload"]["status"]["path"] == str(worktree.resolve())
assert captured["payload"]["status"]["exists"] is True
def test_worktree_status_endpoint_rejects_non_worktree_session(tmp_path, monkeypatch):
import api.routes as routes
workspace = tmp_path / "workspace"
workspace.mkdir()
session = Session(session_id="plain", workspace=str(workspace))
session.save()
captured = {}
monkeypatch.setattr(
routes,
"bad",
lambda handler, message, status=400: captured.update(
message=message,
status=status,
) or True,
)
handled = routes.handle_get(
object(),
urlparse("/api/session/worktree/status?session_id=plain"),
)
assert handled is True
assert captured["status"] == 400
assert "not worktree-backed" in captured["message"]