feat(workspace): add backend Git operations

This commit is contained in:
stocky789
2026-05-20 01:20:46 +00:00
parent 9c983e693a
commit 5fc7aee781
6 changed files with 2566 additions and 5 deletions
+545 -4
View File
@@ -2221,6 +2221,7 @@ from api.workspace import (
get_last_workspace,
set_last_workspace,
list_dir,
dir_signature,
list_workspace_suggestions,
read_file_content,
safe_resolve_ws,
@@ -4139,6 +4140,15 @@ def handle_get(handler, parsed) -> bool:
if parsed.path == "/api/list":
return _handle_list_dir(handler, parsed)
if parsed.path == "/api/git/status":
return _handle_git_status(handler, parsed)
if parsed.path == "/api/git/branches":
return _handle_git_branches(handler, parsed)
if parsed.path == "/api/git/diff":
return _handle_git_diff(handler, parsed)
if parsed.path == "/api/personalities":
# Read personalities from config.yaml agent.personalities section
# (matches hermes-agent CLI behavior, not filesystem SOUL.md approach)
@@ -4170,9 +4180,22 @@ def handle_get(handler, parsed) -> bool:
s = get_session(sid)
except KeyError:
return bad(handler, "Session not found", 404)
from api.workspace import git_info_for_workspace
from api.workspace_git import GitWorkspaceError, git_status
info = git_info_for_workspace(Path(s.workspace))
try:
status = git_status(Path(s.workspace))
except GitWorkspaceError as e:
return _git_bad(handler, e)
totals = status.get("totals") or {}
info = None if not status.get("is_git") else {
"branch": status.get("branch"),
"dirty": totals.get("changed", 0),
"modified": (totals.get("staged", 0) or 0) + (totals.get("unstaged", 0) or 0),
"untracked": totals.get("untracked", 0),
"ahead": status.get("ahead", 0),
"behind": status.get("behind", 0),
"is_git": True,
}
return j(handler, {"git": info})
if parsed.path == "/api/commands":
@@ -5311,6 +5334,43 @@ def handle_post(handler, parsed) -> bool:
with cron_profile_context():
return _handle_cron_resume(handler, body)
# ── Git workspace ops (POST) ──
if parsed.path == "/api/git/stage":
return _handle_git_stage(handler, body)
if parsed.path == "/api/git/unstage":
return _handle_git_unstage(handler, body)
if parsed.path == "/api/git/discard":
return _handle_git_discard(handler, body)
if parsed.path == "/api/git/commit-message":
return _handle_git_commit_message(handler, body)
if parsed.path == "/api/git/commit-message-selected":
return _handle_git_commit_message_selected(handler, body)
if parsed.path == "/api/git/commit":
return _handle_git_commit(handler, body)
if parsed.path == "/api/git/commit-selected":
return _handle_git_commit_selected(handler, body)
if parsed.path == "/api/git/fetch":
return _handle_git_remote_action(handler, body, "fetch")
if parsed.path == "/api/git/pull":
return _handle_git_remote_action(handler, body, "pull")
if parsed.path == "/api/git/push":
return _handle_git_remote_action(handler, body, "push")
if parsed.path == "/api/git/checkout":
return _handle_git_checkout(handler, body)
if parsed.path == "/api/git/stash-checkout":
return _handle_git_stash_checkout(handler, body)
# ── File ops (POST) ──
if parsed.path == "/api/file/delete":
return _handle_file_delete(handler, body)
@@ -6185,11 +6245,14 @@ def _handle_list_dir(handler, parsed):
except Exception:
return bad(handler, "Session not found", 404)
try:
rel_path = qs.get("path", ["."])[0]
entries = list_dir(Path(workspace), rel_path)
return j(
handler,
{
"entries": list_dir(Path(workspace), qs.get("path", ["."])[0]),
"path": qs.get("path", ["."])[0],
"entries": entries,
"signature": dir_signature(Path(workspace), rel_path, entries),
"path": rel_path,
},
)
except (FileNotFoundError, ValueError) as e:
@@ -8657,6 +8720,484 @@ def _handle_cron_resume(handler, body):
return bad(handler, "Job not found", 404)
def _git_session(handler, session_id: str):
if not session_id:
bad(handler, "session_id required")
return None
try:
return get_session(session_id)
except KeyError:
bad(handler, "Session not found", 404)
return None
def _git_session_workspace(handler, session_id: str):
session = _git_session(handler, session_id)
if session is None:
return None
return Path(session.workspace)
def _git_session_and_workspace(handler, session_id: str):
session = _git_session(handler, session_id)
if session is None:
return None, None
return session, Path(session.workspace)
def _git_locked_by_active_stream(session) -> bool:
stream_id = getattr(session, "active_stream_id", None)
if not stream_id:
return False
try:
from api.config import STREAMS, STREAMS_LOCK
with STREAMS_LOCK:
return stream_id in STREAMS
except Exception:
return False
def _git_reject_destructive_if_unsafe(handler, session) -> bool:
from api.workspace_git import (
GitWorkspaceError,
WORKSPACE_GIT_DESTRUCTIVE_ENV,
workspace_git_destructive_enabled,
)
if not workspace_git_destructive_enabled():
_git_bad(
handler,
GitWorkspaceError(
f"Destructive workspace Git operations are disabled. Set {WORKSPACE_GIT_DESTRUCTIVE_ENV}=1 to enable them.",
"destructive_git_disabled",
),
status=403,
)
return True
if _git_locked_by_active_stream(session):
_git_bad(
handler,
GitWorkspaceError(
"A session run is active. Wait for it to finish before running this Git operation.",
"active_stream",
),
status=409,
)
return True
return False
def _handle_git_status(handler, parsed):
qs = parse_qs(parsed.query)
workspace = _git_session_workspace(handler, qs.get("session_id", [""])[0])
if workspace is None:
return True
try:
from api.workspace_git import GitWorkspaceError, git_status
return j(handler, {"git": git_status(workspace)})
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _handle_git_branches(handler, parsed):
qs = parse_qs(parsed.query)
workspace = _git_session_workspace(handler, qs.get("session_id", [""])[0])
if workspace is None:
return True
try:
from api.workspace_git import GitWorkspaceError, git_branches
return j(handler, {"branches": git_branches(workspace)})
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _handle_git_diff(handler, parsed):
qs = parse_qs(parsed.query)
workspace = _git_session_workspace(handler, qs.get("session_id", [""])[0])
if workspace is None:
return True
path = qs.get("path", [""])[0]
kind = qs.get("kind", ["unstaged"])[0]
if not path:
return bad(handler, "path required")
try:
from api.workspace_git import GitWorkspaceError, git_diff
return j(handler, {"diff": git_diff(workspace, path, kind)})
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _git_bad(handler, err, status: int = 400):
return j(
handler,
{
"error": _sanitize_error(err),
"code": getattr(err, "code", "git_failed") or "git_failed",
},
status=status,
)
def _git_paths_from_body(body) -> list[str]:
raw_paths = body.get("paths")
if raw_paths is None and body.get("path"):
raw_paths = [body.get("path")]
if isinstance(raw_paths, str):
raw_paths = [raw_paths]
if not isinstance(raw_paths, list):
raise ValueError("paths must be a list")
return [str(path) for path in raw_paths]
def _handle_git_stage(handler, body):
try:
require(body, "session_id")
paths = _git_paths_from_body(body)
session, workspace = _git_session_and_workspace(handler, body["session_id"])
if workspace is None:
return True
if _git_reject_destructive_if_unsafe(handler, session):
return True
from api.workspace_git import GitWorkspaceError, git_stage
return j(handler, {"ok": True, "git": git_stage(workspace, paths)})
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _handle_git_unstage(handler, body):
try:
require(body, "session_id")
paths = _git_paths_from_body(body)
session, workspace = _git_session_and_workspace(handler, body["session_id"])
if workspace is None:
return True
if _git_reject_destructive_if_unsafe(handler, session):
return True
from api.workspace_git import GitWorkspaceError, git_unstage
return j(handler, {"ok": True, "git": git_unstage(workspace, paths)})
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _handle_git_discard(handler, body):
try:
require(body, "session_id")
paths = _git_paths_from_body(body)
session, workspace = _git_session_and_workspace(handler, body["session_id"])
if workspace is None:
return True
if _git_reject_destructive_if_unsafe(handler, session):
return True
from api.workspace_git import GitWorkspaceError, git_discard
return j(
handler,
{
"ok": True,
"git": git_discard(
workspace,
paths,
delete_untracked=bool(body.get("delete_untracked")),
),
},
)
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _llm_git_commit_message(system_prompt: str, user_prompt: str, session=None) -> str:
from api import profiles as profiles_api
active_profile = profiles_api.get_active_profile_name() or "default"
with profiles_api.profile_env_for_background_worker(
active_profile,
"git commit message",
logger_override=logger,
):
from api.config import (
get_effective_default_model,
model_with_provider_context,
resolve_custom_provider_connection,
resolve_model_provider,
)
session_model = str(getattr(session, "model", "") or "").strip()
session_provider = str(getattr(session, "model_provider", "") or "").strip() or None
model_for_resolution = (
model_with_provider_context(session_model, session_provider)
if session_model
else get_effective_default_model()
)
_main_model, _main_provider, _main_base_url = resolve_model_provider(model_for_resolution)
_main_api_key = None
try:
from api.oauth import resolve_runtime_provider_with_anthropic_env_lock
from hermes_cli.runtime_provider import resolve_runtime_provider
_rt = resolve_runtime_provider_with_anthropic_env_lock(
resolve_runtime_provider,
requested=_main_provider,
)
_main_api_key = _rt.get("api_key")
if not _main_provider:
_main_provider = _rt.get("provider")
if not _main_base_url:
_main_base_url = _rt.get("base_url")
except Exception as _e:
logger.debug("git commit message runtime provider resolution failed: %s", _e)
if isinstance(_main_provider, str) and _main_provider.startswith("custom:"):
_cp_key, _cp_base = resolve_custom_provider_connection(_main_provider)
if not _main_api_key and _cp_key:
_main_api_key = _cp_key
if not _main_base_url and _cp_base:
_main_base_url = _cp_base
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
main_runtime = {
"provider": _main_provider,
"model": _main_model,
"base_url": _main_base_url,
"api_key": _main_api_key,
}
try:
from agent.auxiliary_client import get_text_auxiliary_client
aux_client, aux_model = get_text_auxiliary_client(
"compression",
main_runtime=main_runtime,
)
if aux_client is not None and aux_model:
response = aux_client.chat.completions.create(
model=aux_model,
messages=messages,
)
return str(response.choices[0].message.content or "").strip()
except Exception as _e:
logger.debug("git commit message auxiliary model failed; falling back to main model: %s", _e)
from run_agent import AIAgent
agent = AIAgent(
model=_main_model,
provider=_main_provider,
base_url=_main_base_url,
api_key=_main_api_key,
platform="webui",
quiet_mode=True,
enabled_toolsets=[],
session_id=f"git-commit-message-{uuid.uuid4().hex[:8]}",
)
result = agent.run_conversation(
user_message=user_prompt,
system_message=system_prompt,
conversation_history=[],
task_id=f"git-commit-message-{uuid.uuid4().hex[:8]}",
)
return str(result.get("final_response") or "").strip()
def _handle_git_commit_message(handler, body):
from api.workspace_git import (
GitWorkspaceError,
clean_generated_commit_message,
staged_commit_message_prompt,
)
try:
require(body, "session_id")
session = get_session(body["session_id"])
workspace = Path(session.workspace)
prompt = staged_commit_message_prompt(workspace)
message = clean_generated_commit_message(
_llm_git_commit_message(prompt["system_prompt"], prompt["user_prompt"], session=session)
)
if not message:
raise GitWorkspaceError("No commit message was generated")
return j(handler, {"ok": True, "message": message, "truncated": bool(prompt.get("truncated"))})
except KeyError:
return bad(handler, "Session not found", 404)
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
except Exception as e:
logger.exception("git commit message generation failed")
return bad(handler, _sanitize_error(e), 500)
def _handle_git_commit_message_selected(handler, body):
from api.workspace_git import (
GitWorkspaceError,
clean_generated_commit_message,
selected_commit_message_prompt,
)
try:
require(body, "session_id")
paths = _git_paths_from_body(body)
session = get_session(body["session_id"])
workspace = Path(session.workspace)
prompt = selected_commit_message_prompt(workspace, paths)
message = clean_generated_commit_message(
_llm_git_commit_message(prompt["system_prompt"], prompt["user_prompt"], session=session)
)
if not message:
raise GitWorkspaceError("No commit message was generated")
return j(handler, {"ok": True, "message": message, "truncated": bool(prompt.get("truncated"))})
except KeyError:
return bad(handler, "Session not found", 404)
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
except Exception as e:
logger.exception("selected git commit message generation failed")
return bad(handler, _sanitize_error(e), 500)
def _handle_git_commit(handler, body):
try:
require(body, "session_id", "message")
session, workspace = _git_session_and_workspace(handler, body["session_id"])
if workspace is None:
return True
if _git_reject_destructive_if_unsafe(handler, session):
return True
from api.workspace_git import GitWorkspaceError, git_commit
return j(handler, git_commit(workspace, body.get("message", "")))
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _handle_git_commit_selected(handler, body):
try:
require(body, "session_id", "message")
paths = _git_paths_from_body(body)
session, workspace = _git_session_and_workspace(handler, body["session_id"])
if workspace is None:
return True
if _git_reject_destructive_if_unsafe(handler, session):
return True
from api.workspace_git import GitWorkspaceError, git_commit_selected
return j(handler, git_commit_selected(workspace, body.get("message", ""), paths))
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _handle_git_remote_action(handler, body, action: str):
try:
require(body, "session_id")
session, workspace = _git_session_and_workspace(handler, body["session_id"])
if workspace is None:
return True
if action in {"pull", "push"} and _git_reject_destructive_if_unsafe(handler, session):
return True
from api.workspace_git import GitWorkspaceError, git_fetch, git_pull, git_push
actions = {
"fetch": git_fetch,
"pull": git_pull,
"push": git_push,
}
return j(handler, actions[action](workspace))
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _handle_git_checkout(handler, body):
try:
require(body, "session_id", "ref", "mode")
session, workspace = _git_session_and_workspace(handler, body["session_id"])
if workspace is None:
return True
if _git_reject_destructive_if_unsafe(handler, session):
return True
from api.workspace_git import GitWorkspaceError, git_checkout
result = git_checkout(
workspace,
str(body.get("ref", "")),
str(body.get("mode", "local")),
new_branch=body.get("new_branch"),
track=bool(body.get("track")),
dirty_mode=str(body.get("dirty_mode", "block")),
)
return j(
handler,
{
"ok": True,
"git": result.get("status"),
"branches": result.get("branches"),
"current_branch": result.get("current_branch"),
"message": result.get("message", ""),
},
)
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _handle_git_stash_checkout(handler, body):
try:
require(body, "session_id", "ref", "mode")
session, workspace = _git_session_and_workspace(handler, body["session_id"])
if workspace is None:
return True
if _git_reject_destructive_if_unsafe(handler, session):
return True
from api.workspace_git import GitWorkspaceError, git_stash_and_checkout
result = git_stash_and_checkout(
workspace,
str(body.get("ref", "")),
str(body.get("mode", "local")),
new_branch=body.get("new_branch"),
track=bool(body.get("track")),
)
return j(
handler,
{
"ok": True,
"git": result.get("status"),
"branches": result.get("branches"),
"current_branch": result.get("current_branch"),
"message": result.get("message", ""),
"stash_name": result.get("stash_name", ""),
"stashed": bool(result.get("stashed")),
},
)
except ValueError as e:
return bad(handler, str(e))
except GitWorkspaceError as e:
return _git_bad(handler, e)
def _handle_file_delete(handler, body):
try:
require(body, "session_id", "path")
+40 -1
View File
@@ -7,6 +7,7 @@ profile has its own workspace configuration. State files live at
``{profile_home}/webui_state/last_workspace.txt``. The global STATE_DIR
paths are used as fallback when no profile module is available.
"""
import hashlib
import json
import logging
import os
@@ -714,12 +715,18 @@ def list_dir(workspace: Path, rel: str='.'):
display_path = str(Path(item.name))
if rel and rel != '.':
display_path = rel + '/' + display_path
try:
item_stat = item.lstat()
mtime_ns = item_stat.st_mtime_ns
except OSError:
mtime_ns = None
entry = {
'name': item.name,
'path': display_path,
'type': 'symlink',
'target': str(link_target),
'is_dir': is_dir,
'mtime_ns': mtime_ns,
}
if not is_dir:
try:
@@ -733,17 +740,49 @@ def list_dir(workspace: Path, rel: str='.'):
entry_path = item.name
if rel and rel != '.':
entry_path = rel + '/' + item.name
try:
item_stat = item.stat()
size = item_stat.st_size if item.is_file() else None
mtime_ns = item_stat.st_mtime_ns
except OSError:
size = None
mtime_ns = None
entries.append({
'name': item.name,
'path': entry_path,
'type': 'dir' if item.is_dir() else 'file',
'size': item.stat().st_size if item.is_file() else None,
'size': size,
'mtime_ns': mtime_ns,
})
if len(entries) >= 200:
break
return entries
def dir_signature(workspace: Path, rel: str = '.', entries: list[dict] | None = None) -> str:
"""Return a cheap, stable signature for a listed workspace directory.
The signature is based only on bounded directory-entry metadata already used
by the workspace tree: names, displayed paths, entry type, file sizes,
mtimes, and symlink targets. It intentionally does not read file contents.
"""
if entries is None:
entries = list_dir(workspace, rel)
payload = []
for entry in entries:
payload.append({
'name': entry.get('name'),
'path': entry.get('path'),
'type': entry.get('type'),
'is_dir': entry.get('is_dir'),
'size': entry.get('size'),
'mtime_ns': entry.get('mtime_ns'),
'target': entry.get('target'),
})
raw = json.dumps(payload, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
return hashlib.sha256(raw.encode('utf-8')).hexdigest()
def read_file_content(workspace: Path, rel: str) -> dict:
target = safe_resolve_ws(workspace, rel)
if not target.is_file():
+1158
View File
File diff suppressed because it is too large Load Diff
+1
View File
@@ -536,6 +536,7 @@ def test_server():
# pytest-side block can't see.
env["HERMES_WEBUI_TEST_NETWORK_BLOCK"] = "1"
env.update({
"HERMES_WEBUI_WORKSPACE_GIT_DESTRUCTIVE": "1",
"HERMES_WEBUI_PORT": str(TEST_PORT),
"HERMES_WEBUI_HOST": "127.0.0.1",
"HERMES_WEBUI_STATE_DIR": str(TEST_STATE_DIR),
+26
View File
@@ -0,0 +1,26 @@
from api.workspace import dir_signature, list_dir
def test_directory_signature_is_metadata_only_and_changes_with_entries(tmp_path):
(tmp_path / "alpha.txt").write_text("one", encoding="utf-8")
entries = list_dir(tmp_path, ".")
sig1 = dir_signature(tmp_path, ".", entries)
assert isinstance(sig1, str)
assert len(sig1) == 64
assert all("mtime_ns" in entry for entry in entries)
(tmp_path / "beta.txt").write_text("two", encoding="utf-8")
entries2 = list_dir(tmp_path, ".")
sig2 = dir_signature(tmp_path, ".", entries2)
assert sig2 != sig1
def test_directory_signature_can_be_computed_from_supplied_entries(tmp_path):
(tmp_path / "alpha.txt").write_text("one", encoding="utf-8")
entries = list_dir(tmp_path, ".")
assert dir_signature(tmp_path, ".", entries) == dir_signature(tmp_path, ".", entries)
+796
View File
@@ -0,0 +1,796 @@
import json
import pathlib
import subprocess
import uuid
import urllib.error
import urllib.parse
import urllib.request
import pytest
from tests._pytest_port import BASE
ROOT = pathlib.Path(__file__).parent.parent
def _git(cwd, *args):
result = subprocess.run(
["git", *args],
cwd=str(cwd),
shell=False,
text=True,
capture_output=True,
timeout=20,
)
assert result.returncode == 0, result.stderr or result.stdout
return result.stdout
def _init_repo(path):
path.mkdir(parents=True, exist_ok=True)
_git(path, "init")
_git(path, "config", "user.email", "hermes-tests@example.invalid")
_git(path, "config", "user.name", "Hermes Tests")
return path
def _commit_all(path, message="initial"):
_git(path, "add", ".")
_git(path, "commit", "-m", message)
def _get(path):
try:
with urllib.request.urlopen(BASE + path, timeout=10) as r:
return json.loads(r.read()), r.status
except urllib.error.HTTPError as e:
return json.loads(e.read()), e.code
def _post(path, body=None):
data = json.dumps(body or {}).encode()
req = urllib.request.Request(
BASE + path,
data=data,
headers={"Content-Type": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=10) as r:
return json.loads(r.read()), r.status
except urllib.error.HTTPError as e:
return json.loads(e.read()), e.code
def _make_session(created_list, ws=None):
body = {}
if ws:
body["workspace"] = str(ws)
data, status = _post("/api/session/new", body)
assert status == 200
sid = data["session"]["session_id"]
created_list.append(sid)
return sid, pathlib.Path(data["session"]["workspace"])
def test_git_status_non_git_workspace(tmp_path):
from api.workspace_git import git_status
ws = tmp_path / "plain"
ws.mkdir()
assert git_status(ws) == {"is_git": False}
def test_git_status_handles_staged_unstaged_untracked_deleted_and_renamed(tmp_path):
from api.workspace_git import git_status
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
(repo / "delete-me.txt").write_text("bye\n", encoding="utf-8")
(repo / "old name.txt").write_text("move\n", encoding="utf-8")
_commit_all(repo)
(repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8")
(repo / "staged.txt").write_text("staged\n", encoding="utf-8")
_git(repo, "add", "staged.txt")
(repo / "delete-me.txt").unlink()
_git(repo, "mv", "old name.txt", "new name.txt")
(repo / "untracked space.txt").write_text("new\nfile\n", encoding="utf-8")
status = git_status(repo)
by_path = {item["path"]: item for item in status["files"]}
assert status["is_git"] is True
assert by_path["tracked.txt"]["unstaged"] is True
assert by_path["staged.txt"]["staged"] is True
assert by_path["delete-me.txt"]["status"] == "D"
assert by_path["new name.txt"]["old_path"] == "old name.txt"
assert by_path["untracked space.txt"]["untracked"] is True
assert by_path["untracked space.txt"]["additions"] == 2
assert status["totals"]["changed"] >= 5
def test_git_status_reports_ignored_files_without_counting_them_as_changes(tmp_path):
from api.workspace_git import git_status
repo = _init_repo(tmp_path / "repo")
(repo / ".gitignore").write_text("*.log\nbuild/\n", encoding="utf-8")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
(repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8")
(repo / "debug.log").write_text("ignored log\n", encoding="utf-8")
build = repo / "build"
build.mkdir()
(build / "artifact.txt").write_text("ignored artifact\n", encoding="utf-8")
status = git_status(repo)
by_path = {item["path"]: item for item in status["files"]}
assert by_path["tracked.txt"]["unstaged"] is True
assert by_path["debug.log"]["ignored"] is True
assert by_path["debug.log"]["status"] == "Ignored"
assert by_path["build/"]["ignored"] is True
assert by_path["build/"]["staged"] is False
assert by_path["build/"]["untracked"] is False
assert status["totals"]["changed"] == 1
assert status["totals"]["untracked"] == 0
def test_git_status_ignores_crlf_only_worktree_noise(tmp_path):
from api.workspace_git import git_status
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8", newline="\n")
_commit_all(repo)
(repo / "tracked.txt").write_text("one\r\ntwo\r\n", encoding="utf-8", newline="")
raw = _git(repo, "status", "--porcelain", "--", "tracked.txt")
assert raw.startswith(" M")
status = git_status(repo)
assert status["totals"]["changed"] == 0
assert status["files"] == []
assert status["noise_filtering"]["active"] is True
assert status["noise_filtering"]["crlf_only"] == 1
def test_git_status_keeps_real_edit_with_crlf_endings(tmp_path):
from api.workspace_git import git_status
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8", newline="\n")
_commit_all(repo)
(repo / "tracked.txt").write_text("one\r\ntwo\r\nthree\r\n", encoding="utf-8", newline="")
status = git_status(repo)
by_path = {item["path"]: item for item in status["files"]}
assert status["totals"]["changed"] == 1
assert by_path["tracked.txt"]["unstaged"] is True
assert by_path["tracked.txt"]["additions"] == 1
assert by_path["tracked.txt"]["deletions"] == 0
def test_git_status_ignores_filemode_only_noise(tmp_path):
from api.workspace_git import git_status
repo = _init_repo(tmp_path / "repo")
script = repo / "script.sh"
script.write_text("#!/bin/sh\necho hi\n", encoding="utf-8")
_commit_all(repo)
_git(repo, "update-index", "--chmod=+x", "script.sh")
raw = _git(repo, "status", "--porcelain", "--", "script.sh")
assert "script.sh" in raw
status = git_status(repo)
assert status["totals"]["changed"] == 0
assert status["files"] == []
assert status["noise_filtering"]["active"] is True
def test_git_status_scopes_nested_workspace_to_that_directory(tmp_path):
from api.workspace_git import git_status
repo = _init_repo(tmp_path / "repo")
nested = repo / "app"
nested.mkdir()
(nested / "inside.txt").write_text("inside\n", encoding="utf-8")
(repo / "outside.txt").write_text("outside\n", encoding="utf-8")
_commit_all(repo)
(nested / "inside.txt").write_text("inside\nchanged\n", encoding="utf-8")
(repo / "outside.txt").write_text("outside\nchanged\n", encoding="utf-8")
status = git_status(nested)
paths = {item["path"] for item in status["files"]}
assert paths == {"inside.txt"}
def test_git_diff_generates_untracked_text_diff_and_blocks_escape(tmp_path):
from api.workspace_git import GitWorkspaceError, git_diff
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
(repo / "new file.txt").write_text("hello\nworld\n", encoding="utf-8")
diff = git_diff(repo, "new file.txt", "unstaged")
assert diff["binary"] is False
assert "+++ b/new file.txt" in diff["diff"]
assert "+hello" in diff["diff"]
with pytest.raises(GitWorkspaceError):
git_diff(repo, "../outside.txt", "unstaged")
def test_git_status_reports_untracked_files_inside_directories(tmp_path):
from api.workspace_git import git_discard, git_status
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
nested = repo / "newdir"
nested.mkdir()
(nested / "a.txt").write_text("hello\n", encoding="utf-8")
status = git_status(repo)
paths = {item["path"] for item in status["files"]}
assert "newdir/a.txt" in paths
assert "newdir/" not in paths
git_discard(repo, ["newdir/a.txt"], delete_untracked=True)
assert not (nested / "a.txt").exists()
def test_git_status_reports_ignored_files_without_counting_them_as_changed(tmp_path):
from api.workspace_git import git_status
repo = _init_repo(tmp_path / "repo")
(repo / ".gitignore").write_text("*.log\nbuild/\n", encoding="utf-8")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
(repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8")
(repo / "debug.log").write_text("ignored log\n", encoding="utf-8")
build = repo / "build"
build.mkdir()
(build / "artifact.txt").write_text("ignored artifact\n", encoding="utf-8")
status = git_status(repo)
by_path = {item["path"]: item for item in status["files"]}
assert by_path["tracked.txt"]["unstaged"] is True
assert by_path["debug.log"]["ignored"] is True
assert by_path["debug.log"]["status"] == "Ignored"
assert by_path["debug.log"]["staged"] is False
assert by_path["debug.log"]["unstaged"] is False
assert by_path["debug.log"]["untracked"] is False
assert any(item["ignored"] and item["path"].startswith("build") for item in status["files"])
assert status["totals"]["changed"] == 1
assert status["totals"]["untracked"] == 0
def test_git_diff_large_untracked_file_is_bounded(tmp_path):
from api.workspace_git import DIFF_SIZE_LIMIT, git_diff, git_status
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
large = repo / "large.txt"
large.write_text("x" * (DIFF_SIZE_LIMIT + 1), encoding="utf-8")
status = git_status(repo)
by_path = {item["path"]: item for item in status["files"]}
assert by_path["large.txt"]["untracked"] is True
assert by_path["large.txt"]["additions"] == 0
diff = git_diff(repo, "large.txt", "unstaged")
assert diff["too_large"] is True
assert diff["diff"] == ""
def test_git_stage_unstage_discard_and_commit(tmp_path):
from api.workspace_git import git_commit, git_discard, git_stage, git_status, git_unstage
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
(repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8")
staged = git_stage(repo, ["tracked.txt"])
assert staged["totals"]["staged"] == 1
unstaged = git_unstage(repo, ["tracked.txt"])
assert unstaged["totals"]["staged"] == 0
assert unstaged["totals"]["unstaged"] == 1
git_discard(repo, ["tracked.txt"])
assert git_status(repo)["totals"]["changed"] == 0
(repo / "tracked.txt").write_text("one\nthree\n", encoding="utf-8")
git_stage(repo, ["tracked.txt"])
committed = git_commit(repo, "Update tracked file")
assert committed["ok"] is True
assert committed["commit"]
assert committed["status"]["totals"]["changed"] == 0
def test_git_commit_selected_ignores_unrelated_real_index(tmp_path):
from api.workspace_git import git_commit_selected, git_status
repo = _init_repo(tmp_path / "repo")
(repo / "selected.txt").write_text("one\n", encoding="utf-8")
(repo / "staged.txt").write_text("alpha\n", encoding="utf-8")
_commit_all(repo)
(repo / "selected.txt").write_text("one\ntwo\n", encoding="utf-8")
(repo / "staged.txt").write_text("alpha\nbeta\n", encoding="utf-8")
_git(repo, "add", "staged.txt")
committed = git_commit_selected(repo, "Commit selected only", ["selected.txt"])
assert committed["ok"] is True
assert committed["paths"] == ["selected.txt"]
assert _git(repo, "show", "--name-only", "--format=", "HEAD").splitlines() == ["selected.txt"]
by_path = {item["path"]: item for item in git_status(repo)["files"]}
assert "selected.txt" not in by_path
assert by_path["staged.txt"]["staged"] is True
def test_git_commit_selected_supports_initial_commit(tmp_path):
from api.workspace_git import git_commit_selected, git_status
repo = _init_repo(tmp_path / "repo")
(repo / "first.txt").write_text("first\n", encoding="utf-8")
committed = git_commit_selected(repo, "Initial selected commit", ["first.txt"])
assert committed["ok"] is True
assert _git(repo, "show", "--name-only", "--format=", "HEAD").splitlines() == ["first.txt"]
assert git_status(repo)["totals"]["changed"] == 0
def test_git_commit_selected_preserves_rename_semantics(tmp_path):
from api.workspace_git import git_commit_selected, git_status
repo = _init_repo(tmp_path / "repo")
(repo / "old.txt").write_text("old\n", encoding="utf-8")
_commit_all(repo)
_git(repo, "mv", "old.txt", "new.txt")
committed = git_commit_selected(repo, "Rename selected file", ["new.txt"])
assert committed["ok"] is True
assert _git(repo, "ls-tree", "--name-only", "HEAD").splitlines() == ["new.txt"]
assert "old.txt" not in _git(repo, "status", "--porcelain=v2")
assert git_status(repo)["totals"]["changed"] == 0
def test_git_commit_selected_handles_untracked_and_mixed_paths(tmp_path):
from api.workspace_git import git_commit_selected
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
(repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8")
(repo / "new.txt").write_text("new\n", encoding="utf-8")
committed = git_commit_selected(repo, "Commit mixed selected files", ["tracked.txt", "new.txt"])
assert committed["ok"] is True
assert set(_git(repo, "show", "--name-only", "--format=", "HEAD").splitlines()) == {
"tracked.txt",
"new.txt",
}
def test_git_commit_selected_respects_nested_workspace_scope(tmp_path):
from api.workspace_git import GitWorkspaceError, git_commit_selected
repo = _init_repo(tmp_path / "repo")
nested = repo / "app"
nested.mkdir()
(nested / "inside.txt").write_text("inside\n", encoding="utf-8")
(repo / "outside.txt").write_text("outside\n", encoding="utf-8")
_commit_all(repo)
(nested / "inside.txt").write_text("inside\nchanged\n", encoding="utf-8")
(repo / "outside.txt").write_text("outside\nchanged\n", encoding="utf-8")
committed = git_commit_selected(nested, "Nested selected commit", ["inside.txt"])
assert committed["paths"] == ["inside.txt"]
assert _git(repo, "show", "--name-only", "--format=", "HEAD").splitlines() == ["app/inside.txt"]
with pytest.raises(GitWorkspaceError) as outside:
git_commit_selected(nested, "Outside", ["../outside.txt"])
assert outside.value.code == "path_outside_workspace"
def test_git_commit_selected_rejects_conflicts_and_path_traversal(tmp_path):
from api.workspace_git import GitWorkspaceError, git_commit_selected
repo = _init_repo(tmp_path / "repo")
(repo / "conflict.txt").write_text("base\n", encoding="utf-8")
_commit_all(repo)
_git(repo, "checkout", "-b", "side")
(repo / "conflict.txt").write_text("side\n", encoding="utf-8")
_commit_all(repo, "side")
_git(repo, "checkout", "master")
(repo / "conflict.txt").write_text("main\n", encoding="utf-8")
_commit_all(repo, "main")
subprocess.run(["git", "merge", "side"], cwd=repo, shell=False, text=True, capture_output=True, timeout=20)
with pytest.raises(GitWorkspaceError) as conflict:
git_commit_selected(repo, "Nope", ["conflict.txt"])
assert conflict.value.code == "conflict"
with pytest.raises(GitWorkspaceError) as traversal:
git_commit_selected(repo, "Nope", ["../outside.txt"])
assert traversal.value.code == "path_outside_workspace"
def test_selected_commit_message_prompt_uses_selected_diff(tmp_path):
from api.workspace_git import selected_commit_message_prompt
repo = _init_repo(tmp_path / "repo")
(repo / "selected.txt").write_text("one\n", encoding="utf-8")
(repo / "other.txt").write_text("alpha\n", encoding="utf-8")
_commit_all(repo)
(repo / "selected.txt").write_text("one\ntwo\n", encoding="utf-8")
(repo / "other.txt").write_text("alpha\nbeta\n", encoding="utf-8")
prompt = selected_commit_message_prompt(repo, ["selected.txt"])
assert "selected.txt" in prompt["user_prompt"]
assert "+two" in prompt["user_prompt"]
assert "other.txt" not in prompt["user_prompt"]
assert "beta" not in prompt["user_prompt"]
def test_staged_commit_message_prompt_uses_only_staged_diff(tmp_path):
from api.workspace_git import (
GitWorkspaceError,
clean_generated_commit_message,
staged_commit_message_prompt,
)
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
(repo / "tracked.txt").write_text("one\nstaged\n", encoding="utf-8")
_git(repo, "add", "tracked.txt")
(repo / "tracked.txt").write_text("one\nstaged\nunstaged\n", encoding="utf-8")
prompt = staged_commit_message_prompt(repo)
assert prompt["truncated"] is False
assert "tracked.txt" in prompt["user_prompt"]
assert "+staged" in prompt["user_prompt"]
assert "unstaged" not in prompt["user_prompt"]
assert "Never mention AI, Cursor, Zed, agents" in prompt["system_prompt"]
_git(repo, "restore", "--staged", "tracked.txt")
with pytest.raises(GitWorkspaceError):
staged_commit_message_prompt(repo)
assert clean_generated_commit_message("```text\nSubject\n\n- Body\n```") == "Subject\n\n- Body"
def test_git_fetch_pull_and_push_with_upstream(tmp_path):
from api.workspace_git import git_fetch, git_pull, git_push, git_status
remote = tmp_path / "remote.git"
_git(tmp_path, "init", "--bare", str(remote))
origin = _init_repo(tmp_path / "origin")
(origin / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(origin)
_git(origin, "remote", "add", "origin", str(remote))
_git(origin, "push", "-u", "origin", "HEAD")
clone = tmp_path / "clone"
_git(tmp_path, "clone", str(remote), str(clone))
_git(clone, "config", "user.email", "hermes-tests@example.invalid")
_git(clone, "config", "user.name", "Hermes Tests")
(origin / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8")
_commit_all(origin, "Remote update")
_git(origin, "push")
fetched = git_fetch(clone)
assert fetched["status"]["behind"] == 1
pulled = git_pull(clone)
assert pulled["status"]["behind"] == 0
assert (clone / "tracked.txt").read_text(encoding="utf-8") == "one\ntwo\n"
(clone / "tracked.txt").write_text("one\ntwo\nthree\n", encoding="utf-8")
_git(clone, "add", "tracked.txt")
_git(clone, "commit", "-m", "Local update")
assert git_status(clone)["ahead"] == 1
pushed = git_push(clone)
assert pushed["status"]["ahead"] == 0
def test_git_branches_lists_local_remote_and_upstream(tmp_path):
from api.workspace_git import git_branches
remote = tmp_path / "remote.git"
_git(tmp_path, "init", "--bare", str(remote))
origin = _init_repo(tmp_path / "origin")
(origin / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(origin)
_git(origin, "branch", "-M", "main")
_git(origin, "remote", "add", "origin", str(remote))
_git(origin, "push", "-u", "origin", "main")
_git(remote, "symbolic-ref", "HEAD", "refs/heads/main")
clone = tmp_path / "clone"
_git(tmp_path, "clone", str(remote), str(clone))
branches = git_branches(clone)
assert branches["current"] == "main"
assert branches["detached"] is False
assert any(item["name"] == "main" and item["upstream"] == "origin/main" for item in branches["local"])
main = next(item for item in branches["local"] if item["name"] == "main")
assert "updated_relative" in main and "author" in main and "subject" in main
assert any(item["name"] == "origin/main" for item in branches["remote"])
assert not any(item["name"] == "origin" for item in branches["remote"])
def test_git_checkout_local_new_remote_dirty_and_invalid_refs(tmp_path):
from api.workspace_git import GitWorkspaceError, git_branches, git_checkout
remote = tmp_path / "remote.git"
_git(tmp_path, "init", "--bare", str(remote))
origin = _init_repo(tmp_path / "origin")
(origin / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(origin)
_git(origin, "branch", "-M", "main")
_git(origin, "remote", "add", "origin", str(remote))
_git(origin, "push", "-u", "origin", "main")
_git(remote, "symbolic-ref", "HEAD", "refs/heads/main")
_git(origin, "checkout", "-b", "remote-feature")
(origin / "remote.txt").write_text("remote\n", encoding="utf-8")
_commit_all(origin, "remote feature")
_git(origin, "push", "-u", "origin", "remote-feature")
clone = tmp_path / "clone"
_git(tmp_path, "clone", str(remote), str(clone))
_git(clone, "config", "user.email", "hermes-tests@example.invalid")
_git(clone, "config", "user.name", "Hermes Tests")
created = git_checkout(clone, "main", "new", new_branch="local-work")
assert created["current_branch"] == "local-work"
assert git_branches(clone)["current"] == "local-work"
switched = git_checkout(clone, "main", "local")
assert switched["current_branch"] == "main"
tracked = git_checkout(clone, "origin/remote-feature", "remote", new_branch="remote-feature", track=True)
assert tracked["current_branch"] == "remote-feature"
assert git_branches(clone)["upstream"] == "origin/remote-feature"
(clone / "tracked.txt").write_text("dirty\n", encoding="utf-8")
with pytest.raises(GitWorkspaceError) as dirty:
git_checkout(clone, "main", "local")
assert dirty.value.code == "dirty_worktree"
_git(clone, "restore", "tracked.txt")
with pytest.raises(GitWorkspaceError) as invalid:
git_checkout(clone, "does-not-exist", "local")
assert invalid.value.code in {"invalid_ref", "git_failed"}
def test_git_checkout_detached_requires_explicit_mode(tmp_path):
from api.workspace_git import git_branches, git_checkout
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
sha = _git(repo, "rev-parse", "--short", "HEAD").strip()
result = git_checkout(repo, sha, "detached")
assert result["ok"] is True
branches = git_branches(repo)
assert branches["detached"] is True
assert branches["current"] == sha
def test_git_stash_and_checkout_is_explicit(tmp_path):
from api.workspace_git import git_stash_and_checkout, git_status
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
_git(repo, "checkout", "-b", "target")
_git(repo, "checkout", "master")
(repo / "tracked.txt").write_text("dirty\n", encoding="utf-8")
result = git_stash_and_checkout(repo, "target", "local")
assert result["ok"] is True
assert result["stashed"] is True
assert result["stash_name"].startswith("hermes-webui branch switch")
assert result["current_branch"] == "target"
assert git_status(repo)["totals"]["changed"] == 0
assert "hermes-webui branch switch target" in _git(repo, "stash", "list")
def test_git_stash_checkout_validates_before_stashing(tmp_path):
from api.workspace_git import GitWorkspaceError, git_stash_and_checkout
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
(repo / "tracked.txt").write_text("dirty\n", encoding="utf-8")
with pytest.raises(GitWorkspaceError) as invalid:
git_stash_and_checkout(repo, "missing-branch", "local")
assert invalid.value.code == "invalid_ref"
assert "M tracked.txt" in _git(repo, "status", "--porcelain")
assert _git(repo, "stash", "list") == ""
def test_git_routes_status_diff_stage_unstage_discard_commit(cleanup_test_sessions):
sid, base_ws = _make_session(cleanup_test_sessions)
repo = base_ws / f"git-route-{uuid.uuid4().hex[:8]}"
_init_repo(repo)
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
_post("/api/session/update", {"session_id": sid, "workspace": str(repo), "model": "openai/gpt-5.4-mini"})
(repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8")
status, code = _get(f"/api/git/status?session_id={sid}")
assert code == 200
assert status["git"]["totals"]["unstaged"] == 1
diff, code = _get(
f"/api/git/diff?session_id={sid}&path={urllib.parse.quote('tracked.txt')}&kind=unstaged"
)
assert code == 200
assert "+two" in diff["diff"]["diff"]
staged, code = _post("/api/git/stage", {"session_id": sid, "paths": ["tracked.txt"]})
assert code == 200 and staged["git"]["totals"]["staged"] == 1
unstaged, code = _post("/api/git/unstage", {"session_id": sid, "paths": ["tracked.txt"]})
assert code == 200 and unstaged["git"]["totals"]["unstaged"] == 1
discarded, code = _post("/api/git/discard", {"session_id": sid, "paths": ["tracked.txt"]})
assert code == 200 and discarded["git"]["totals"]["changed"] == 0
(repo / "tracked.txt").write_text("one\nthree\n", encoding="utf-8")
_post("/api/git/stage", {"session_id": sid, "paths": ["tracked.txt"]})
committed, code = _post("/api/git/commit", {"session_id": sid, "message": "Route commit"})
assert code == 200
assert committed["ok"] is True
assert committed["status"]["totals"]["changed"] == 0
def test_git_routes_branches_and_checkout(cleanup_test_sessions):
sid, base_ws = _make_session(cleanup_test_sessions)
repo = base_ws / f"git-branch-route-{uuid.uuid4().hex[:8]}"
_init_repo(repo)
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
_git(repo, "branch", "-M", "main")
_git(repo, "checkout", "-b", "feature")
_git(repo, "checkout", "main")
_post("/api/session/update", {"session_id": sid, "workspace": str(repo), "model": "openai/gpt-5.4-mini"})
branches, code = _get(f"/api/git/branches?session_id={sid}")
assert code == 200
assert branches["branches"]["current"] == "main"
assert any(item["name"] == "feature" for item in branches["branches"]["local"])
checked, code = _post(
"/api/git/checkout",
{"session_id": sid, "ref": "feature", "mode": "local", "dirty_mode": "block"},
)
assert code == 200
assert checked["ok"] is True
assert checked["current_branch"] == "feature"
assert checked["git"]["branch"] == "feature"
def test_git_routes_selected_commit_and_structured_error(cleanup_test_sessions):
sid, base_ws = _make_session(cleanup_test_sessions)
repo = base_ws / f"git-selected-route-{uuid.uuid4().hex[:8]}"
_init_repo(repo)
(repo / "selected.txt").write_text("one\n", encoding="utf-8")
(repo / "other.txt").write_text("alpha\n", encoding="utf-8")
_commit_all(repo)
_post("/api/session/update", {"session_id": sid, "workspace": str(repo), "model": "openai/gpt-5.4-mini"})
(repo / "selected.txt").write_text("one\ntwo\n", encoding="utf-8")
(repo / "other.txt").write_text("alpha\nbeta\n", encoding="utf-8")
_git(repo, "add", "other.txt")
bad, code = _post("/api/git/commit-selected", {"session_id": sid, "message": "Bad", "paths": ["../x"]})
assert code == 400
assert bad["code"] == "path_outside_workspace"
committed, code = _post(
"/api/git/commit-selected",
{"session_id": sid, "message": "Selected route commit", "paths": ["selected.txt"]},
)
assert code == 200
assert committed["ok"] is True
assert committed["paths"] == ["selected.txt"]
assert _git(repo, "show", "--name-only", "--format=", "HEAD").splitlines() == ["selected.txt"]
def test_git_env_scrub_removes_redirecting_vars_and_preserves_temp_index(monkeypatch):
from api.workspace_git import _clean_git_env
monkeypatch.setenv("GIT_DIR", "/tmp/evil-git-dir")
monkeypatch.setenv("GIT_WORK_TREE", "/tmp/evil-work-tree")
monkeypatch.setenv("GIT_CONFIG_GLOBAL", "/tmp/evil-config")
env = _clean_git_env({"GIT_INDEX_FILE": "/tmp/hermes-index"})
assert "GIT_DIR" not in env
assert "GIT_WORK_TREE" not in env
assert "GIT_CONFIG_GLOBAL" not in env
assert env["GIT_INDEX_FILE"] == "/tmp/hermes-index"
def test_git_error_classifier_identifies_non_fast_forward_push():
from api.workspace_git import _classify_git_error
assert _classify_git_error("Updates were rejected", ["push"]) == "non_fast_forward"
assert _classify_git_error("non-fast-forward", ["push"]) == "non_fast_forward"
assert _classify_git_error("fetch first", ["push"]) == "non_fast_forward"
def test_git_commit_hook_failure_returns_hook_failed_code(tmp_path):
from api.workspace_git import GitWorkspaceError, git_commit, git_stage
repo = _init_repo(tmp_path / "repo")
(repo / "tracked.txt").write_text("one\n", encoding="utf-8")
_commit_all(repo)
hook = repo / ".git" / "hooks" / "pre-commit"
hook.write_text("#!/bin/sh\necho hook blocked >&2\nexit 1\n", encoding="utf-8")
hook.chmod(0o755)
(repo / "tracked.txt").write_text("one\ntwo\n", encoding="utf-8")
git_stage(repo, ["tracked.txt"])
with pytest.raises(GitWorkspaceError) as exc:
git_commit(repo, "Hook should fail")
assert exc.value.code == "hook_failed"
def test_destructive_workspace_git_flag_defaults_off_and_accepts_truthy(monkeypatch):
from api.workspace_git import WORKSPACE_GIT_DESTRUCTIVE_ENV, workspace_git_destructive_enabled
monkeypatch.delenv(WORKSPACE_GIT_DESTRUCTIVE_ENV, raising=False)
assert workspace_git_destructive_enabled() is False
monkeypatch.setenv(WORKSPACE_GIT_DESTRUCTIVE_ENV, "1")
assert workspace_git_destructive_enabled() is True
monkeypatch.setenv(WORKSPACE_GIT_DESTRUCTIVE_ENV, "true")
assert workspace_git_destructive_enabled() is True
def test_git_active_stream_lock_detection(monkeypatch):
import types
from api import routes
from api.config import STREAMS, STREAMS_LOCK
session = types.SimpleNamespace(active_stream_id="stream-git-lock-test")
with STREAMS_LOCK:
STREAMS[session.active_stream_id] = object()
try:
assert routes._git_locked_by_active_stream(session) is True
finally:
with STREAMS_LOCK:
STREAMS.pop(session.active_stream_id, None)
assert routes._git_locked_by_active_stream(session) is False