mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-24 18:50:15 +00:00
222 lines
6.7 KiB
Python
222 lines
6.7 KiB
Python
import subprocess
|
|
from types import SimpleNamespace
|
|
from urllib.parse import urlparse
|
|
|
|
import pytest
|
|
|
|
import api.models as models
|
|
from api.models import SESSIONS, Session
|
|
|
|
|
|
def _git(cwd, *args):
|
|
return subprocess.run(
|
|
["git", *args],
|
|
cwd=cwd,
|
|
text=True,
|
|
capture_output=True,
|
|
check=True,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _isolate_sessions(tmp_path, monkeypatch):
|
|
session_dir = tmp_path / "sessions"
|
|
session_dir.mkdir()
|
|
monkeypatch.setattr(models, "SESSION_DIR", session_dir)
|
|
monkeypatch.setattr(models, "SESSION_INDEX_FILE", session_dir / "_index.json")
|
|
SESSIONS.clear()
|
|
yield session_dir
|
|
SESSIONS.clear()
|
|
|
|
|
|
@pytest.fixture
|
|
def git_worktree(tmp_path):
|
|
repo = tmp_path / "repo"
|
|
remote = tmp_path / "remote.git"
|
|
worktree = tmp_path / "hermes-status"
|
|
repo.mkdir()
|
|
_git(repo, "init")
|
|
_git(repo, "config", "user.email", "test@example.com")
|
|
_git(repo, "config", "user.name", "Hermes Test")
|
|
_git(repo, "branch", "-M", "main")
|
|
(repo / "README.md").write_text("hello\n", encoding="utf-8")
|
|
_git(repo, "add", "README.md")
|
|
_git(repo, "commit", "-m", "initial")
|
|
_git(remote.parent, "init", "--bare", remote.name)
|
|
_git(repo, "remote", "add", "origin", str(remote))
|
|
_git(repo, "push", "-u", "origin", "main")
|
|
_git(repo, "worktree", "add", "-b", "hermes/status", str(worktree), "main")
|
|
_git(worktree, "push", "-u", "origin", "hermes/status")
|
|
return repo, worktree
|
|
|
|
|
|
def _session_for_worktree(repo, worktree, **kwargs):
|
|
return Session(
|
|
session_id=kwargs.pop("session_id", "wtstatus001"),
|
|
workspace=str(worktree),
|
|
worktree_path=str(worktree),
|
|
worktree_branch="hermes/status",
|
|
worktree_repo_root=str(repo),
|
|
worktree_created_at=123.0,
|
|
**kwargs,
|
|
)
|
|
|
|
|
|
def test_worktree_status_reports_clean_existing_worktree(git_worktree):
|
|
from api.worktrees import worktree_status_for_session
|
|
|
|
repo, worktree = git_worktree
|
|
status = worktree_status_for_session(_session_for_worktree(repo, worktree))
|
|
|
|
assert status["path"] == str(worktree.resolve())
|
|
assert status["exists"] is True
|
|
assert status["listed"] is True
|
|
assert status["dirty"] is False
|
|
assert status["untracked_count"] == 0
|
|
assert status["ahead_behind"]["available"] is True
|
|
assert status["ahead_behind"]["ahead"] == 0
|
|
assert status["ahead_behind"]["behind"] == 0
|
|
assert status["locked_by_stream"] is False
|
|
assert status["locked_by_terminal"] is False
|
|
|
|
|
|
def test_worktree_status_reports_dirty_untracked_and_ahead(git_worktree):
|
|
from api.worktrees import worktree_status_for_session
|
|
|
|
repo, worktree = git_worktree
|
|
(worktree / "README.md").write_text("hello\nedited\n", encoding="utf-8")
|
|
(worktree / "scratch.txt").write_text("local-only\n", encoding="utf-8")
|
|
status = worktree_status_for_session(_session_for_worktree(repo, worktree))
|
|
|
|
assert status["dirty"] is True
|
|
assert status["untracked_count"] == 1
|
|
assert status["ahead_behind"]["ahead"] == 0
|
|
|
|
_git(worktree, "add", "README.md")
|
|
_git(worktree, "commit", "-m", "local change")
|
|
status = worktree_status_for_session(_session_for_worktree(repo, worktree))
|
|
|
|
assert status["dirty"] is True
|
|
assert status["untracked_count"] == 1
|
|
assert status["ahead_behind"]["available"] is True
|
|
assert status["ahead_behind"]["ahead"] == 1
|
|
assert status["ahead_behind"]["behind"] == 0
|
|
|
|
|
|
def test_worktree_status_handles_missing_path_without_git_mutation(tmp_path):
|
|
from api.worktrees import worktree_status_for_session
|
|
|
|
missing = tmp_path / "missing-worktree"
|
|
status = worktree_status_for_session(
|
|
SimpleNamespace(
|
|
session_id="missing",
|
|
worktree_path=str(missing),
|
|
worktree_repo_root=str(tmp_path / "repo"),
|
|
active_stream_id=None,
|
|
)
|
|
)
|
|
|
|
assert status["path"] == str(missing.resolve())
|
|
assert status["exists"] is False
|
|
assert status["dirty"] is False
|
|
assert status["untracked_count"] == 0
|
|
assert status["ahead_behind"]["ahead"] == 0
|
|
assert status["ahead_behind"]["behind"] == 0
|
|
|
|
|
|
def test_worktree_status_uses_live_stream_registry(git_worktree):
|
|
from api.config import STREAMS, STREAMS_LOCK
|
|
from api.worktrees import worktree_status_for_session
|
|
|
|
repo, worktree = git_worktree
|
|
session = _session_for_worktree(
|
|
repo,
|
|
worktree,
|
|
active_stream_id="live-stream",
|
|
)
|
|
|
|
with STREAMS_LOCK:
|
|
STREAMS["live-stream"] = object()
|
|
try:
|
|
assert worktree_status_for_session(session)["locked_by_stream"] is True
|
|
finally:
|
|
with STREAMS_LOCK:
|
|
STREAMS.pop("live-stream", None)
|
|
|
|
assert worktree_status_for_session(session)["locked_by_stream"] is False
|
|
|
|
|
|
def test_worktree_status_reports_live_terminal_lock(git_worktree, monkeypatch):
|
|
import api.terminal as terminal
|
|
from api.worktrees import worktree_status_for_session
|
|
|
|
repo, worktree = git_worktree
|
|
|
|
class FakeTerminal:
|
|
workspace = str(worktree.resolve())
|
|
|
|
def is_alive(self):
|
|
return True
|
|
|
|
monkeypatch.setattr(terminal, "get_terminal", lambda session_id: FakeTerminal())
|
|
|
|
status = worktree_status_for_session(_session_for_worktree(repo, worktree))
|
|
|
|
assert status["locked_by_terminal"] is True
|
|
|
|
|
|
def test_worktree_status_endpoint_returns_session_owned_status(git_worktree, monkeypatch):
|
|
import api.routes as routes
|
|
|
|
repo, worktree = git_worktree
|
|
session = _session_for_worktree(repo, worktree, session_id="route_wt")
|
|
session.save()
|
|
captured = {}
|
|
|
|
monkeypatch.setattr(
|
|
routes,
|
|
"j",
|
|
lambda handler, payload, status=200, extra_headers=None: captured.update(
|
|
payload=payload,
|
|
status=status,
|
|
) or True,
|
|
)
|
|
|
|
handled = routes.handle_get(
|
|
object(),
|
|
urlparse("/api/session/worktree/status?session_id=route_wt"),
|
|
)
|
|
|
|
assert handled is True
|
|
assert captured["status"] == 200
|
|
assert captured["payload"]["status"]["path"] == str(worktree.resolve())
|
|
assert captured["payload"]["status"]["exists"] is True
|
|
|
|
|
|
def test_worktree_status_endpoint_rejects_non_worktree_session(tmp_path, monkeypatch):
|
|
import api.routes as routes
|
|
|
|
workspace = tmp_path / "workspace"
|
|
workspace.mkdir()
|
|
session = Session(session_id="plain", workspace=str(workspace))
|
|
session.save()
|
|
captured = {}
|
|
|
|
monkeypatch.setattr(
|
|
routes,
|
|
"bad",
|
|
lambda handler, message, status=400: captured.update(
|
|
message=message,
|
|
status=status,
|
|
) or True,
|
|
)
|
|
|
|
handled = routes.handle_get(
|
|
object(),
|
|
urlparse("/api/session/worktree/status?session_id=plain"),
|
|
)
|
|
|
|
assert handled is True
|
|
assert captured["status"] == 400
|
|
assert "not worktree-backed" in captured["message"]
|