Merge pull request #2156 into stage-346

Issue #2057 Slice 2: Add guarded worktree remove action
This commit is contained in:
Hermes Agent
2026-05-13 06:56:25 +00:00
9 changed files with 710 additions and 2 deletions
+23
View File
@@ -3173,6 +3173,7 @@ def handle_get(handler, parsed) -> bool:
if parsed.path.startswith("/static/"):
return _serve_static(handler, parsed)
if parsed.path == "/api/session/worktree/status":
query = parse_qs(parsed.query)
sid = query.get("session_id", [""])[0]
@@ -4391,6 +4392,28 @@ def handle_post(handler, parsed) -> bool:
logger.debug("Failed to close workspace terminal after workspace update")
set_last_workspace(new_ws)
return j(handler, {"session": s.compact() | {"messages": s.messages}})
if parsed.path == "/api/session/worktree/remove":
sid = body.get("session_id", "")
if not sid or not isinstance(sid, str) or not sid.strip():
return bad(handler, "session_id must be a non-empty string", status=400)
sid = sid.strip()
if not all(c in '0123456789abcdefghijklmnopqrstuvwxyz_' for c in sid):
return bad(handler, "Invalid session_id", 400)
try:
s = get_session(sid, metadata_only=True)
except KeyError:
return bad(handler, "Session not found", status=404)
force = bool(body.get("force", False))
try:
from api.worktrees import remove_worktree_for_session
result = remove_worktree_for_session(s, force=force)
return j(handler, result)
except ValueError as exc:
return bad(handler, str(exc), status=400)
except Exception as exc:
logger.exception("failed to remove worktree for session %s", sid)
return bad(handler, _sanitize_error(exc), status=500)
if parsed.path == "/api/session/delete":
sid = body.get("session_id", "")
+96
View File
@@ -201,6 +201,102 @@ def worktree_status_for_session(session) -> dict:
return status
def remove_worktree_for_session(session, *, force: bool = False) -> dict:
"""Remove a session's git worktree from disk.
Returns status dict with keys: ok, removed_path, warnings.
Raises ValueError for terminal blockers (locked by stream/terminal,
dirty with force=False).
"""
raw_path = getattr(session, "worktree_path", None)
if not raw_path:
raise ValueError("Session is not worktree-backed")
worktree_path = _resolve_path(raw_path)
if worktree_path is None:
raise ValueError("Session is not worktree-backed")
# Read current status before removal
status = worktree_status_for_session(session)
if not status["exists"]:
return {
"ok": True,
"removed_path": str(worktree_path),
"warnings": ["Worktree directory no longer exists on disk."],
}
warnings = []
# Guard: locked by stream
if status["locked_by_stream"]:
raise ValueError("Worktree is locked by an active streaming session")
# Guard: locked by terminal
if status["locked_by_terminal"]:
raise ValueError("Worktree is locked by an active terminal session")
# Guard: local changes and unpushed commits without explicit force.
if status["dirty"] and not force:
raise ValueError(
"Worktree has uncommitted changes. Use force=true to override."
)
if status["untracked_count"] > 0:
if force:
warnings.append(
f"{status['untracked_count']} untracked file(s) will be removed."
)
else:
raise ValueError(
f"Worktree has {status['untracked_count']} untracked file(s). "
"Use force=true to override."
)
ahead = int((status.get("ahead_behind") or {}).get("ahead") or 0)
if ahead > 0:
if force:
warnings.append(f"{ahead} unpushed commit(s) will be removed.")
else:
raise ValueError(
f"Worktree has {ahead} unpushed commit(s). "
"Use force=true to override."
)
# Remove the worktree — must run from the repo root, not the worktree dir
repo_root = getattr(session, "worktree_repo_root", None)
if not repo_root:
raise ValueError("Session missing worktree_repo_root")
try:
remove_args = ["worktree", "remove"]
if force:
remove_args.append("--force")
remove_args.append(str(worktree_path))
result = _run_git(remove_args, str(repo_root), timeout=10)
except (OSError, subprocess.TimeoutExpired) as exc:
raise ValueError(f"Failed to remove worktree: {exc}") from exc
if result.returncode != 0:
stderr = (result.stderr or "").strip().split("\n")[-1]
raise ValueError(
f"git worktree remove failed: {stderr or result.stdout.strip()}"
)
# Prune in case the worktree dir was already gone
try:
_run_git(
["worktree", "prune"],
str(repo_root),
timeout=5,
)
except Exception:
pass
return {
"ok": True,
"removed_path": str(worktree_path),
"warnings": warnings or None,
}
def find_git_repo_root(workspace: str | Path) -> Path:
"""Return the enclosing git repo root for *workspace*.