diff --git a/api/routes.py b/api/routes.py index 063226bf..fcbae668 100644 --- a/api/routes.py +++ b/api/routes.py @@ -2221,6 +2221,7 @@ from api.workspace import ( get_last_workspace, set_last_workspace, list_dir, + dir_signature, list_workspace_suggestions, read_file_content, safe_resolve_ws, @@ -4139,6 +4140,15 @@ def handle_get(handler, parsed) -> bool: if parsed.path == "/api/list": return _handle_list_dir(handler, parsed) + if parsed.path == "/api/git/status": + return _handle_git_status(handler, parsed) + + if parsed.path == "/api/git/branches": + return _handle_git_branches(handler, parsed) + + if parsed.path == "/api/git/diff": + return _handle_git_diff(handler, parsed) + if parsed.path == "/api/personalities": # Read personalities from config.yaml agent.personalities section # (matches hermes-agent CLI behavior, not filesystem SOUL.md approach) @@ -4170,9 +4180,22 @@ def handle_get(handler, parsed) -> bool: s = get_session(sid) except KeyError: return bad(handler, "Session not found", 404) - from api.workspace import git_info_for_workspace + from api.workspace_git import GitWorkspaceError, git_status - info = git_info_for_workspace(Path(s.workspace)) + try: + status = git_status(Path(s.workspace)) + except GitWorkspaceError as e: + return _git_bad(handler, e) + totals = status.get("totals") or {} + info = None if not status.get("is_git") else { + "branch": status.get("branch"), + "dirty": totals.get("changed", 0), + "modified": (totals.get("staged", 0) or 0) + (totals.get("unstaged", 0) or 0), + "untracked": totals.get("untracked", 0), + "ahead": status.get("ahead", 0), + "behind": status.get("behind", 0), + "is_git": True, + } return j(handler, {"git": info}) if parsed.path == "/api/commands": @@ -5311,6 +5334,43 @@ def handle_post(handler, parsed) -> bool: with cron_profile_context(): return _handle_cron_resume(handler, body) + # ── Git workspace ops (POST) ── + if parsed.path == "/api/git/stage": + return _handle_git_stage(handler, body) + + if 
def _git_session(handler, session_id: str):
    """Fetch the session behind a Git request, answering with an error if absent.

    Returns whatever ``get_session`` yields for ``session_id``.  When the id
    is empty, or ``get_session`` raises ``KeyError``, an error response is
    written through ``bad`` (404 for the unknown-session case) and ``None``
    is returned so the caller can stop without replying a second time.
    """
    if session_id:
        try:
            found = get_session(session_id)
        except KeyError:
            bad(handler, "Session not found", 404)
        else:
            return found
    else:
        bad(handler, "session_id required")
    return None
+def _git_session_workspace(handler, session_id: str): + session = _git_session(handler, session_id) + if session is None: + return None + return Path(session.workspace) + + +def _git_session_and_workspace(handler, session_id: str): + session = _git_session(handler, session_id) + if session is None: + return None, None + return session, Path(session.workspace) + + +def _git_locked_by_active_stream(session) -> bool: + stream_id = getattr(session, "active_stream_id", None) + if not stream_id: + return False + try: + from api.config import STREAMS, STREAMS_LOCK + + with STREAMS_LOCK: + return stream_id in STREAMS + except Exception: + return False + + +def _git_reject_destructive_if_unsafe(handler, session) -> bool: + from api.workspace_git import ( + GitWorkspaceError, + WORKSPACE_GIT_DESTRUCTIVE_ENV, + workspace_git_destructive_enabled, + ) + + if not workspace_git_destructive_enabled(): + _git_bad( + handler, + GitWorkspaceError( + f"Destructive workspace Git operations are disabled. Set {WORKSPACE_GIT_DESTRUCTIVE_ENV}=1 to enable them.", + "destructive_git_disabled", + ), + status=403, + ) + return True + if _git_locked_by_active_stream(session): + _git_bad( + handler, + GitWorkspaceError( + "A session run is active. 
Wait for it to finish before running this Git operation.", + "active_stream", + ), + status=409, + ) + return True + return False + + +def _handle_git_status(handler, parsed): + qs = parse_qs(parsed.query) + workspace = _git_session_workspace(handler, qs.get("session_id", [""])[0]) + if workspace is None: + return True + try: + from api.workspace_git import GitWorkspaceError, git_status + + return j(handler, {"git": git_status(workspace)}) + except GitWorkspaceError as e: + return _git_bad(handler, e) + + +def _handle_git_branches(handler, parsed): + qs = parse_qs(parsed.query) + workspace = _git_session_workspace(handler, qs.get("session_id", [""])[0]) + if workspace is None: + return True + try: + from api.workspace_git import GitWorkspaceError, git_branches + + return j(handler, {"branches": git_branches(workspace)}) + except GitWorkspaceError as e: + return _git_bad(handler, e) + + +def _handle_git_diff(handler, parsed): + qs = parse_qs(parsed.query) + workspace = _git_session_workspace(handler, qs.get("session_id", [""])[0]) + if workspace is None: + return True + path = qs.get("path", [""])[0] + kind = qs.get("kind", ["unstaged"])[0] + if not path: + return bad(handler, "path required") + try: + from api.workspace_git import GitWorkspaceError, git_diff + + return j(handler, {"diff": git_diff(workspace, path, kind)}) + except GitWorkspaceError as e: + return _git_bad(handler, e) + + +def _git_bad(handler, err, status: int = 400): + return j( + handler, + { + "error": _sanitize_error(err), + "code": getattr(err, "code", "git_failed") or "git_failed", + }, + status=status, + ) + + +def _git_paths_from_body(body) -> list[str]: + raw_paths = body.get("paths") + if raw_paths is None and body.get("path"): + raw_paths = [body.get("path")] + if isinstance(raw_paths, str): + raw_paths = [raw_paths] + if not isinstance(raw_paths, list): + raise ValueError("paths must be a list") + return [str(path) for path in raw_paths] + + +def _handle_git_stage(handler, body): + try: 
def _handle_git_discard(handler, body):
    """Handle ``POST /api/git/discard``: throw away changes to the given paths.

    The request is refused (the helper has already replied) when the session
    is unknown or when ``_git_reject_destructive_if_unsafe`` vetoes it.
    ``body["delete_untracked"]`` is forwarded to ``git_discard`` as a bool.
    On success the reply is ``{"ok": True, "git": <git_discard result>}``.
    """
    # Imported before the ``try`` on purpose: when the import lived inside it,
    # any non-ValueError raised earlier in the body made Python evaluate
    # ``except GitWorkspaceError`` with the name still unbound, replacing the
    # real error with an UnboundLocalError.
    from api.workspace_git import GitWorkspaceError, git_discard

    try:
        require(body, "session_id")
        paths = _git_paths_from_body(body)
        session, workspace = _git_session_and_workspace(handler, body["session_id"])
        if workspace is None:
            return True
        if _git_reject_destructive_if_unsafe(handler, session):
            return True
        return j(
            handler,
            {
                "ok": True,
                "git": git_discard(
                    workspace,
                    paths,
                    delete_untracked=bool(body.get("delete_untracked")),
                ),
            },
        )
    except ValueError as e:
        return bad(handler, str(e))
    except GitWorkspaceError as e:
        return _git_bad(handler, e)
+ "git commit message", + logger_override=logger, + ): + from api.config import ( + get_effective_default_model, + model_with_provider_context, + resolve_custom_provider_connection, + resolve_model_provider, + ) + + session_model = str(getattr(session, "model", "") or "").strip() + session_provider = str(getattr(session, "model_provider", "") or "").strip() or None + model_for_resolution = ( + model_with_provider_context(session_model, session_provider) + if session_model + else get_effective_default_model() + ) + _main_model, _main_provider, _main_base_url = resolve_model_provider(model_for_resolution) + _main_api_key = None + try: + from api.oauth import resolve_runtime_provider_with_anthropic_env_lock + from hermes_cli.runtime_provider import resolve_runtime_provider + + _rt = resolve_runtime_provider_with_anthropic_env_lock( + resolve_runtime_provider, + requested=_main_provider, + ) + _main_api_key = _rt.get("api_key") + if not _main_provider: + _main_provider = _rt.get("provider") + if not _main_base_url: + _main_base_url = _rt.get("base_url") + except Exception as _e: + logger.debug("git commit message runtime provider resolution failed: %s", _e) + if isinstance(_main_provider, str) and _main_provider.startswith("custom:"): + _cp_key, _cp_base = resolve_custom_provider_connection(_main_provider) + if not _main_api_key and _cp_key: + _main_api_key = _cp_key + if not _main_base_url and _cp_base: + _main_base_url = _cp_base + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + main_runtime = { + "provider": _main_provider, + "model": _main_model, + "base_url": _main_base_url, + "api_key": _main_api_key, + } + try: + from agent.auxiliary_client import get_text_auxiliary_client + + aux_client, aux_model = get_text_auxiliary_client( + "compression", + main_runtime=main_runtime, + ) + if aux_client is not None and aux_model: + response = aux_client.chat.completions.create( + model=aux_model, + 
def _handle_git_commit_message(handler, body):
    """Handle ``POST /api/git/commit-message``: draft a message for staged changes.

    Builds the prompt with ``staged_commit_message_prompt``, asks the model
    through ``_llm_git_commit_message`` and post-processes the answer with
    ``clean_generated_commit_message``.  Replies with
    ``{"ok": True, "message": ..., "truncated": ...}`` on success, 404 when
    the session is unknown, and 500 for unexpected failures (which are logged).
    """
    from api.workspace_git import (
        GitWorkspaceError,
        clean_generated_commit_message,
        staged_commit_message_prompt,
    )

    try:
        require(body, "session_id")
        # Only the session lookup may translate KeyError into a 404.  A
        # KeyError raised later (prompt building, model call) is a server-side
        # fault and must not be reported as "Session not found".
        try:
            session = get_session(body["session_id"])
        except KeyError:
            return bad(handler, "Session not found", 404)
        workspace = Path(session.workspace)

        prompt = staged_commit_message_prompt(workspace)
        message = clean_generated_commit_message(
            _llm_git_commit_message(prompt["system_prompt"], prompt["user_prompt"], session=session)
        )
        if not message:
            raise GitWorkspaceError("No commit message was generated")
        return j(handler, {"ok": True, "message": message, "truncated": bool(prompt.get("truncated"))})
    except ValueError as e:
        return bad(handler, str(e))
    except GitWorkspaceError as e:
        return _git_bad(handler, e)
    except Exception as e:
        logger.exception("git commit message generation failed")
        return bad(handler, _sanitize_error(e), 500)
def _handle_git_remote_action(handler, body, action: str):
    """Handle ``POST /api/git/fetch``, ``/pull`` and ``/push``.

    ``action`` is one of ``"fetch"``, ``"pull"`` or ``"push"`` and selects the
    matching ``api.workspace_git`` function.  ``pull`` and ``push`` go through
    ``_git_reject_destructive_if_unsafe`` first; ``fetch`` does not.  The
    selected function's return value is sent back as the JSON reply.
    """
    # Imported before the ``try`` so that ``except GitWorkspaceError`` can
    # always be evaluated; with the import inside the ``try`` an earlier
    # non-ValueError surfaced as UnboundLocalError instead of the real error.
    from api.workspace_git import GitWorkspaceError, git_fetch, git_pull, git_push

    try:
        require(body, "session_id")
        session, workspace = _git_session_and_workspace(handler, body["session_id"])
        if workspace is None:
            return True
        if action in {"pull", "push"} and _git_reject_destructive_if_unsafe(handler, session):
            return True
        actions = {
            "fetch": git_fetch,
            "pull": git_pull,
            "push": git_push,
        }
        return j(handler, actions[action](workspace))
    except ValueError as e:
        return bad(handler, str(e))
    except GitWorkspaceError as e:
        return _git_bad(handler, e)
def dir_signature(workspace: Path, rel: str = '.', entries: list[dict] | None = None) -> str:
    """Return a cheap, stable signature for a listed workspace directory.

    The signature is a SHA-256 hex digest over the per-entry metadata already
    produced for the workspace tree: name, displayed path, type, ``is_dir``,
    size, ``mtime_ns`` and symlink target.  File contents are never read.

    Args:
        workspace: workspace root, used only when ``entries`` is not given.
        rel: directory relative to the workspace, used only when ``entries``
            is not given.
        entries: an existing ``list_dir`` result to hash; when ``None`` the
            directory is listed here.
    """
    if entries is None:
        entries = list_dir(workspace, rel)
    fields = ('name', 'path', 'type', 'is_dir', 'size', 'mtime_ns', 'target')
    payload = [{key: entry.get(key) for key in fields} for entry in entries]
    raw = json.dumps(payload, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
    # File names that are not valid UTF-8 on disk reach Python as lone
    # surrogates; a strict encode would raise UnicodeEncodeError and break the
    # whole listing.  ``surrogatepass`` hashes them as-is and yields the same
    # bytes as before for every well-formed name.
    return hashlib.sha256(raw.encode('utf-8', errors='surrogatepass')).hexdigest()
+""" + +from __future__ import annotations + +import difflib +import os +import shutil +import subprocess +import tempfile +import threading +from contextlib import contextmanager +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable + +from api.workspace import safe_resolve_ws + + +GIT_TIMEOUT = 5 +GIT_REMOTE_TIMEOUT = 60 +STATUS_FILE_LIMIT = 500 +DIFF_SIZE_LIMIT = 512 * 1024 +COMMIT_MESSAGE_DIFF_LIMIT = 64 * 1024 +WORKSPACE_GIT_DESTRUCTIVE_ENV = "HERMES_WEBUI_WORKSPACE_GIT_DESTRUCTIVE" +_GIT_ENV_SCRUB_KEYS = ("GIT_DIR", "GIT_WORK_TREE", "GIT_CONFIG_GLOBAL") + + +def workspace_git_destructive_enabled() -> bool: + return os.getenv(WORKSPACE_GIT_DESTRUCTIVE_ENV, "").strip().lower() in { + "1", + "true", + "yes", + "on", + } + + +def _clean_git_env(extra: dict[str, str] | None = None) -> dict[str, str]: + env = os.environ.copy() + if extra: + env.update(extra) + for key in _GIT_ENV_SCRUB_KEYS: + env.pop(key, None) + return env + + +class GitWorkspaceError(RuntimeError): + """User-facing Git operation error.""" + + def __init__(self, message: str, code: str = "git_failed"): + super().__init__(message) + self.code = code + + +@dataclass(frozen=True) +class GitContext: + workspace: Path + repo_root: Path + workspace_prefix: str + + +_LOCKS_GUARD = threading.Lock() +_OP_LOCKS: dict[str, threading.Lock] = {} + + +@contextmanager +def _git_mutation_lock(ctx: GitContext): + key = str(ctx.repo_root) + with _LOCKS_GUARD: + lock = _OP_LOCKS.setdefault(key, threading.Lock()) + if not lock.acquire(timeout=GIT_REMOTE_TIMEOUT): + raise GitWorkspaceError("Another Git operation is still running", "operation_in_progress") + try: + yield + finally: + lock.release() + + +def _classify_git_error(message: str, args: list[str] | None = None) -> str: + text = (message or "").lower() + joined = " ".join(args or []).lower() + if "timed out" in text: + return "timeout" + if "not installed" in text or "no such file or directory: 'git'" in text: + return 
"missing_git" + if "not a git repository" in text: + return "not_a_repo" + if "outside the workspace" in text or "outside the git repository" in text: + return "path_outside_workspace" + if "authentication failed" in text or "permission denied" in text or "could not read username" in text: + return "auth_failed" + if "no upstream" in text or "no configured push destination" in text or "has no upstream branch" in text: + return "no_upstream" + if ( + "non-fast-forward" in text + or "fetch first" in text + or ("rejected" in text and "push" in joined) + ): + return "non_fast_forward" + if "conflict" in text or "unmerged" in text or ("merge" in text and "needs" in text): + return "conflict" + if "working tree" in text and ("clean" in text or "dirty" in text): + return "dirty_worktree" + if "local changes" in text or "would be overwritten by checkout" in text: + return "dirty_worktree" + if "invalid reference" in text or "not a valid" in text or "unknown revision" in text: + return "invalid_ref" + if "hook" in text: + return "hook_failed" + return "git_failed" + + +def _run_git( + ctx_or_cwd: GitContext | Path, + args: list[str], + *, + timeout: int = GIT_TIMEOUT, + check: bool = False, + env: dict[str, str] | None = None, +) -> subprocess.CompletedProcess[str]: + cwd = ctx_or_cwd.repo_root if isinstance(ctx_or_cwd, GitContext) else ctx_or_cwd + run_env = _clean_git_env(env) + try: + result = subprocess.run( + ["git", *args], + cwd=str(cwd), + shell=False, + capture_output=True, + text=True, + timeout=timeout, + env=run_env, + ) + except subprocess.TimeoutExpired as exc: + raise GitWorkspaceError("Git command timed out", "timeout") from exc + except FileNotFoundError as exc: + raise GitWorkspaceError("Git is not installed or not available on PATH", "missing_git") from exc + except OSError as exc: + raise GitWorkspaceError(str(exc), _classify_git_error(str(exc), args)) from exc + if check and result.returncode != 0: + message = (result.stderr or result.stdout or "Git 
def resolve_git_context(workspace: str | Path) -> GitContext | None:
    """Locate the Git repository that contains ``workspace``.

    Returns a ``GitContext`` holding the resolved workspace, the repository
    root reported by ``git rev-parse --show-toplevel`` and the workspace's
    POSIX-style path relative to that root (``""`` when the workspace is the
    root).  Returns ``None`` when the workspace is not an existing directory,
    is not inside a Git work tree, or does not sit below the reported root.
    """
    ws = Path(workspace).expanduser().resolve()
    # subprocess raises FileNotFoundError for a missing ``cwd`` as well as for
    # a missing executable, and ``_run_git`` reports both as "Git is not
    # installed".  A workspace that is gone is simply not a repository.
    if not ws.is_dir():
        return None
    result = _run_git(ws, ["rev-parse", "--show-toplevel"], check=False)
    if result.returncode != 0:
        return None
    repo_root = Path(result.stdout.strip()).resolve()
    try:
        prefix = ws.relative_to(repo_root).as_posix()
    except ValueError:
        return None
    return GitContext(workspace=ws, repo_root=repo_root, workspace_prefix="" if prefix == "." else prefix)
def _collect_numstat(ctx: GitContext, cached: bool) -> dict[str, tuple[int, int, bool]]:
    """Run ``git diff --numstat`` for the workspace and parse the result.

    Args:
        ctx: repository/workspace context; the diff is limited to the
            workspace pathspec.
        cached: compare the index (``--cached``) instead of the work tree.

    Returns:
        The mapping produced by ``_parse_numstat`` (workspace-relative path to
        ``(additions, deletions, binary)``), or an empty dict when Git exits
        with a non-zero status.
    """
    # numstat is read line by line (no ``-z``), and by default Git prints
    # non-ASCII paths in that mode as quoted octal escapes.  Those keys never
    # match the raw paths coming from ``status -z``, so such files end up with
    # 0/0 stats and ``git_status`` drops them as CRLF-only noise.
    # core.quotePath=false keeps bytes above 0x80 verbatim.
    args = ["-c", "core.quotePath=false", "diff", "--numstat", "--ignore-cr-at-eol"]
    if cached:
        args.append("--cached")
    args.extend(["--", _workspace_pathspec(ctx)])
    result = _run_git(ctx, args, check=False)
    if result.returncode != 0:
        return {}
    return _parse_numstat(result.stdout, ctx)
def git_status(workspace: str | Path) -> dict:
    """Summarize the Git state of ``workspace`` for the workspace panel.

    Runs ``git status --porcelain=v2 -z`` limited to the workspace pathspec
    and combines it with numstat and name-only diffs for the index and the
    work tree.

    Returns:
        ``{"is_git": False}`` when the workspace is not inside a repository.
        Otherwise a dict with ``branch``, ``upstream``, ``ahead``, ``behind``,
        ``totals`` (counts from ``_empty_status``), ``files`` (one dict per
        reported path, sorted case-insensitively), ``truncated`` (set once
        ``STATUS_FILE_LIMIT`` entries were collected) and ``noise_filtering``.

    Raises:
        GitWorkspaceError: when the ``git status`` call itself fails.
    """
    ctx = resolve_git_context(workspace)
    if ctx is None:
        return {"is_git": False}

    result = _run_git(
        ctx,
        [
            "status",
            "--porcelain=v2",
            "-z",
            "--branch",
            "--ignored=matching",
            "--untracked-files=all",
            "--",
            _workspace_pathspec(ctx),
        ],
        check=True,
    )
    staged_stats = _collect_numstat(ctx, cached=True)
    unstaged_stats = _collect_numstat(ctx, cached=False)
    # ``None`` means the name-only diff failed; path filtering is skipped then.
    staged_diff_paths = _collect_diff_paths(ctx, cached=True)
    unstaged_diff_paths = _collect_diff_paths(ctx, cached=False)

    branch = ""
    upstream = ""
    ahead = 0
    behind = 0
    files: dict[str, dict] = {}
    filtered_noise = {"filemode_only": 0, "crlf_only": 0}
    tokens = result.stdout.split("\0")
    i = 0
    truncated = False
    while i < len(tokens):
        rec = tokens[i]
        i += 1
        if not rec:
            continue
        if rec.startswith("# "):
            # Header records carry branch name, upstream and ahead/behind.
            parts = rec.split(" ", 2)
            if len(parts) >= 3 and parts[1] == "branch.head":
                branch = "" if parts[2] == "(detached)" else parts[2]
            elif len(parts) >= 3 and parts[1] == "branch.upstream":
                upstream = parts[2]
            elif len(parts) >= 3 and parts[1] == "branch.ab":
                for bit in parts[2].split():
                    if bit.startswith("+") and bit[1:].isdigit():
                        ahead = int(bit[1:])
                    elif bit.startswith("-") and bit[1:].isdigit():
                        behind = int(bit[1:])
            continue

        old_path = None
        renamed = False
        if rec.startswith("? "):
            xy = "??"
            repo_path = rec[2:]
            untracked = True
            ignored = False
        elif rec.startswith("! "):
            xy = "!!"
            repo_path = rec[2:]
            untracked = False
            ignored = True
        elif rec.startswith("1 "):
            parts = rec.split(" ", 8)
            if len(parts) < 9:
                continue
            xy = parts[1]
            repo_path = parts[8]
            untracked = False
            ignored = False
        elif rec.startswith("2 "):
            parts = rec.split(" ", 9)
            if len(parts) < 10:
                continue
            xy = parts[1]
            repo_path = parts[9]
            # With -z the original path of a rename/copy is the next token.
            if i < len(tokens):
                old_path = tokens[i]
                i += 1
            renamed = True
            untracked = False
            ignored = False
        elif rec.startswith("u "):
            parts = rec.split(" ", 10)
            if len(parts) < 11:
                continue
            xy = parts[1]
            repo_path = parts[10]
            untracked = False
            ignored = False
        else:
            continue

        workspace_path = _workspace_rel(ctx, repo_path)
        if workspace_path is None:
            # Outside the workspace sub-directory: not shown.
            continue
        old_workspace_path = _workspace_rel(ctx, old_path) if old_path else None
        x = xy[0] if xy else "."
        y = xy[1] if len(xy) > 1 else "."
        conflict = xy in {"DD", "AU", "UD", "UA", "DU", "AA", "UU"} or rec.startswith("u ")
        additions, deletions, binary = 0, 0, False
        for source in (staged_stats, unstaged_stats):
            if workspace_path in source:
                a, d, b = source[workspace_path]
                additions += a
                deletions += d
                binary = binary or b
        if untracked:
            additions, deletions, binary = _count_untracked_file(ctx.workspace / workspace_path)

        # Ignored records ("!!") are neither staged nor unstaged.  Without the
        # ``not ignored`` guard both flags started out true for them and each
        # ignored entry was then counted twice as "filemode_only" noise below,
        # switching ``noise_filtering.active`` on in any repo with ignored files.
        staged = (x not in {".", "?"}) and not untracked and not ignored
        unstaged = (y not in {".", " "}) and not untracked and not ignored
        if staged and staged_diff_paths is not None and not renamed:
            # Keep the flag only if the name-only diff also lists the path.
            raw_staged = staged
            staged = workspace_path in staged_diff_paths or (
                old_workspace_path is not None and old_workspace_path in staged_diff_paths
            )
            if raw_staged and not staged:
                filtered_noise["filemode_only"] += 1
        if unstaged and unstaged_diff_paths is not None and not renamed:
            raw_unstaged = unstaged
            unstaged = workspace_path in unstaged_diff_paths or (
                old_workspace_path is not None and old_workspace_path in unstaged_diff_paths
            )
            if raw_unstaged and not unstaged:
                filtered_noise["filemode_only"] += 1
        if ignored:
            files[workspace_path] = {
                "path": workspace_path,
                "old_path": None,
                "workspace_path": workspace_path,
                "status": "Ignored",
                "staged": False,
                "unstaged": False,
                "untracked": False,
                "ignored": True,
                "conflict": False,
                "additions": 0,
                "deletions": 0,
                "binary": False,
            }
            if len(files) >= STATUS_FILE_LIMIT:
                truncated = True
                break
            continue

        if not (staged or unstaged or untracked or conflict or renamed):
            continue
        # NOTE(review): this also drops tracked entries whose numstat is
        # legitimately 0/0, e.g. a newly added empty file -- confirm that is
        # intended.
        if not (untracked or conflict or renamed or binary) and additions == 0 and deletions == 0:
            filtered_noise["crlf_only"] += 1
            continue

        files[workspace_path] = {
            "path": workspace_path,
            "old_path": old_workspace_path,
            "workspace_path": workspace_path,
            "status": _status_code(xy, untracked=untracked, renamed=renamed),
            "staged": staged,
            "unstaged": unstaged,
            "untracked": untracked,
            "ignored": False,
            "conflict": conflict,
            "additions": additions,
            "deletions": deletions,
            "binary": binary,
        }
        if len(files) >= STATUS_FILE_LIMIT:
            truncated = True
            break

    file_list = sorted(files.values(), key=lambda f: (f["path"].lower()))
    totals = _empty_status()
    for item in file_list:
        if item.get("ignored"):
            continue
        if item["staged"]:
            totals["staged"] += 1
        if item["unstaged"]:
            totals["unstaged"] += 1
        if item["untracked"]:
            totals["untracked"] += 1
        if item["conflict"]:
            totals["conflicts"] += 1
    totals["changed"] = sum(1 for item in file_list if not item.get("ignored"))

    if not branch:
        # Detached HEAD: fall back to the abbreviated commit id.
        branch = (_run_git(ctx, ["rev-parse", "--short", "HEAD"], check=False).stdout or "").strip()
    return {
        "is_git": True,
        "branch": branch or "HEAD",
        "upstream": upstream,
        "ahead": ahead,
        "behind": behind,
        "totals": totals,
        "files": file_list,
        "truncated": truncated,
        "noise_filtering": {
            **filtered_noise,
            "active": any(filtered_noise.values()),
        },
    }
def git_branches(workspace: str | Path) -> dict:
    """Describe the current HEAD plus all local and remote branches.

    Returns:
        A dict with ``current`` (branch name, else short SHA, else ``"HEAD"``),
        ``detached``, ``head`` (short SHA, empty when HEAD has no commit yet),
        the ``local`` and ``remote`` ref lists, and the ``upstream`` /
        ``ahead`` / ``behind`` values taken from ``git_status``.

    Raises:
        GitWorkspaceError: with code ``not_a_repo`` when *workspace* does not
            resolve to a Git context.
    """
    ctx = resolve_git_context(workspace)
    if ctx is None:
        raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo")
    head_name = _run_git(ctx, ["branch", "--show-current"], check=True).stdout.strip()
    detached = not bool(head_name)
    # On an unborn branch (repository without commits) ``rev-parse HEAD``
    # exits non-zero.  That is a valid state, not an error, so probe without
    # ``check`` and only trust stdout when the command succeeded.
    head_probe = _run_git(ctx, ["rev-parse", "--short", "HEAD"], check=False)
    head_sha = (head_probe.stdout or "").strip() if head_probe.returncode == 0 else ""
    status = git_status(workspace)
    local = _for_each_ref(ctx, "refs/heads")
    remote = _for_each_ref(ctx, "refs/remotes")
    return {
        "is_git": True,
        "current": head_name or head_sha or "HEAD",
        "detached": detached,
        "head": head_sha,
        "local": local,
        "remote": remote,
        "upstream": status.get("upstream", ""),
        "ahead": status.get("ahead", 0),
        "behind": status.get("behind", 0),
    }
def _perform_checkout_locked(
    ctx: GitContext,
    workspace: str | Path,
    ref: str,
    mode: str,
    new_branch: str | None,
    track: bool,
) -> subprocess.CompletedProcess[str]:
    """Run the ``git switch`` variant selected by *mode* and return its result.

    Supported modes are ``local``, ``new``/``create``, ``remote`` and
    ``detached``/``detach``.  Every input is re-validated right before the
    command that uses it.  Raises ``GitWorkspaceError`` (code ``invalid_ref``)
    for any other mode.
    """
    if mode == "local":
        return _run_git(ctx, ["switch", _validate_local_branch(ctx, ref)], check=True)

    if mode in {"new", "create"}:
        created = _validate_new_branch_name(ctx, new_branch or ref)
        # ``ref`` only acts as the start point when a distinct new name was given.
        explicit_start = bool(new_branch and ref and ref != new_branch)
        base = _validate_checkout_start(ctx, ref if explicit_start else "HEAD")
        return _run_git(ctx, ["switch", "-c", created, base], check=True)

    if mode == "remote":
        tracking_ref = _validate_remote_branch(ctx, ref)
        local_name = str(new_branch or tracking_ref.split("/", 1)[-1]).strip()
        probe = _run_git(ctx, ["show-ref", "--verify", f"refs/heads/{local_name}"], check=False)
        if probe.returncode != 0:
            # No local branch of that name yet: create it from the remote ref.
            created = _validate_new_branch_name(ctx, local_name)
            switch_args = ["switch", "-c", created]
            if track:
                switch_args.append("--track")
            switch_args.append(tracking_ref)
            return _run_git(ctx, switch_args, check=True)
        # Local branch already exists: switch to it, optionally re-pointing upstream.
        switched = _run_git(ctx, ["switch", local_name], check=True)
        if track:
            _run_git(ctx, ["branch", "--set-upstream-to", tracking_ref, local_name], check=False)
        return switched

    if mode in {"detached", "detach"}:
        return _run_git(ctx, ["switch", "--detach", _validate_checkout_start(ctx, ref)], check=True)

    raise GitWorkspaceError("Unsupported checkout mode", "invalid_ref")
_perform_checkout_locked(ctx, workspace, ref, mode, new_branch, track) + status = git_status(workspace) + branches = git_branches(workspace) + return { + "ok": True, + "message": _remote_message(result), + "stash_name": stash_name if stashed else "", + "stashed": stashed, + "current_branch": branches.get("current"), + "status": status, + "branches": branches, + } + + +def _diff_stats(diff_text: str) -> tuple[int, int]: + additions = deletions = 0 + for line in diff_text.splitlines(): + if line.startswith("+++") or line.startswith("---"): + continue + if line.startswith("+"): + additions += 1 + elif line.startswith("-"): + deletions += 1 + return additions, deletions + + +def _synthetic_untracked_diff(path: Path, label: str) -> dict: + try: + if not path.is_file(): + raise GitWorkspaceError("Path is not a file") + if path.stat().st_size > DIFF_SIZE_LIMIT: + return { + "binary": False, + "too_large": True, + "diff": "", + "additions": 0, + "deletions": 0, + } + except OSError as exc: + raise GitWorkspaceError(str(exc)) from exc + try: + data = path.read_bytes() + except OSError as exc: + raise GitWorkspaceError(str(exc)) from exc + if b"\0" in data: + return {"binary": True, "too_large": False, "diff": "", "additions": 0, "deletions": 0} + try: + text = data.decode("utf-8") + except UnicodeDecodeError: + return {"binary": True, "too_large": False, "diff": "", "additions": 0, "deletions": 0} + lines = text.splitlines() + diff_lines = list( + difflib.unified_diff([], lines, fromfile="/dev/null", tofile=f"b/{label}", lineterm="") + ) + diff = "\n".join(diff_lines) + ("\n" if diff_lines else "") + too_large = len(diff.encode("utf-8", errors="replace")) > DIFF_SIZE_LIMIT + if too_large: + diff = diff[:DIFF_SIZE_LIMIT] + additions, deletions = _diff_stats(diff) + return { + "binary": False, + "too_large": too_large, + "diff": diff, + "additions": additions, + "deletions": deletions, + } + + +def git_diff(workspace: str | Path, path: str, kind: str = "unstaged") -> dict: + 
ctx = resolve_git_context(workspace) + if ctx is None: + raise GitWorkspaceError("Workspace is not a Git repository") + if kind not in {"unstaged", "staged"}: + raise GitWorkspaceError("kind must be staged or unstaged") + repo_rel = _repo_rel(ctx, path) + workspace_rel = _workspace_rel(ctx, repo_rel) or path + + status = git_status(workspace) + file_state = next((f for f in status.get("files", []) if f.get("path") == workspace_rel), None) + if kind == "unstaged" and file_state and file_state.get("untracked"): + payload = _synthetic_untracked_diff(ctx.workspace / workspace_rel, workspace_rel) + return {"path": workspace_rel, "kind": kind, **payload} + + args = ["diff", "--no-ext-diff", "--unified=3"] + if kind == "staged": + args.append("--cached") + args.extend(["--", repo_rel]) + result = _run_git(ctx, args, check=True) + diff = result.stdout + binary = "Binary files " in diff or "GIT binary patch" in diff + too_large = len(diff.encode("utf-8", errors="replace")) > DIFF_SIZE_LIMIT + if too_large: + diff = diff[:DIFF_SIZE_LIMIT] + additions, deletions = _diff_stats(diff) + return { + "path": workspace_rel, + "kind": kind, + "binary": binary, + "too_large": too_large, + "additions": additions, + "deletions": deletions, + "diff": "" if binary else diff, + } + + +def _clean_paths(paths: Iterable[str]) -> list[str]: + cleaned = [] + for path in paths: + value = str(path or "").strip() + if value and value not in cleaned: + cleaned.append(value) + if not cleaned: + raise GitWorkspaceError("At least one path is required") + return cleaned + + +def _pathspecs(ctx: GitContext, paths: Iterable[str]) -> list[str]: + return [_repo_rel(ctx, path) for path in _clean_paths(paths)] + + +def git_stage(workspace: str | Path, paths: Iterable[str]) -> dict: + ctx = resolve_git_context(workspace) + if ctx is None: + raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo") + with _git_mutation_lock(ctx): + _run_git(ctx, ["add", "--", *_pathspecs(ctx, paths)], 
def git_discard(workspace: str | Path, paths: Iterable[str], *, delete_untracked: bool = False) -> dict:
    """Discard worktree changes for *paths* and return the refreshed status.

    Tracked paths are restored with ``git restore --worktree``.  Untracked
    paths are deleted from disk, and only when ``delete_untracked`` is true.
    All paths are validated before anything is touched, so a rejected request
    never leaves a partially discarded selection behind.

    Raises:
        GitWorkspaceError: when the workspace is not a repository
            (``not_a_repo``), a selected path is conflicted (``conflict``),
            or an untracked path is selected without ``delete_untracked``.
    """
    ctx = resolve_git_context(workspace)
    if ctx is None:
        raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo")
    with _git_mutation_lock(ctx):
        status = git_status(workspace)
        by_path = {f["path"]: f for f in status.get("files", [])}

        # Phase 1: validate every path and record what to do, in request order.
        plan = []
        for path in _clean_paths(paths):
            repo_rel = _repo_rel(ctx, path)
            workspace_rel = _workspace_rel(ctx, repo_rel) or path
            # Also try the key with a trailing slash, in case the status
            # entry for a directory is keyed that way.
            state = by_path.get(workspace_rel) or by_path.get(workspace_rel.rstrip("/") + "/")
            if state and state.get("conflict"):
                raise GitWorkspaceError("Conflicted files cannot be discarded from this panel", "conflict")
            if state and state.get("untracked"):
                if not delete_untracked:
                    raise GitWorkspaceError("Untracked files require delete_untracked=true")
                plan.append(("delete", safe_resolve_ws(ctx.workspace, workspace_rel)))
            else:
                plan.append(("restore", repo_rel))

        # Phase 2: apply the destructive actions only after all checks passed.
        for action, target in plan:
            if action == "delete":
                if target.is_dir():
                    shutil.rmtree(target)
                else:
                    target.unlink(missing_ok=True)
            else:
                _run_git(ctx, ["restore", "--worktree", "--", target], check=True)
    return git_status(workspace)
def _selected_temp_index_env(ctx: GitContext, specs: list[str]) -> tuple[dict[str, str], str]:
    """Prepare a scratch Git index that stages only *specs*.

    Returns ``(env, index_path)``: ``env`` points ``GIT_INDEX_FILE`` at the
    scratch index and the caller is responsible for deleting ``index_path``.
    If any step fails, the scratch file is removed and the error re-raised.
    """
    handle, scratch_index = tempfile.mkstemp(prefix="hermes-webui-git-index-")
    os.close(handle)
    scratch = Path(scratch_index)
    # Only the reserved name is kept; the empty placeholder file is removed
    # before any git command is pointed at the path.
    scratch.unlink(missing_ok=True)
    env = {"GIT_INDEX_FILE": scratch_index}
    try:
        probe = _run_git(ctx, ["rev-parse", "--verify", "HEAD"], check=False, env=env)
        # Seed from HEAD when a commit exists, otherwise start from an empty tree.
        seed = "HEAD" if probe.returncode == 0 else "--empty"
        _run_git(ctx, ["read-tree", seed], check=True, env=env)
        _run_git(ctx, ["add", "-A", "--", *specs], check=True, env=env)
    except Exception:
        scratch.unlink(missing_ok=True)
        raise
    return env, scratch_index
def selected_commit_message_prompt(workspace: str | Path, paths: Iterable[str]) -> dict:
    """Build the prompts for generating a commit message for selected paths.

    Returns a dict with ``system_prompt``, ``user_prompt``, ``truncated``
    (whether the diff was cut for size) and the current ``status``.

    Raises:
        GitWorkspaceError: when the workspace is not a repository
            (``not_a_repo``) or the selection produces no diff.
    """
    ctx = resolve_git_context(workspace)
    if ctx is None:
        raise GitWorkspaceError("Workspace is not a Git repository", "not_a_repo")
    specs, _workspace_paths, selected_files = _selected_files(ctx, paths)
    diff, truncated = _selected_diff_text(ctx, specs)
    if not diff.strip():
        raise GitWorkspaceError("No selected diff is available")
    status = git_status(workspace)

    def _describe(entry: dict) -> str:
        # One bullet per file: status letter, path and line stats.
        if entry.get("binary"):
            stats = "binary"
        else:
            stats = f"+{entry.get('additions') or 0} -{entry.get('deletions') or 0}"
        return f"- {entry.get('status') or 'M'} {entry.get('path')} ({stats})"

    # List at most 80 files and summarize the remainder in a single line.
    listing = [_describe(entry) for entry in selected_files[:80]]
    overflow = len(selected_files) - 80
    if overflow > 0:
        listing.append(f"- ... {overflow} more selected file(s)")

    if truncated:
        size_note = "\n\nDiff was truncated for size; summarize only what is visible.\n"
    else:
        size_note = "\n"

    user_prompt = "".join(
        [
            "Write a commit message for the selected Git diff below.\n\n",
            f"Branch: {status.get('branch') or 'HEAD'}\n",
            f"Selected files ({len(selected_files)}):\n",
            "\n".join(listing),
            size_note,
            "\nSelected diff:\n```diff\n",
            diff,
            "\n```",
        ]
    )
    return {
        "system_prompt": COMMIT_MESSAGE_SYSTEM_PROMPT,
        "user_prompt": user_prompt,
        "truncated": truncated,
        "status": status,
    }
def clean_generated_commit_message(message: str) -> str:
    """Strip wrapper noise from a generated commit message.

    Removes surrounding whitespace, an enclosing Markdown code fence (the
    opening fence line and, if present, a closing fence line) and one pair of
    matching single or double quotes around the whole text.
    """
    cleaned = str(message or "").strip()
    if cleaned.startswith("```"):
        body = cleaned.splitlines()
        if body and body[0].startswith("```"):
            del body[0]
        if body and body[-1].strip() == "```":
            body.pop()
        cleaned = "\n".join(body).strip()
    for quote in ('"', "'"):
        if cleaned.startswith(quote) and cleaned.endswith(quote):
            # Only one layer of quoting is removed.
            cleaned = cleaned[1:-1].strip()
            break
    return cleaned
def _remote_message(result: subprocess.CompletedProcess[str]) -> str:
    """Return the stripped output of a finished command.

    The first non-empty stream wins: stdout, then stderr.  An empty string is
    returned when both are empty or missing.
    """
    for stream in (result.stdout, result.stderr):
        if stream:
            return stream.strip()
    return ""
_git_mutation_lock(ctx): + status = git_status(workspace) + args = ["push"] + if not status.get("upstream"): + branch = _branch_name(ctx) + remotes = _run_git(ctx, ["remote"], check=True).stdout.split() + if "origin" not in remotes: + raise GitWorkspaceError("No upstream branch or origin remote is configured", "no_upstream") + args.extend(["-u", "origin", branch]) + result = _run_git(ctx, args, timeout=GIT_REMOTE_TIMEOUT, check=True) + return {"ok": True, "message": _remote_message(result), "status": git_status(workspace)} diff --git a/tests/conftest.py b/tests/conftest.py index 66cf0102..39ab0c1d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -536,6 +536,7 @@ def test_server(): # pytest-side block can't see. env["HERMES_WEBUI_TEST_NETWORK_BLOCK"] = "1" env.update({ + "HERMES_WEBUI_WORKSPACE_GIT_DESTRUCTIVE": "1", "HERMES_WEBUI_PORT": str(TEST_PORT), "HERMES_WEBUI_HOST": "127.0.0.1", "HERMES_WEBUI_STATE_DIR": str(TEST_STATE_DIR), diff --git a/tests/test_workspace_dir_signature.py b/tests/test_workspace_dir_signature.py new file mode 100644 index 00000000..696acf75 --- /dev/null +++ b/tests/test_workspace_dir_signature.py @@ -0,0 +1,26 @@ +from api.workspace import dir_signature, list_dir + + +def test_directory_signature_is_metadata_only_and_changes_with_entries(tmp_path): + (tmp_path / "alpha.txt").write_text("one", encoding="utf-8") + + entries = list_dir(tmp_path, ".") + sig1 = dir_signature(tmp_path, ".", entries) + + assert isinstance(sig1, str) + assert len(sig1) == 64 + assert all("mtime_ns" in entry for entry in entries) + + (tmp_path / "beta.txt").write_text("two", encoding="utf-8") + entries2 = list_dir(tmp_path, ".") + sig2 = dir_signature(tmp_path, ".", entries2) + + assert sig2 != sig1 + + +def test_directory_signature_can_be_computed_from_supplied_entries(tmp_path): + (tmp_path / "alpha.txt").write_text("one", encoding="utf-8") + + entries = list_dir(tmp_path, ".") + + assert dir_signature(tmp_path, ".", entries) == dir_signature(tmp_path, 
".", entries) diff --git a/tests/test_workspace_git.py b/tests/test_workspace_git.py new file mode 100644 index 00000000..d0dea59b --- /dev/null +++ b/tests/test_workspace_git.py @@ -0,0 +1,796 @@ +import json +import pathlib +import subprocess +import uuid +import urllib.error +import urllib.parse +import urllib.request + +import pytest + +from tests._pytest_port import BASE + + +ROOT = pathlib.Path(__file__).parent.parent + + +def _git(cwd, *args): + result = subprocess.run( + ["git", *args], + cwd=str(cwd), + shell=False, + text=True, + capture_output=True, + timeout=20, + ) + assert result.returncode == 0, result.stderr or result.stdout + return result.stdout + + +def _init_repo(path): + path.mkdir(parents=True, exist_ok=True) + _git(path, "init") + _git(path, "config", "user.email", "hermes-tests@example.invalid") + _git(path, "config", "user.name", "Hermes Tests") + return path + + +def _commit_all(path, message="initial"): + _git(path, "add", ".") + _git(path, "commit", "-m", message) + + +def _get(path): + try: + with urllib.request.urlopen(BASE + path, timeout=10) as r: + return json.loads(r.read()), r.status + except urllib.error.HTTPError as e: + return json.loads(e.read()), e.code + + +def _post(path, body=None): + data = json.dumps(body or {}).encode() + req = urllib.request.Request( + BASE + path, + data=data, + headers={"Content-Type": "application/json"}, + ) + try: + with urllib.request.urlopen(req, timeout=10) as r: + return json.loads(r.read()), r.status + except urllib.error.HTTPError as e: + return json.loads(e.read()), e.code + + +def _make_session(created_list, ws=None): + body = {} + if ws: + body["workspace"] = str(ws) + data, status = _post("/api/session/new", body) + assert status == 200 + sid = data["session"]["session_id"] + created_list.append(sid) + return sid, pathlib.Path(data["session"]["workspace"]) + + +def test_git_status_non_git_workspace(tmp_path): + from api.workspace_git import git_status + + ws = tmp_path / "plain" + 
ws.mkdir() + assert git_status(ws) == {"is_git": False} + + +def test_git_status_handles_staged_unstaged_untracked_deleted_and_renamed(tmp_path): + from api.workspace_git import git_status + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + (repo / "delete-me.txt").write_text("bye\n", encoding="utf-8") + (repo / "old name.txt").write_text("move\n", encoding="utf-8") + _commit_all(repo) + + (repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8") + (repo / "staged.txt").write_text("staged\n", encoding="utf-8") + _git(repo, "add", "staged.txt") + (repo / "delete-me.txt").unlink() + _git(repo, "mv", "old name.txt", "new name.txt") + (repo / "untracked space.txt").write_text("new\nfile\n", encoding="utf-8") + + status = git_status(repo) + by_path = {item["path"]: item for item in status["files"]} + + assert status["is_git"] is True + assert by_path["tracked.txt"]["unstaged"] is True + assert by_path["staged.txt"]["staged"] is True + assert by_path["delete-me.txt"]["status"] == "D" + assert by_path["new name.txt"]["old_path"] == "old name.txt" + assert by_path["untracked space.txt"]["untracked"] is True + assert by_path["untracked space.txt"]["additions"] == 2 + assert status["totals"]["changed"] >= 5 + + +def test_git_status_reports_ignored_files_without_counting_them_as_changes(tmp_path): + from api.workspace_git import git_status + + repo = _init_repo(tmp_path / "repo") + (repo / ".gitignore").write_text("*.log\nbuild/\n", encoding="utf-8") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + + (repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8") + (repo / "debug.log").write_text("ignored log\n", encoding="utf-8") + build = repo / "build" + build.mkdir() + (build / "artifact.txt").write_text("ignored artifact\n", encoding="utf-8") + + status = git_status(repo) + by_path = {item["path"]: item for item in status["files"]} + + assert 
by_path["tracked.txt"]["unstaged"] is True + assert by_path["debug.log"]["ignored"] is True + assert by_path["debug.log"]["status"] == "Ignored" + assert by_path["build/"]["ignored"] is True + assert by_path["build/"]["staged"] is False + assert by_path["build/"]["untracked"] is False + assert status["totals"]["changed"] == 1 + assert status["totals"]["untracked"] == 0 + + +def test_git_status_ignores_crlf_only_worktree_noise(tmp_path): + from api.workspace_git import git_status + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8", newline="\n") + _commit_all(repo) + + (repo / "tracked.txt").write_text("one\r\ntwo\r\n", encoding="utf-8", newline="") + + raw = _git(repo, "status", "--porcelain", "--", "tracked.txt") + assert raw.startswith(" M") + + status = git_status(repo) + assert status["totals"]["changed"] == 0 + assert status["files"] == [] + assert status["noise_filtering"]["active"] is True + assert status["noise_filtering"]["crlf_only"] == 1 + + +def test_git_status_keeps_real_edit_with_crlf_endings(tmp_path): + from api.workspace_git import git_status + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8", newline="\n") + _commit_all(repo) + + (repo / "tracked.txt").write_text("one\r\ntwo\r\nthree\r\n", encoding="utf-8", newline="") + + status = git_status(repo) + by_path = {item["path"]: item for item in status["files"]} + assert status["totals"]["changed"] == 1 + assert by_path["tracked.txt"]["unstaged"] is True + assert by_path["tracked.txt"]["additions"] == 1 + assert by_path["tracked.txt"]["deletions"] == 0 + + +def test_git_status_ignores_filemode_only_noise(tmp_path): + from api.workspace_git import git_status + + repo = _init_repo(tmp_path / "repo") + script = repo / "script.sh" + script.write_text("#!/bin/sh\necho hi\n", encoding="utf-8") + _commit_all(repo) + + _git(repo, "update-index", "--chmod=+x", "script.sh") + + raw = _git(repo, 
"status", "--porcelain", "--", "script.sh") + assert "script.sh" in raw + + status = git_status(repo) + assert status["totals"]["changed"] == 0 + assert status["files"] == [] + assert status["noise_filtering"]["active"] is True + + +def test_git_status_scopes_nested_workspace_to_that_directory(tmp_path): + from api.workspace_git import git_status + + repo = _init_repo(tmp_path / "repo") + nested = repo / "app" + nested.mkdir() + (nested / "inside.txt").write_text("inside\n", encoding="utf-8") + (repo / "outside.txt").write_text("outside\n", encoding="utf-8") + _commit_all(repo) + + (nested / "inside.txt").write_text("inside\nchanged\n", encoding="utf-8") + (repo / "outside.txt").write_text("outside\nchanged\n", encoding="utf-8") + + status = git_status(nested) + paths = {item["path"] for item in status["files"]} + assert paths == {"inside.txt"} + + +def test_git_diff_generates_untracked_text_diff_and_blocks_escape(tmp_path): + from api.workspace_git import GitWorkspaceError, git_diff + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + (repo / "new file.txt").write_text("hello\nworld\n", encoding="utf-8") + + diff = git_diff(repo, "new file.txt", "unstaged") + assert diff["binary"] is False + assert "+++ b/new file.txt" in diff["diff"] + assert "+hello" in diff["diff"] + + with pytest.raises(GitWorkspaceError): + git_diff(repo, "../outside.txt", "unstaged") + + +def test_git_status_reports_untracked_files_inside_directories(tmp_path): + from api.workspace_git import git_discard, git_status + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + nested = repo / "newdir" + nested.mkdir() + (nested / "a.txt").write_text("hello\n", encoding="utf-8") + + status = git_status(repo) + paths = {item["path"] for item in status["files"]} + assert "newdir/a.txt" in paths + assert "newdir/" not in paths + + git_discard(repo, 
["newdir/a.txt"], delete_untracked=True) + assert not (nested / "a.txt").exists() + + +def test_git_status_reports_ignored_files_without_counting_them_as_changed(tmp_path): + from api.workspace_git import git_status + + repo = _init_repo(tmp_path / "repo") + (repo / ".gitignore").write_text("*.log\nbuild/\n", encoding="utf-8") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + + (repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8") + (repo / "debug.log").write_text("ignored log\n", encoding="utf-8") + build = repo / "build" + build.mkdir() + (build / "artifact.txt").write_text("ignored artifact\n", encoding="utf-8") + + status = git_status(repo) + by_path = {item["path"]: item for item in status["files"]} + + assert by_path["tracked.txt"]["unstaged"] is True + assert by_path["debug.log"]["ignored"] is True + assert by_path["debug.log"]["status"] == "Ignored" + assert by_path["debug.log"]["staged"] is False + assert by_path["debug.log"]["unstaged"] is False + assert by_path["debug.log"]["untracked"] is False + assert any(item["ignored"] and item["path"].startswith("build") for item in status["files"]) + assert status["totals"]["changed"] == 1 + assert status["totals"]["untracked"] == 0 + + +def test_git_diff_large_untracked_file_is_bounded(tmp_path): + from api.workspace_git import DIFF_SIZE_LIMIT, git_diff, git_status + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + large = repo / "large.txt" + large.write_text("x" * (DIFF_SIZE_LIMIT + 1), encoding="utf-8") + + status = git_status(repo) + by_path = {item["path"]: item for item in status["files"]} + assert by_path["large.txt"]["untracked"] is True + assert by_path["large.txt"]["additions"] == 0 + + diff = git_diff(repo, "large.txt", "unstaged") + assert diff["too_large"] is True + assert diff["diff"] == "" + + +def test_git_stage_unstage_discard_and_commit(tmp_path): + from api.workspace_git 
import git_commit, git_discard, git_stage, git_status, git_unstage + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + + (repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8") + staged = git_stage(repo, ["tracked.txt"]) + assert staged["totals"]["staged"] == 1 + + unstaged = git_unstage(repo, ["tracked.txt"]) + assert unstaged["totals"]["staged"] == 0 + assert unstaged["totals"]["unstaged"] == 1 + + git_discard(repo, ["tracked.txt"]) + assert git_status(repo)["totals"]["changed"] == 0 + + (repo / "tracked.txt").write_text("one\nthree\n", encoding="utf-8") + git_stage(repo, ["tracked.txt"]) + committed = git_commit(repo, "Update tracked file") + assert committed["ok"] is True + assert committed["commit"] + assert committed["status"]["totals"]["changed"] == 0 + + +def test_git_commit_selected_ignores_unrelated_real_index(tmp_path): + from api.workspace_git import git_commit_selected, git_status + + repo = _init_repo(tmp_path / "repo") + (repo / "selected.txt").write_text("one\n", encoding="utf-8") + (repo / "staged.txt").write_text("alpha\n", encoding="utf-8") + _commit_all(repo) + + (repo / "selected.txt").write_text("one\ntwo\n", encoding="utf-8") + (repo / "staged.txt").write_text("alpha\nbeta\n", encoding="utf-8") + _git(repo, "add", "staged.txt") + + committed = git_commit_selected(repo, "Commit selected only", ["selected.txt"]) + assert committed["ok"] is True + assert committed["paths"] == ["selected.txt"] + assert _git(repo, "show", "--name-only", "--format=", "HEAD").splitlines() == ["selected.txt"] + + by_path = {item["path"]: item for item in git_status(repo)["files"]} + assert "selected.txt" not in by_path + assert by_path["staged.txt"]["staged"] is True + + +def test_git_commit_selected_supports_initial_commit(tmp_path): + from api.workspace_git import git_commit_selected, git_status + + repo = _init_repo(tmp_path / "repo") + (repo / "first.txt").write_text("first\n", 
encoding="utf-8") + + committed = git_commit_selected(repo, "Initial selected commit", ["first.txt"]) + assert committed["ok"] is True + assert _git(repo, "show", "--name-only", "--format=", "HEAD").splitlines() == ["first.txt"] + assert git_status(repo)["totals"]["changed"] == 0 + + +def test_git_commit_selected_preserves_rename_semantics(tmp_path): + from api.workspace_git import git_commit_selected, git_status + + repo = _init_repo(tmp_path / "repo") + (repo / "old.txt").write_text("old\n", encoding="utf-8") + _commit_all(repo) + + _git(repo, "mv", "old.txt", "new.txt") + + committed = git_commit_selected(repo, "Rename selected file", ["new.txt"]) + assert committed["ok"] is True + assert _git(repo, "ls-tree", "--name-only", "HEAD").splitlines() == ["new.txt"] + assert "old.txt" not in _git(repo, "status", "--porcelain=v2") + assert git_status(repo)["totals"]["changed"] == 0 + + +def test_git_commit_selected_handles_untracked_and_mixed_paths(tmp_path): + from api.workspace_git import git_commit_selected + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + + (repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8") + (repo / "new.txt").write_text("new\n", encoding="utf-8") + + committed = git_commit_selected(repo, "Commit mixed selected files", ["tracked.txt", "new.txt"]) + assert committed["ok"] is True + assert set(_git(repo, "show", "--name-only", "--format=", "HEAD").splitlines()) == { + "tracked.txt", + "new.txt", + } + + +def test_git_commit_selected_respects_nested_workspace_scope(tmp_path): + from api.workspace_git import GitWorkspaceError, git_commit_selected + + repo = _init_repo(tmp_path / "repo") + nested = repo / "app" + nested.mkdir() + (nested / "inside.txt").write_text("inside\n", encoding="utf-8") + (repo / "outside.txt").write_text("outside\n", encoding="utf-8") + _commit_all(repo) + + (nested / "inside.txt").write_text("inside\nchanged\n", encoding="utf-8") + (repo 
/ "outside.txt").write_text("outside\nchanged\n", encoding="utf-8") + + committed = git_commit_selected(nested, "Nested selected commit", ["inside.txt"]) + assert committed["paths"] == ["inside.txt"] + assert _git(repo, "show", "--name-only", "--format=", "HEAD").splitlines() == ["app/inside.txt"] + + with pytest.raises(GitWorkspaceError) as outside: + git_commit_selected(nested, "Outside", ["../outside.txt"]) + assert outside.value.code == "path_outside_workspace" + + +def test_git_commit_selected_rejects_conflicts_and_path_traversal(tmp_path): + from api.workspace_git import GitWorkspaceError, git_commit_selected + + repo = _init_repo(tmp_path / "repo") + (repo / "conflict.txt").write_text("base\n", encoding="utf-8") + _commit_all(repo) + _git(repo, "checkout", "-b", "side") + (repo / "conflict.txt").write_text("side\n", encoding="utf-8") + _commit_all(repo, "side") + _git(repo, "checkout", "master") + (repo / "conflict.txt").write_text("main\n", encoding="utf-8") + _commit_all(repo, "main") + subprocess.run(["git", "merge", "side"], cwd=repo, shell=False, text=True, capture_output=True, timeout=20) + + with pytest.raises(GitWorkspaceError) as conflict: + git_commit_selected(repo, "Nope", ["conflict.txt"]) + assert conflict.value.code == "conflict" + + with pytest.raises(GitWorkspaceError) as traversal: + git_commit_selected(repo, "Nope", ["../outside.txt"]) + assert traversal.value.code == "path_outside_workspace" + + +def test_selected_commit_message_prompt_uses_selected_diff(tmp_path): + from api.workspace_git import selected_commit_message_prompt + + repo = _init_repo(tmp_path / "repo") + (repo / "selected.txt").write_text("one\n", encoding="utf-8") + (repo / "other.txt").write_text("alpha\n", encoding="utf-8") + _commit_all(repo) + (repo / "selected.txt").write_text("one\ntwo\n", encoding="utf-8") + (repo / "other.txt").write_text("alpha\nbeta\n", encoding="utf-8") + + prompt = selected_commit_message_prompt(repo, ["selected.txt"]) + assert "selected.txt" in 
prompt["user_prompt"] + assert "+two" in prompt["user_prompt"] + assert "other.txt" not in prompt["user_prompt"] + assert "beta" not in prompt["user_prompt"] + + +def test_staged_commit_message_prompt_uses_only_staged_diff(tmp_path): + from api.workspace_git import ( + GitWorkspaceError, + clean_generated_commit_message, + staged_commit_message_prompt, + ) + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + + (repo / "tracked.txt").write_text("one\nstaged\n", encoding="utf-8") + _git(repo, "add", "tracked.txt") + (repo / "tracked.txt").write_text("one\nstaged\nunstaged\n", encoding="utf-8") + + prompt = staged_commit_message_prompt(repo) + assert prompt["truncated"] is False + assert "tracked.txt" in prompt["user_prompt"] + assert "+staged" in prompt["user_prompt"] + assert "unstaged" not in prompt["user_prompt"] + assert "Never mention AI, Cursor, Zed, agents" in prompt["system_prompt"] + + _git(repo, "restore", "--staged", "tracked.txt") + with pytest.raises(GitWorkspaceError): + staged_commit_message_prompt(repo) + + assert clean_generated_commit_message("```text\nSubject\n\n- Body\n```") == "Subject\n\n- Body" + + +def test_git_fetch_pull_and_push_with_upstream(tmp_path): + from api.workspace_git import git_fetch, git_pull, git_push, git_status + + remote = tmp_path / "remote.git" + _git(tmp_path, "init", "--bare", str(remote)) + + origin = _init_repo(tmp_path / "origin") + (origin / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(origin) + _git(origin, "remote", "add", "origin", str(remote)) + _git(origin, "push", "-u", "origin", "HEAD") + + clone = tmp_path / "clone" + _git(tmp_path, "clone", str(remote), str(clone)) + _git(clone, "config", "user.email", "hermes-tests@example.invalid") + _git(clone, "config", "user.name", "Hermes Tests") + + (origin / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8") + _commit_all(origin, "Remote update") + _git(origin, 
"push") + + fetched = git_fetch(clone) + assert fetched["status"]["behind"] == 1 + + pulled = git_pull(clone) + assert pulled["status"]["behind"] == 0 + assert (clone / "tracked.txt").read_text(encoding="utf-8") == "one\ntwo\n" + + (clone / "tracked.txt").write_text("one\ntwo\nthree\n", encoding="utf-8") + _git(clone, "add", "tracked.txt") + _git(clone, "commit", "-m", "Local update") + assert git_status(clone)["ahead"] == 1 + + pushed = git_push(clone) + assert pushed["status"]["ahead"] == 0 + + +def test_git_branches_lists_local_remote_and_upstream(tmp_path): + from api.workspace_git import git_branches + + remote = tmp_path / "remote.git" + _git(tmp_path, "init", "--bare", str(remote)) + origin = _init_repo(tmp_path / "origin") + (origin / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(origin) + _git(origin, "branch", "-M", "main") + _git(origin, "remote", "add", "origin", str(remote)) + _git(origin, "push", "-u", "origin", "main") + _git(remote, "symbolic-ref", "HEAD", "refs/heads/main") + + clone = tmp_path / "clone" + _git(tmp_path, "clone", str(remote), str(clone)) + branches = git_branches(clone) + assert branches["current"] == "main" + assert branches["detached"] is False + assert any(item["name"] == "main" and item["upstream"] == "origin/main" for item in branches["local"]) + main = next(item for item in branches["local"] if item["name"] == "main") + assert "updated_relative" in main and "author" in main and "subject" in main + assert any(item["name"] == "origin/main" for item in branches["remote"]) + assert not any(item["name"] == "origin" for item in branches["remote"]) + + +def test_git_checkout_local_new_remote_dirty_and_invalid_refs(tmp_path): + from api.workspace_git import GitWorkspaceError, git_branches, git_checkout + + remote = tmp_path / "remote.git" + _git(tmp_path, "init", "--bare", str(remote)) + origin = _init_repo(tmp_path / "origin") + (origin / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(origin) 
+ _git(origin, "branch", "-M", "main") + _git(origin, "remote", "add", "origin", str(remote)) + _git(origin, "push", "-u", "origin", "main") + _git(remote, "symbolic-ref", "HEAD", "refs/heads/main") + _git(origin, "checkout", "-b", "remote-feature") + (origin / "remote.txt").write_text("remote\n", encoding="utf-8") + _commit_all(origin, "remote feature") + _git(origin, "push", "-u", "origin", "remote-feature") + + clone = tmp_path / "clone" + _git(tmp_path, "clone", str(remote), str(clone)) + _git(clone, "config", "user.email", "hermes-tests@example.invalid") + _git(clone, "config", "user.name", "Hermes Tests") + + created = git_checkout(clone, "main", "new", new_branch="local-work") + assert created["current_branch"] == "local-work" + assert git_branches(clone)["current"] == "local-work" + + switched = git_checkout(clone, "main", "local") + assert switched["current_branch"] == "main" + + tracked = git_checkout(clone, "origin/remote-feature", "remote", new_branch="remote-feature", track=True) + assert tracked["current_branch"] == "remote-feature" + assert git_branches(clone)["upstream"] == "origin/remote-feature" + + (clone / "tracked.txt").write_text("dirty\n", encoding="utf-8") + with pytest.raises(GitWorkspaceError) as dirty: + git_checkout(clone, "main", "local") + assert dirty.value.code == "dirty_worktree" + _git(clone, "restore", "tracked.txt") + + with pytest.raises(GitWorkspaceError) as invalid: + git_checkout(clone, "does-not-exist", "local") + assert invalid.value.code in {"invalid_ref", "git_failed"} + + +def test_git_checkout_detached_requires_explicit_mode(tmp_path): + from api.workspace_git import git_branches, git_checkout + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + sha = _git(repo, "rev-parse", "--short", "HEAD").strip() + + result = git_checkout(repo, sha, "detached") + assert result["ok"] is True + branches = git_branches(repo) + assert branches["detached"] is True 
+ assert branches["current"] == sha + + +def test_git_stash_and_checkout_is_explicit(tmp_path): + from api.workspace_git import git_stash_and_checkout, git_status + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + _git(repo, "checkout", "-b", "target") + _git(repo, "checkout", "master") + (repo / "tracked.txt").write_text("dirty\n", encoding="utf-8") + + result = git_stash_and_checkout(repo, "target", "local") + assert result["ok"] is True + assert result["stashed"] is True + assert result["stash_name"].startswith("hermes-webui branch switch") + assert result["current_branch"] == "target" + assert git_status(repo)["totals"]["changed"] == 0 + assert "hermes-webui branch switch target" in _git(repo, "stash", "list") + + +def test_git_stash_checkout_validates_before_stashing(tmp_path): + from api.workspace_git import GitWorkspaceError, git_stash_and_checkout + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + (repo / "tracked.txt").write_text("dirty\n", encoding="utf-8") + + with pytest.raises(GitWorkspaceError) as invalid: + git_stash_and_checkout(repo, "missing-branch", "local") + + assert invalid.value.code == "invalid_ref" + assert "M tracked.txt" in _git(repo, "status", "--porcelain") + assert _git(repo, "stash", "list") == "" + + +def test_git_routes_status_diff_stage_unstage_discard_commit(cleanup_test_sessions): + sid, base_ws = _make_session(cleanup_test_sessions) + repo = base_ws / f"git-route-{uuid.uuid4().hex[:8]}" + _init_repo(repo) + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + + _post("/api/session/update", {"session_id": sid, "workspace": str(repo), "model": "openai/gpt-5.4-mini"}) + (repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8") + + status, code = _get(f"/api/git/status?session_id={sid}") + assert code == 200 + assert 
status["git"]["totals"]["unstaged"] == 1 + + diff, code = _get( + f"/api/git/diff?session_id={sid}&path={urllib.parse.quote('tracked.txt')}&kind=unstaged" + ) + assert code == 200 + assert "+two" in diff["diff"]["diff"] + + staged, code = _post("/api/git/stage", {"session_id": sid, "paths": ["tracked.txt"]}) + assert code == 200 and staged["git"]["totals"]["staged"] == 1 + + unstaged, code = _post("/api/git/unstage", {"session_id": sid, "paths": ["tracked.txt"]}) + assert code == 200 and unstaged["git"]["totals"]["unstaged"] == 1 + + discarded, code = _post("/api/git/discard", {"session_id": sid, "paths": ["tracked.txt"]}) + assert code == 200 and discarded["git"]["totals"]["changed"] == 0 + + (repo / "tracked.txt").write_text("one\nthree\n", encoding="utf-8") + _post("/api/git/stage", {"session_id": sid, "paths": ["tracked.txt"]}) + committed, code = _post("/api/git/commit", {"session_id": sid, "message": "Route commit"}) + assert code == 200 + assert committed["ok"] is True + assert committed["status"]["totals"]["changed"] == 0 + + +def test_git_routes_branches_and_checkout(cleanup_test_sessions): + sid, base_ws = _make_session(cleanup_test_sessions) + repo = base_ws / f"git-branch-route-{uuid.uuid4().hex[:8]}" + _init_repo(repo) + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + _git(repo, "branch", "-M", "main") + _git(repo, "checkout", "-b", "feature") + _git(repo, "checkout", "main") + + _post("/api/session/update", {"session_id": sid, "workspace": str(repo), "model": "openai/gpt-5.4-mini"}) + branches, code = _get(f"/api/git/branches?session_id={sid}") + assert code == 200 + assert branches["branches"]["current"] == "main" + assert any(item["name"] == "feature" for item in branches["branches"]["local"]) + + checked, code = _post( + "/api/git/checkout", + {"session_id": sid, "ref": "feature", "mode": "local", "dirty_mode": "block"}, + ) + assert code == 200 + assert checked["ok"] is True + assert checked["current_branch"] == 
"feature" + assert checked["git"]["branch"] == "feature" + + +def test_git_routes_selected_commit_and_structured_error(cleanup_test_sessions): + sid, base_ws = _make_session(cleanup_test_sessions) + repo = base_ws / f"git-selected-route-{uuid.uuid4().hex[:8]}" + _init_repo(repo) + (repo / "selected.txt").write_text("one\n", encoding="utf-8") + (repo / "other.txt").write_text("alpha\n", encoding="utf-8") + _commit_all(repo) + + _post("/api/session/update", {"session_id": sid, "workspace": str(repo), "model": "openai/gpt-5.4-mini"}) + (repo / "selected.txt").write_text("one\ntwo\n", encoding="utf-8") + (repo / "other.txt").write_text("alpha\nbeta\n", encoding="utf-8") + _git(repo, "add", "other.txt") + + bad, code = _post("/api/git/commit-selected", {"session_id": sid, "message": "Bad", "paths": ["../x"]}) + assert code == 400 + assert bad["code"] == "path_outside_workspace" + + committed, code = _post( + "/api/git/commit-selected", + {"session_id": sid, "message": "Selected route commit", "paths": ["selected.txt"]}, + ) + assert code == 200 + assert committed["ok"] is True + assert committed["paths"] == ["selected.txt"] + assert _git(repo, "show", "--name-only", "--format=", "HEAD").splitlines() == ["selected.txt"] + + +def test_git_env_scrub_removes_redirecting_vars_and_preserves_temp_index(monkeypatch): + from api.workspace_git import _clean_git_env + + monkeypatch.setenv("GIT_DIR", "/tmp/evil-git-dir") + monkeypatch.setenv("GIT_WORK_TREE", "/tmp/evil-work-tree") + monkeypatch.setenv("GIT_CONFIG_GLOBAL", "/tmp/evil-config") + + env = _clean_git_env({"GIT_INDEX_FILE": "/tmp/hermes-index"}) + + assert "GIT_DIR" not in env + assert "GIT_WORK_TREE" not in env + assert "GIT_CONFIG_GLOBAL" not in env + assert env["GIT_INDEX_FILE"] == "/tmp/hermes-index" + + +def test_git_error_classifier_identifies_non_fast_forward_push(): + from api.workspace_git import _classify_git_error + + assert _classify_git_error("Updates were rejected", ["push"]) == "non_fast_forward" + assert 
_classify_git_error("non-fast-forward", ["push"]) == "non_fast_forward" + assert _classify_git_error("fetch first", ["push"]) == "non_fast_forward" + + +def test_git_commit_hook_failure_returns_hook_failed_code(tmp_path): + from api.workspace_git import GitWorkspaceError, git_commit, git_stage + + repo = _init_repo(tmp_path / "repo") + (repo / "tracked.txt").write_text("one\n", encoding="utf-8") + _commit_all(repo) + hook = repo / ".git" / "hooks" / "pre-commit" + hook.write_text("#!/bin/sh\necho hook blocked >&2\nexit 1\n", encoding="utf-8") + hook.chmod(0o755) + + (repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8") + git_stage(repo, ["tracked.txt"]) + + with pytest.raises(GitWorkspaceError) as exc: + git_commit(repo, "Hook should fail") + assert exc.value.code == "hook_failed" + + +def test_destructive_workspace_git_flag_defaults_off_and_accepts_truthy(monkeypatch): + from api.workspace_git import WORKSPACE_GIT_DESTRUCTIVE_ENV, workspace_git_destructive_enabled + + monkeypatch.delenv(WORKSPACE_GIT_DESTRUCTIVE_ENV, raising=False) + assert workspace_git_destructive_enabled() is False + + monkeypatch.setenv(WORKSPACE_GIT_DESTRUCTIVE_ENV, "1") + assert workspace_git_destructive_enabled() is True + + monkeypatch.setenv(WORKSPACE_GIT_DESTRUCTIVE_ENV, "true") + assert workspace_git_destructive_enabled() is True + + +def test_git_active_stream_lock_detection(monkeypatch): + import types + + from api import routes + from api.config import STREAMS, STREAMS_LOCK + + session = types.SimpleNamespace(active_stream_id="stream-git-lock-test") + with STREAMS_LOCK: + STREAMS[session.active_stream_id] = object() + try: + assert routes._git_locked_by_active_stream(session) is True + finally: + with STREAMS_LOCK: + STREAMS.pop(session.active_stream_id, None) + + assert routes._git_locked_by_active_stream(session) is False