"""Git helpers for the workspace panel. The browser only sends session ids and workspace-relative paths. This module resolves the active workspace server-side, scopes paths before they become Git pathspecs, and keeps all Git subprocess calls shell-free and bounded. """ from __future__ import annotations import difflib import os import shutil import subprocess import tempfile import threading from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path from typing import Iterable from api.workspace import safe_resolve_ws GIT_TIMEOUT = 5 GIT_REMOTE_TIMEOUT = 60 STATUS_FILE_LIMIT = 500 DIFF_SIZE_LIMIT = 512 * 1024 COMMIT_MESSAGE_DIFF_LIMIT = 64 * 1024 WORKSPACE_GIT_DESTRUCTIVE_ENV = "HERMES_WEBUI_WORKSPACE_GIT_DESTRUCTIVE" _GIT_ENV_SCRUB_KEYS = ( "GIT_DIR", "GIT_WORK_TREE", "GIT_CONFIG_GLOBAL", "GIT_CONFIG_SYSTEM", "GIT_CONFIG_COUNT", "GIT_CONFIG_PARAMETERS", ) _GIT_ENV_SCRUB_PREFIXES = ("GIT_CONFIG_KEY_", "GIT_CONFIG_VALUE_") _HERMES_BRANCH_SWITCH_STASH_PREFIX = "hermes-webui branch switch" def workspace_git_destructive_enabled() -> bool: return os.getenv(WORKSPACE_GIT_DESTRUCTIVE_ENV, "").strip().lower() in { "1", "true", "yes", "on", } def _clean_git_env(extra: dict[str, str] | None = None) -> dict[str, str]: env = os.environ.copy() if extra: env.update(extra) for key in _GIT_ENV_SCRUB_KEYS: env.pop(key, None) for key in list(env): if key.startswith(_GIT_ENV_SCRUB_PREFIXES): env.pop(key, None) return env class GitWorkspaceError(RuntimeError): """User-facing Git operation error.""" def __init__(self, message: str, code: str = "git_failed"): super().__init__(message) self.code = code @dataclass(frozen=True) class GitContext: workspace: Path repo_root: Path workspace_prefix: str _LOCKS_GUARD = threading.Lock() _OP_LOCKS: dict[str, threading.Lock] = {} @contextmanager def _git_mutation_lock(ctx: GitContext): # Key by repo root so sessions in the same repository serialize mutations. # Separate worktrees get separate locks; Git still protects shared metadata # with its own locks. key = str(ctx.repo_root) with _LOCKS_GUARD: lock = _OP_LOCKS.setdefault(key, threading.Lock()) if not lock.acquire(timeout=GIT_REMOTE_TIMEOUT): raise GitWorkspaceError("Another Git operation is still running", "operation_in_progress") try: yield finally: lock.release() def _classify_git_error(message: str, args: list[str] | None = None) -> str: text = (message or "").lower() joined = " ".join(args or []).lower() if "timed out" in text: return "timeout" if "not installed" in text or "no such file or directory: 'git'" in text: return "missing_git" if "not a git repository" in text: return "not_a_repo" if "outside the workspace" in text or "outside the git repository" in text: return "path_outside_workspace" if "authentication failed" in text or "permission denied" in text or "could not read username" in text: return "auth_failed" if "no upstream" in text or "no configured push destination" in text or "has no upstream branch" in text: return "no_upstream" if ( "non-fast-forward" in text or "fetch first" in text or ("rejected" in text and "push" in joined) ): return "non_fast_forward" if "conflict" in text or "unmerged" in text or ("merge" in text and "needs" in text): return "conflict" if "working tree" in text and ("clean" in text or "dirty" in text): return "dirty_worktree" if "local changes" in text or "would be overwritten by checkout" in text: return "dirty_worktree" if "invalid reference" in text or "not a valid" in text or "unknown revision" in text: return "invalid_ref" if "hook" in text: return "hook_failed" return "git_failed" def _run_git( ctx_or_cwd: GitContext | Path, args: list[str], *, timeout: int = GIT_TIMEOUT, check: bool = False, env: dict[str, str] | None = None, ) -> subprocess.CompletedProcess[str]: cwd = ctx_or_cwd.repo_root if isinstance(ctx_or_cwd, GitContext) else ctx_or_cwd run_env = _clean_git_env(env) try: result = subprocess.run( ["git", *args], cwd=str(cwd), shell=False, capture_output=True, text=True, timeout=timeout, env=run_env, ) except subprocess.TimeoutExpired as exc: raise GitWorkspaceError("Git command timed out", "timeout") from exc except FileNotFoundError as exc: raise GitWorkspaceError("Git is not installed or not available on PATH", "missing_git") from exc except OSError as exc: raise GitWorkspaceError(str(exc), _classify_git_error(str(exc), args)) from exc if check and result.returncode != 0: message = (result.stderr or result.stdout or "Git command failed").strip() raise GitWorkspaceError(message, _classify_git_error(message, args)) return result def resolve_git_context(workspace: str | Path) -> GitContext | None: ws = Path(workspace).expanduser().resolve() result = _run_git(ws, ["rev-parse", "--show-toplevel"], check=False) if result.returncode != 0: return None repo_root = Path(result.stdout.strip()).resolve() try: prefix = ws.relative_to(repo_root).as_posix() except ValueError: return None return GitContext(workspace=ws, repo_root=repo_root, workspace_prefix="" if prefix == "." else prefix) def _workspace_pathspec(ctx: GitContext) -> str: return ctx.workspace_prefix or "." def _repo_rel(ctx: GitContext, workspace_rel: str) -> str: try: target = safe_resolve_ws(ctx.workspace, workspace_rel or ".") except ValueError as exc: raise GitWorkspaceError(str(exc), "path_outside_workspace") from exc try: repo_rel = target.relative_to(ctx.repo_root).as_posix() except ValueError as exc: raise GitWorkspaceError("Path is outside the Git repository", "path_outside_workspace") from exc if ctx.workspace_prefix: try: target.relative_to(ctx.workspace) except ValueError as exc: raise GitWorkspaceError("Path is outside the workspace", "path_outside_workspace") from exc return repo_rel def _workspace_rel(ctx: GitContext, repo_rel: str) -> str | None: repo_rel = repo_rel.replace("\\", "/") if not ctx.workspace_prefix: return repo_rel prefix = ctx.workspace_prefix.rstrip("/") + "/" if repo_rel == ctx.workspace_prefix: return "." if repo_rel.startswith(prefix): return repo_rel[len(prefix) :] return None def _empty_status() -> dict: return { "changed": 0, "staged": 0, "unstaged": 0, "untracked": 0, "conflicts": 0, } def _status_code(xy: str, *, untracked: bool = False, renamed: bool = False) -> str: if untracked: return "??" if xy in {"DD", "AU", "UD", "UA", "DU", "AA", "UU"}: return xy if renamed: return "R" for ch in xy: if ch in "MADRCUT": return ch return xy.strip(".") or "M" def _parse_numstat(text: str, ctx: GitContext) -> dict[str, tuple[int, int, bool]]: stats: dict[str, tuple[int, int, bool]] = {} for line in text.splitlines(): parts = line.split("\t", 2) if len(parts) < 3: continue raw_add, raw_del, raw_path = parts binary = raw_add == "-" or raw_del == "-" additions = 0 if binary else int(raw_add or "0") deletions = 0 if binary else int(raw_del or "0") workspace_path = _workspace_rel(ctx, raw_path) if workspace_path is None: continue stats[workspace_path] = (additions, deletions, binary) return stats def _parse_path_list(text: str, ctx: GitContext) -> set[str]: paths: set[str] = set() for raw_path in text.split("\0"): if not raw_path: continue workspace_path = _workspace_rel(ctx, raw_path) if workspace_path is not None: paths.add(workspace_path) return paths def _collect_diff_paths(ctx: GitContext, cached: bool, *, ignore_cr_at_eol: bool = True) -> set[str] | None: args = ["diff", "--name-only", "-z"] if ignore_cr_at_eol: args.append("--ignore-cr-at-eol") if cached: args.append("--cached") args.extend(["--", _workspace_pathspec(ctx)]) result = _run_git(ctx, args, check=False) if result.returncode != 0: return None return _parse_path_list(result.stdout, ctx) def _collect_numstat( ctx: GitContext, cached: bool, *, ignore_cr_at_eol: bool = True, ) -> dict[str, tuple[int, int, bool]]: args = ["diff", "--numstat"] if ignore_cr_at_eol: args.append("--ignore-cr-at-eol") if cached: args.append("--cached") args.extend(["--", _workspace_pathspec(ctx)]) result = _run_git(ctx, args, check=False) if result.returncode != 0: return {} return _parse_numstat(result.stdout, ctx) def _count_untracked_file(path: Path) -> tuple[int, int, bool]: try: if not path.is_file() or path.stat().st_size > DIFF_SIZE_LIMIT: return 0, 0, False except OSError: return 0, 0, False try: data = path.read_bytes() except OSError: return 0, 0, False if b"\0" in data: return 0, 0, True try: text = data.decode("utf-8") except UnicodeDecodeError: return 0, 0, True return len(text.splitlines()) or (1 if text else 0), 0, False def git_status(workspace: str | Path) -> dict: ctx = resolve_git_context(workspace) if ctx is None: return {"is_git": False} result = _run_git( ctx, [ "status", "--porcelain=v2", "-z", "--branch", "--ignored=matching", "--untracked-files=all", "--", _workspace_pathspec(ctx), ], check=True, ) staged_stats = _collect_numstat(ctx, cached=True) unstaged_stats = _collect_numstat(ctx, cached=False) staged_raw_stats = _collect_numstat(ctx, cached=True, ignore_cr_at_eol=False) unstaged_raw_stats = _collect_numstat(ctx, cached=False, ignore_cr_at_eol=False) staged_diff_paths = _collect_diff_paths(ctx, cached=True) unstaged_diff_paths = _collect_diff_paths(ctx, cached=False) branch = "" upstream = "" ahead = 0 behind = 0 files: dict[str, dict] = {} filtered_noise = {"filemode_only": 0, "crlf_only": 0} tokens = result.stdout.split("\0") i = 0 truncated = False while i < len(tokens): rec = tokens[i] i += 1 if not rec: continue if rec.startswith("# "): parts = rec.split(" ", 2) if len(parts) >= 3 and parts[1] == "branch.head": branch = "" if parts[2] == "(detached)" else parts[2] elif len(parts) >= 3 and parts[1] == "branch.upstream": upstream = parts[2] elif len(parts) >= 3 and parts[1] == "branch.ab": for bit in parts[2].split(): if bit.startswith("+") and bit[1:].isdigit(): ahead = int(bit[1:]) elif bit.startswith("-") and bit[1:].isdigit(): behind = int(bit[1:]) continue old_path = None renamed = False if rec.startswith("? "): xy = "??" repo_path = rec[2:] untracked = True ignored = False elif rec.startswith("! "): xy = "!!" repo_path = rec[2:] untracked = False ignored = True elif rec.startswith("1 "): parts = rec.split(" ", 8) if len(parts) < 9: continue xy = parts[1] repo_path = parts[8] untracked = False ignored = False elif rec.startswith("2 "): parts = rec.split(" ", 9) if len(parts) < 10: continue xy = parts[1] repo_path = parts[9] if i < len(tokens): old_path = tokens[i] i += 1 renamed = True untracked = False ignored = False elif rec.startswith("u "): parts = rec.split(" ", 10) if len(parts) < 11: continue xy = parts[1] repo_path = parts[10] untracked = False ignored = False else: continue workspace_path = _workspace_rel(ctx, repo_path) if workspace_path is None: continue old_workspace_path = _workspace_rel(ctx, old_path) if old_path else None x = xy[0] if xy else "." y = xy[1] if len(xy) > 1 else "." conflict = xy in {"DD", "AU", "UD", "UA", "DU", "AA", "UU"} or rec.startswith("u ") additions, deletions, binary = 0, 0, False for source in (staged_stats, unstaged_stats): if workspace_path in source: a, d, b = source[workspace_path] additions += a deletions += d binary = binary or b if untracked: additions, deletions, binary = _count_untracked_file(ctx.workspace / workspace_path) staged = (x not in {".", "?"}) and not untracked unstaged = (y not in {".", " "}) and not untracked if staged and staged_diff_paths is not None and not renamed: raw_staged = staged staged = workspace_path in staged_diff_paths or ( old_workspace_path is not None and old_workspace_path in staged_diff_paths ) if raw_staged and not staged: if workspace_path in staged_raw_stats or ( old_workspace_path is not None and old_workspace_path in staged_raw_stats ): filtered_noise["crlf_only"] += 1 else: filtered_noise["filemode_only"] += 1 if unstaged and unstaged_diff_paths is not None and not renamed: raw_unstaged = unstaged unstaged = workspace_path in unstaged_diff_paths or ( old_workspace_path is not None and old_workspace_path in unstaged_diff_paths ) if raw_unstaged and not unstaged: if workspace_path in unstaged_raw_stats or ( old_workspace_path is not None and old_workspace_path in unstaged_raw_stats ): filtered_noise["crlf_only"] += 1 else: filtered_noise["filemode_only"] += 1 if ignored: files[workspace_path] = { "path": workspace_path, "old_path": None, "workspace_path": workspace_path, "status": "Ignored", "staged": False, "unstaged": False, "untracked": False, "ignored": True, "conflict": False, "additions": 0, "deletions": 0, "binary": False, } if len(files) >= STATUS_FILE_LIMIT: truncated = True break continue if not (staged or unstaged or untracked or conflict or renamed): continue if not (untracked or conflict or renamed or binary) and additions == 0 and deletions == 0: filtered_noise["crlf_only"] += 1 continue files[workspace_path] = { "path": workspace_path, "old_path": old_workspace_path, "workspace_path": workspace_path, "status": _status_code(xy, untracked=untracked, renamed=renamed), "staged": staged, "unstaged": unstaged, "untracked": untracked, "ignored": False, "conflict": conflict, "additions": additions, "deletions": deletions, "binary": binary, } if len(files) >= STATUS_FILE_LIMIT: truncated = True break file_list = sorted(files.values(), key=lambda f: (f["path"].lower())) totals = _empty_status() for item in file_list: if item.get("ignored"): continue if item["staged"]: totals["staged"] += 1 if item["unstaged"]: totals["unstaged"] += 1 if item["untracked"]: totals["untracked"] += 1 if item["conflict"]: totals["conflicts"] += 1 totals["changed"] = sum(1 for item in file_list if not item.get("ignored")) if not branch: branch = (_run_git(ctx, ["rev-parse", "--short", "HEAD"], check=False).stdout or "").strip() return { "is_git": True, "branch": branch or "HEAD", "upstream": upstream, "ahead": ahead, "behind": behind, "totals": totals, "files": file_list, "truncated": truncated, "noise_filtering": { **filtered_noise, "active": any(filtered_noise.values()), }, } def _branch_ahead_behind(ctx: GitContext, branch: str, upstream: str) -> tuple[int, int]: if not upstream: return 0, 0 result = _run_git(ctx, ["rev-list", "--left-right", "--count", f"{branch}...{upstream}"], check=False) if result.returncode != 0: return 0, 0 parts = result.stdout.strip().split() if len(parts) != 2: return 0, 0 try: return int(parts[0]), int(parts[1]) except ValueError: return 0, 0 def _for_each_ref(ctx: GitContext, ref_prefix: str) -> list[dict]: fmt = ( "%(refname)%00%(refname:short)%00%(upstream:short)%00%(objectname:short)%00" "%(committerdate:unix)%00%(committerdate:relative)%00%(authorname)%00%(subject)" ) result = _run_git(ctx, ["for-each-ref", f"--format={fmt}", ref_prefix], check=True) refs = [] for line in result.stdout.splitlines(): full_name, name, upstream, sha, updated, updated_relative, author, subject = ( line.split("\0") + ["", "", "", "", "", "", "", ""] )[:8] if not name or full_name.endswith("/HEAD") or name.endswith("/HEAD"): continue if ref_prefix == "refs/remotes" and "/" not in name: continue item = { "name": name, "sha": sha, "updated": int(updated) if str(updated).isdigit() else 0, "updated_relative": updated_relative, "author": author, "subject": subject, } if upstream: ahead, behind = _branch_ahead_behind(ctx, name, upstream) item.update({"upstream": upstream, "ahead": ahead, "behind": behind}) else: item.update({"upstream": "", "ahead": 0, "behind": 0}) refs.append(item) return sorted(refs, key=lambda item: item["name"].lower()) def git_branches(workspace: str | Path) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") head_name = _run_git(ctx, ["branch", "--show-current"], check=True).stdout.strip() detached = not bool(head_name) head_sha = _run_git(ctx, ["rev-parse", "--short", "HEAD"], check=True).stdout.strip() status = git_status(workspace) local = _for_each_ref(ctx, "refs/heads") remote = _for_each_ref(ctx, "refs/remotes") return { "is_git": True, "current": head_name or head_sha or "HEAD", "detached": detached, "head": head_sha, "local": local, "remote": remote, "upstream": status.get("upstream", ""), "ahead": status.get("ahead", 0), "behind": status.get("behind", 0), } def _validate_local_branch(ctx: GitContext, ref: str) -> str: ref = str(ref or "").strip() if not ref: raise GitWorkspaceError("Branch name is required", "invalid_ref") _run_git(ctx, ["show-ref", "--verify", f"refs/heads/{ref}"], check=True) return ref def _validate_remote_branch(ctx: GitContext, ref: str) -> str: ref = str(ref or "").strip() if not ref: raise GitWorkspaceError("Remote branch name is required", "invalid_ref") _run_git(ctx, ["show-ref", "--verify", f"refs/remotes/{ref}"], check=True) return ref def _validate_checkout_start(ctx: GitContext, ref: str) -> str: ref = str(ref or "HEAD").strip() or "HEAD" result = _run_git(ctx, ["rev-parse", "--verify", f"{ref}^{{commit}}"], check=False) if result.returncode != 0: raise GitWorkspaceError("Invalid checkout reference", "invalid_ref") return ref def _validate_new_branch_name(ctx: GitContext, name: str) -> str: name = str(name or "").strip() if not name: raise GitWorkspaceError("New branch name is required", "invalid_ref") result = _run_git(ctx, ["check-ref-format", "--branch", name], check=False) if result.returncode != 0: raise GitWorkspaceError("Invalid branch name", "invalid_ref") exists = _run_git(ctx, ["show-ref", "--verify", f"refs/heads/{name}"], check=False) if exists.returncode == 0: raise GitWorkspaceError("A local branch with that name already exists", "invalid_ref") return name def _dirty_worktree(ctx: GitContext) -> bool: result = _run_git(ctx, ["status", "--porcelain=v2", "--untracked-files=all"], check=True) return bool(result.stdout.strip()) def _current_checkout_label(ctx: GitContext) -> str: branch = _run_git(ctx, ["branch", "--show-current"], check=False).stdout.strip() if branch: return branch return _run_git(ctx, ["rev-parse", "--short", "HEAD"], check=True).stdout.strip() or "HEAD" def _stash_subject_parts(subject: str) -> tuple[str, str] | None: subject = str(subject or "").strip() if not subject.startswith("On ") or ": " not in subject: return None branch, message = subject[3:].split(": ", 1) branch = branch.strip() message = message.strip() if not branch or not message.startswith(_HERMES_BRANCH_SWITCH_STASH_PREFIX): return None return branch, message def _hermes_branch_switch_stashes(ctx: GitContext) -> list[dict]: result = _run_git(ctx, ["stash", "list", "--format=%gd%x00%gs"], check=False) if result.returncode != 0: return [] stashes = [] for line in result.stdout.splitlines(): try: ref, subject = line.split("\0", 1) except ValueError: continue parts = _stash_subject_parts(subject) if not parts: continue branch, message = parts stashes.append({"ref": ref, "branch": branch, "message": message}) return stashes def _restore_branch_switch_stash_locked(ctx: GitContext, branch: str) -> dict: if _dirty_worktree(ctx): return {} for item in _hermes_branch_switch_stashes(ctx): if item.get("branch") != branch: continue result = _run_git(ctx, ["stash", "pop", "--index", item["ref"]], check=False) if result.returncode == 0: return {"restored_stash": item} return { "restore_failed": True, "restore_error": (result.stderr or result.stdout or "Git stash restore failed").strip(), "restore_stash": item, } return {} def _validate_checkout_request_locked( ctx: GitContext, ref: str, mode: str, new_branch: str | None, ) -> None: if mode == "local": _validate_local_branch(ctx, ref) return if mode in {"new", "create"}: _validate_new_branch_name(ctx, new_branch or ref) _validate_checkout_start(ctx, ref if (new_branch and ref and ref != new_branch) else "HEAD") return if mode == "remote": remote_ref = _validate_remote_branch(ctx, ref) branch_name = str(new_branch or remote_ref.split("/", 1)[-1]).strip() exists = _run_git(ctx, ["show-ref", "--verify", f"refs/heads/{branch_name}"], check=False) if exists.returncode != 0: _validate_new_branch_name(ctx, branch_name) return if mode in {"detached", "detach"}: _validate_checkout_start(ctx, ref) return raise GitWorkspaceError("Unsupported checkout mode", "invalid_ref") def _perform_checkout_locked( ctx: GitContext, workspace: str | Path, ref: str, mode: str, new_branch: str | None, track: bool, ) -> subprocess.CompletedProcess[str]: if mode == "local": target = _validate_local_branch(ctx, ref) return _run_git(ctx, ["switch", target], check=True) if mode in {"new", "create"}: branch = _validate_new_branch_name(ctx, new_branch or ref) start_ref = _validate_checkout_start(ctx, ref if (new_branch and ref and ref != new_branch) else "HEAD") return _run_git(ctx, ["switch", "-c", branch, start_ref], check=True) if mode == "remote": remote_ref = _validate_remote_branch(ctx, ref) branch_name = str(new_branch or remote_ref.split("/", 1)[-1]).strip() exists = _run_git(ctx, ["show-ref", "--verify", f"refs/heads/{branch_name}"], check=False) if exists.returncode == 0: result = _run_git(ctx, ["switch", branch_name], check=True) if track: _run_git(ctx, ["branch", "--set-upstream-to", remote_ref, branch_name], check=False) return result branch = _validate_new_branch_name(ctx, branch_name) args = ["switch", "-c", branch] if track: args.append("--track") args.append(remote_ref) return _run_git(ctx, args, check=True) if mode in {"detached", "detach"}: target = _validate_checkout_start(ctx, ref) return _run_git(ctx, ["switch", "--detach", target], check=True) raise GitWorkspaceError("Unsupported checkout mode", "invalid_ref") def git_checkout( workspace: str | Path, ref: str, mode: str, new_branch: str | None = None, track: bool = False, dirty_mode: str = "block", ) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") mode = str(mode or "local").strip().lower() dirty_mode = str(dirty_mode or "block").strip().lower() if dirty_mode != "block": raise GitWorkspaceError("Only dirty_mode=block is supported for branch checkout", "dirty_worktree") with _git_mutation_lock(ctx): _validate_checkout_request_locked(ctx, ref, mode, new_branch) if _dirty_worktree(ctx): raise GitWorkspaceError( "Checkout blocked because the Git worktree has uncommitted changes", "dirty_worktree", ) result = _perform_checkout_locked(ctx, workspace, ref, mode, new_branch, track) status = git_status(workspace) branches = git_branches(workspace) return { "ok": True, "message": _remote_message(result), "current_branch": branches.get("current"), "status": status, "branches": branches, } def git_stash_and_checkout( workspace: str | Path, ref: str, mode: str, new_branch: str | None = None, track: bool = False, ) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") mode = str(mode or "local").strip().lower() target_label = str(new_branch or ref or "HEAD").strip() or "HEAD" stash_name = f"{_HERMES_BRANCH_SWITCH_STASH_PREFIX} to {target_label}".strip() restored: dict = {} with _git_mutation_lock(ctx): _validate_checkout_request_locked(ctx, ref, mode, new_branch) stashed = False if _dirty_worktree(ctx): stash_result = _run_git(ctx, ["stash", "push", "-u", "-m", stash_name], check=True) stash_text = _remote_message(stash_result) stashed = "No local changes to save" not in stash_text try: result = _perform_checkout_locked(ctx, workspace, ref, mode, new_branch, track) except Exception: if stashed: _run_git(ctx, ["stash", "pop", "--index", "stash@{0}"], check=False) raise current_branch = _current_checkout_label(ctx) restored = _restore_branch_switch_stash_locked(ctx, current_branch) status = git_status(workspace) branches = git_branches(workspace) return { "ok": True, "message": _remote_message(result), "stash_name": stash_name if stashed else "", "stashed": stashed, "restored_stash": restored.get("restored_stash"), "restore_failed": bool(restored.get("restore_failed")), "restore_error": restored.get("restore_error", ""), "restore_stash": restored.get("restore_stash"), "current_branch": branches.get("current"), "status": status, "branches": branches, } def _diff_stats(diff_text: str) -> tuple[int, int]: additions = deletions = 0 for line in diff_text.splitlines(): if line.startswith("+++") or line.startswith("---"): continue if line.startswith("+"): additions += 1 elif line.startswith("-"): deletions += 1 return additions, deletions def _synthetic_untracked_diff(path: Path, label: str) -> dict: try: if not path.is_file(): raise GitWorkspaceError("Path is not a file") if path.stat().st_size > DIFF_SIZE_LIMIT: return { "binary": False, "too_large": True, "diff": "", "additions": 0, "deletions": 0, } except OSError as exc: raise GitWorkspaceError(str(exc)) from exc try: data = path.read_bytes() except OSError as exc: raise GitWorkspaceError(str(exc)) from exc if b"\0" in data: return {"binary": True, "too_large": False, "diff": "", "additions": 0, "deletions": 0} try: text = data.decode("utf-8") except UnicodeDecodeError: return {"binary": True, "too_large": False, "diff": "", "additions": 0, "deletions": 0} lines = text.splitlines() diff_lines = list( difflib.unified_diff([], lines, fromfile="/dev/null", tofile=f"b/{label}", lineterm="") ) diff = "\n".join(diff_lines) + ("\n" if diff_lines else "") too_large = len(diff.encode("utf-8", errors="replace")) > DIFF_SIZE_LIMIT if too_large: diff = diff[:DIFF_SIZE_LIMIT] additions, deletions = _diff_stats(diff) return { "binary": False, "too_large": too_large, "diff": diff, "additions": additions, "deletions": deletions, } def git_diff(workspace: str | Path, path: str, kind: str = "unstaged") -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository") if kind not in {"unstaged", "staged"}: raise GitWorkspaceError("kind must be staged or unstaged") repo_rel = _repo_rel(ctx, path) workspace_rel = _workspace_rel(ctx, repo_rel) or path status = git_status(workspace) file_state = next((f for f in status.get("files", []) if f.get("path") == workspace_rel), None) if kind == "unstaged" and file_state and file_state.get("untracked"): payload = _synthetic_untracked_diff(ctx.workspace / workspace_rel, workspace_rel) return {"path": workspace_rel, "kind": kind, **payload} args = ["diff", "--no-ext-diff", "--unified=3"] if kind == "staged": args.append("--cached") args.extend(["--", repo_rel]) result = _run_git(ctx, args, check=True) diff = result.stdout binary = "Binary files " in diff or "GIT binary patch" in diff too_large = len(diff.encode("utf-8", errors="replace")) > DIFF_SIZE_LIMIT if too_large: diff = diff[:DIFF_SIZE_LIMIT] additions, deletions = _diff_stats(diff) return { "path": workspace_rel, "kind": kind, "binary": binary, "too_large": too_large, "additions": additions, "deletions": deletions, "diff": "" if binary else diff, } def _clean_paths(paths: Iterable[str]) -> list[str]: cleaned = [] for path in paths: value = str(path or "").strip() if value and value not in cleaned: cleaned.append(value) if not cleaned: raise GitWorkspaceError("At least one path is required") return cleaned def _pathspecs(ctx: GitContext, paths: Iterable[str]) -> list[str]: return [_repo_rel(ctx, path) for path in _clean_paths(paths)] def git_stage(workspace: str | Path, paths: Iterable[str]) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") with _git_mutation_lock(ctx): _run_git(ctx, ["add", "--", *_pathspecs(ctx, paths)], check=True) return git_status(workspace) def git_unstage(workspace: str | Path, paths: Iterable[str]) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") specs = _pathspecs(ctx, paths) with _git_mutation_lock(ctx): result = _run_git(ctx, ["restore", "--staged", "--", *specs], check=False) if result.returncode != 0: _run_git(ctx, ["reset", "HEAD", "--", *specs], check=True) return git_status(workspace) def git_discard(workspace: str | Path, paths: Iterable[str], *, delete_untracked: bool = False) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") with _git_mutation_lock(ctx): status = git_status(workspace) by_path = {f["path"]: f for f in status.get("files", [])} for path in _clean_paths(paths): repo_rel = _repo_rel(ctx, path) workspace_rel = _workspace_rel(ctx, repo_rel) or path state = by_path.get(workspace_rel) or by_path.get(workspace_rel.rstrip("/") + "/") if state and state.get("conflict"): raise GitWorkspaceError("Conflicted files cannot be discarded from this panel", "conflict") if state and state.get("untracked"): if not delete_untracked: raise GitWorkspaceError("Untracked files require delete_untracked=true") target = safe_resolve_ws(ctx.workspace, workspace_rel) if target.is_dir(): shutil.rmtree(target) else: target.unlink(missing_ok=True) continue _run_git(ctx, ["restore", "--worktree", "--", repo_rel], check=True) return git_status(workspace) COMMIT_MESSAGE_SYSTEM_PROMPT = """When writing commit messages, PR titles, or PR descriptions: - Inspect the staged diff before suggesting a commit message. - Do not use vague subjects like "update", "improve", "refine", "misc changes", "fix stuff", or "various changes". - For large commits, write a concise subject plus a short body with 2-5 bullets summarizing the main areas changed. - The subject should describe the actual user-facing result or bug fixed, not just broad implementation activity. - Keep wording short, clear, and natural. - Never mention AI, Cursor, Zed, agents, or similar tooling in commits, branch names, PR titles, or PR descriptions. - Never add your own thoughts or questions into the commit message, the commit message is definitive in nature. Return only the commit message text. Do not wrap it in Markdown fences. """.strip() def _staged_diff_text(ctx: GitContext) -> tuple[str, bool]: result = _run_git( ctx, [ "diff", "--cached", "--no-ext-diff", "--unified=3", "--", _workspace_pathspec(ctx), ], check=True, ) diff = result.stdout or "" encoded = diff.encode("utf-8", errors="replace") if len(encoded) <= COMMIT_MESSAGE_DIFF_LIMIT: return diff, False return encoded[:COMMIT_MESSAGE_DIFF_LIMIT].decode("utf-8", errors="replace"), True def _selected_temp_index_env(ctx: GitContext, specs: list[str]) -> tuple[dict[str, str], str]: fd, index_path = tempfile.mkstemp(prefix="hermes-webui-git-index-") os.close(fd) Path(index_path).unlink(missing_ok=True) env = {"GIT_INDEX_FILE": index_path} try: head = _run_git(ctx, ["rev-parse", "--verify", "HEAD"], check=False, env=env) if head.returncode == 0: _run_git(ctx, ["read-tree", "HEAD"], check=True, env=env) else: _run_git(ctx, ["read-tree", "--empty"], check=True, env=env) _run_git(ctx, ["add", "-A", "--", *specs], check=True, env=env) return env, index_path except Exception: Path(index_path).unlink(missing_ok=True) raise def _selected_files(ctx: GitContext, paths: Iterable[str]) -> tuple[list[str], list[str], list[dict]]: requested = _clean_paths(paths) requested_specs = [_repo_rel(ctx, path) for path in requested] workspace_paths = [_workspace_rel(ctx, spec) or path for spec, path in zip(requested_specs, requested)] status = git_status(ctx.workspace) by_path = {f["path"]: f for f in status.get("files", [])} specs: list[str] = [] selected = [] for path, repo_rel in zip(workspace_paths, requested_specs): state = by_path.get(path) if not state: continue if state.get("conflict"): raise GitWorkspaceError("Resolve conflicts before committing selected files", "conflict") if state.get("staged") or state.get("unstaged") or state.get("untracked"): selected.append(state) for spec in (repo_rel, _repo_rel(ctx, state["old_path"]) if state.get("old_path") else ""): if spec and spec not in specs: specs.append(spec) if len(selected) != len(workspace_paths): raise GitWorkspaceError("Selected paths have no committable changes") return specs, workspace_paths, selected def _selected_diff_text(ctx: GitContext, specs: list[str]) -> tuple[str, bool]: env, index_path = _selected_temp_index_env(ctx, specs) try: result = _run_git( ctx, ["diff", "--cached", "--no-ext-diff", "--unified=3", "--", *specs], check=True, env=env, ) diff = result.stdout or "" encoded = diff.encode("utf-8", errors="replace") if len(encoded) <= COMMIT_MESSAGE_DIFF_LIMIT: return diff, False return encoded[:COMMIT_MESSAGE_DIFF_LIMIT].decode("utf-8", errors="replace"), True finally: Path(index_path).unlink(missing_ok=True) def selected_commit_message_prompt(workspace: str | Path, paths: Iterable[str]) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") specs, _workspace_paths, selected_files = _selected_files(ctx, paths) diff, truncated = _selected_diff_text(ctx, specs) if not diff.strip(): raise GitWorkspaceError("No selected diff is available") status = git_status(workspace) file_lines = [] for item in selected_files[:80]: stats = ( "binary" if item.get("binary") else f"+{item.get('additions') or 0} -{item.get('deletions') or 0}" ) file_lines.append(f"- {item.get('status') or 'M'} {item.get('path')} ({stats})") if len(selected_files) > 80: file_lines.append(f"- ... {len(selected_files) - 80} more selected file(s)") user_prompt = ( "Write a commit message for the selected Git diff below.\n\n" f"Branch: {status.get('branch') or 'HEAD'}\n" f"Selected files ({len(selected_files)}):\n" + "\n".join(file_lines) + ( "\n\nDiff was truncated for size; summarize only what is visible.\n" if truncated else "\n" ) + "\nSelected diff:\n```diff\n" + diff + "\n```" ) return { "system_prompt": COMMIT_MESSAGE_SYSTEM_PROMPT, "user_prompt": user_prompt, "truncated": truncated, "status": status, } def staged_commit_message_prompt(workspace: str | Path) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository") status = git_status(workspace) if int((status.get("totals") or {}).get("staged") or 0) <= 0: raise GitWorkspaceError("Stage changes before generating a commit message") diff, truncated = _staged_diff_text(ctx) if not diff.strip(): raise GitWorkspaceError("No staged diff is available") staged_files = [f for f in status.get("files", []) if f.get("staged")] file_lines = [] for item in staged_files[:80]: stats = ( "binary" if item.get("binary") else f"+{item.get('additions') or 0} -{item.get('deletions') or 0}" ) file_lines.append(f"- {item.get('status') or 'M'} {item.get('path')} ({stats})") if len(staged_files) > 80: file_lines.append(f"- ... {len(staged_files) - 80} more staged file(s)") user_prompt = ( "Write a commit message for the staged Git diff below.\n\n" f"Branch: {status.get('branch') or 'HEAD'}\n" f"Staged files ({len(staged_files)}):\n" + "\n".join(file_lines) + ( "\n\nDiff was truncated for size; summarize only what is visible.\n" if truncated else "\n" ) + "\nStaged diff:\n```diff\n" + diff + "\n```" ) return { "system_prompt": COMMIT_MESSAGE_SYSTEM_PROMPT, "user_prompt": user_prompt, "truncated": truncated, "status": status, } def clean_generated_commit_message(message: str) -> str: text = str(message or "").strip() if text.startswith("```"): lines = text.splitlines() if lines and lines[0].startswith("```"): lines = lines[1:] if lines and lines[-1].strip() == "```": lines = lines[:-1] text = "\n".join(lines).strip() if (text.startswith('"') and text.endswith('"')) or ( text.startswith("'") and text.endswith("'") ): text = text[1:-1].strip() return text def git_commit(workspace: str | Path, message: str) -> dict: msg = str(message or "").strip() if not msg: raise GitWorkspaceError("Commit message is required") ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") with _git_mutation_lock(ctx): _run_git(ctx, ["commit", "-m", msg], timeout=10, check=True) sha = _run_git(ctx, ["rev-parse", "--short", "HEAD"], check=True).stdout.strip() return {"ok": True, "commit": sha, "status": git_status(workspace)} def git_commit_selected(workspace: str | Path, message: str, paths: Iterable[str]) -> dict: msg = str(message or "").strip() if not msg: raise GitWorkspaceError("Commit message is required") ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") with _git_mutation_lock(ctx): specs, workspace_paths, _selected_files_list = _selected_files(ctx, paths) env, index_path = _selected_temp_index_env(ctx, specs) try: quiet = _run_git(ctx, ["diff", "--cached", "--quiet", "--", *specs], check=False, env=env) if quiet.returncode == 0: raise GitWorkspaceError("Selected paths have no committable changes") _run_git(ctx, ["commit", "-m", msg], timeout=10, check=True, env=env) _run_git(ctx, ["reset", "-q", "HEAD", "--", *specs], check=True) finally: Path(index_path).unlink(missing_ok=True) sha = _run_git(ctx, ["rev-parse", "--short", "HEAD"], check=True).stdout.strip() return {"ok": True, "commit": sha, "paths": workspace_paths, "status": git_status(workspace)} def _branch_name(ctx: GitContext) -> str: branch = _run_git(ctx, ["branch", "--show-current"], check=True).stdout.strip() if not branch: raise GitWorkspaceError("Cannot push from a detached HEAD") return branch def _remote_message(result: subprocess.CompletedProcess[str]) -> str: return (result.stdout or result.stderr or "").strip() def git_fetch(workspace: str | Path) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") with _git_mutation_lock(ctx): result = _run_git(ctx, ["fetch", "--prune"], timeout=GIT_REMOTE_TIMEOUT, check=True) return {"ok": True, "message": _remote_message(result), "status": git_status(workspace)} def git_pull(workspace: str | Path) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") with _git_mutation_lock(ctx): result = _run_git(ctx, ["pull", "--ff-only"], timeout=GIT_REMOTE_TIMEOUT, check=True) return {"ok": True, "message": _remote_message(result), "status": git_status(workspace)} def git_push(workspace: str | Path) -> dict: ctx = resolve_git_context(workspace) if ctx is None: raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") with _git_mutation_lock(ctx): status = git_status(workspace) args = ["push"] if not status.get("upstream"): branch = _branch_name(ctx) remotes = _run_git(ctx, ["remote"], check=True).stdout.split() if "origin" not in remotes: raise GitWorkspaceError("No upstream branch or origin remote is configured", "no_upstream") args.extend(["-u", "origin", branch]) result = _run_git(ctx, args, timeout=GIT_REMOTE_TIMEOUT, check=True) return {"ok": True, "message": _remote_message(result), "status": git_status(workspace)}