diff --git a/docs/guides/gpg-signing.md b/docs/guides/gpg-signing.md new file mode 100644 index 00000000..3074c031 --- /dev/null +++ b/docs/guides/gpg-signing.md @@ -0,0 +1,381 @@ +# Agent GPG Commit Signing + +This guide covers GPG commit signing for Flowspec agents, enabling cryptographic verification of agent-generated commits. + +## Overview + +Flowspec agents can generate their own GPG keys and sign commits to provide: +- **Cryptographic verification** of agent authorship +- **Non-repudiation** - proof that commits came from the agent +- **Trust chains** - link agent commits to their originating workflow +- **Audit trails** - track which agent made which changes + +## Quick Start + +### 1. Set Up GPG Signing + +```bash +# Generate agent GPG key and configure git +flowspec gpg setup + +# Output: +# ✓ GPG key generated +# ✓ Git configured for signing +# +# Fingerprint: A1B2C3D4E5F6A7B8C9D0E1F2A3B4C5D6E7F8A9B0 +``` + +### 2. Verify Status + +```bash +# Check GPG signing status +flowspec gpg status + +# Output: +# ┌─ Agent GPG Signing Status ─┐ +# │ Status Configured │ +# │ Fingerprint A1B2C3... │ +# │ Git Signing Enabled │ +# └─────────────────────────────┘ +``` + +### 3. Make Signed Commits + +Once configured, all commits in the repository will be automatically signed by the agent: + +```bash +git commit -m "feat: add new feature" +# Commit is automatically signed with agent's GPG key +``` + +## Commands + +### `flowspec gpg setup` + +Generate a new GPG key and configure git for commit signing. + +```bash +# Set up in current repository +flowspec gpg setup + +# Set up in specific repository +flowspec gpg setup --project-root /path/to/repo + +# Force key regeneration +flowspec gpg setup --force +``` + +**What it does:** +1. Generates a 4096-bit RSA GPG key for "Flowspec Agent " +2. Stores the key fingerprint in the system keyring +3. Configures local git settings: + - `user.signingkey`: agent's GPG fingerprint + - `commit.gpgsign`: true + +### `flowspec gpg status` + +Show current GPG signing configuration and status. + +```bash +# Check status in current repository +flowspec gpg status + +# Check status with detailed key information +flowspec gpg status --verbose + +# Check status in specific repository +flowspec gpg status --project-root /path/to/repo +``` + +**Output includes:** +- Configuration status (configured/not configured) +- GPG key fingerprint +- Git signing status (enabled/disabled) +- Key creation date (with `--verbose`) +- Key identity (with `--verbose`) + +### `flowspec gpg rotate` + +Rotate the agent's GPG key (delete old key, generate new key). + +```bash +# Rotate key (with confirmation prompt) +flowspec gpg rotate + +# Rotate key without confirmation +flowspec gpg rotate --yes + +# Rotate key for specific repository +flowspec gpg rotate --project-root /path/to/repo +``` + +**When to rotate:** +- Regular security practice (e.g., annually) +- Key compromise or suspected compromise +- Agent role change or re-provisioning +- Compliance requirements + +**Warning:** Rotating a key does not re-sign historical commits. Old commits will still reference the old key fingerprint. + +## Key Management + +### Key Storage + +- **GPG Keychain**: Keys are stored in the user's GPG keychain (`~/.gnupg/`) +- **Fingerprint**: The key fingerprint is stored in the system keyring for quick retrieval +- **Service Name**: `flowspec-agent-gpg` +- **Username**: `agent-key-fingerprint` + +### Key Properties + +``` +Name: Flowspec Agent +Email: agent@flowspec.local +Type: RSA +Length: 4096 bits +Expiration: None (does not expire) +``` + +### Security Considerations + +1. **No Passphrase**: Agent keys are generated without a passphrase for automated signing +2. **Local Storage**: Keys are stored locally on the agent's system +3. **Per-User**: Each user account has its own agent key +4. **Repository Config**: Git signing is configured per-repository (local config) + +## Git Configuration + +GPG signing modifies local git configuration: + +```bash +# View current configuration +git config --local user.signingkey +git config --local commit.gpgsign + +# Manual configuration (not recommended - use 'flowspec gpg setup') +git config --local user.signingkey A1B2C3D4E5F6A7B8C9D0E1F2A3B4C5D6E7F8A9B0 +git config --local commit.gpgsign true +``` + +## Telemetry Integration + +When GPG signing is active, the agent's key fingerprint is included in telemetry output: + +```bash +# View telemetry status with GPG info +flowspec telemetry status + +# Output includes: +# ┌─ Telemetry Status ──────────┐ +# │ Status Enabled │ +# │ Events 1,234 │ +# │ GPG Signing Active │ +# │ GPG Fingerprint A1B2C3... │ +# └──────────────────────────────┘ +``` + +## Verifying Signed Commits + +### On GitHub + +GitHub automatically verifies GPG-signed commits if the public key is uploaded: + +1. Export the agent's public key: + ```bash + gpg --armor --export agent@flowspec.local > agent-key.asc + ``` + +2. Add the key to GitHub: + - Go to Settings → SSH and GPG keys → New GPG key + - Paste the contents of `agent-key.asc` + +3. Signed commits will show a "Verified" badge + +### Locally + +```bash +# Verify a signed commit +git log --show-signature -1 + +# Output: +# commit abc123def456... +# gpg: Signature made Thu Apr 3 12:34:56 2025 UTC +# gpg: using RSA key A1B2C3D4E5F6A7B8C9D0... +# gpg: Good signature from "Flowspec Agent " +``` + +## Key Rotation Workflow + +Key rotation is important for security best practices: + +```bash +# 1. Rotate the key +flowspec gpg rotate --yes + +# Output: +# Old fingerprint: A1B2C3D4E5F6A7B8C9D0E1F2A3B4C5D6E7F8A9B0 +# New fingerprint: F0E9D8C7B6A5F4E3D2C1B0A9F8E7D6C5B4A3F2E1 + +# 2. Export new public key +gpg --armor --export agent@flowspec.local > agent-key-new.asc + +# 3. Update GitHub/GitLab with new key + +# 4. (Optional) Sign a rotation commit +git commit --allow-empty -m "chore: rotate agent GPG key + +Previous key: A1B2C3D4E5F6A7B8C9D0E1F2A3B4C5D6E7F8A9B0 +New key: F0E9D8C7B6A5F4E3D2C1B0A9F8E7D6C5B4A3F2E1 + +Signed-off-by: Flowspec Agent " +``` + +### Rotation Schedule Recommendations + +| Environment | Rotation Frequency | Rationale | +|-------------|-------------------|-----------| +| Development | Annually | Low risk, convenience | +| Staging | Quarterly | Moderate risk | +| Production | Monthly | High security requirements | +| Compromised | Immediately | Security incident | + +## Troubleshooting + +### GPG Not Found + +``` +Error: GPG not found. Please install gnupg. +``` + +**Solution:** Install GPG: +```bash +# Ubuntu/Debian +sudo apt install gnupg + +# macOS +brew install gnupg + +# Fedora/RHEL +sudo dnf install gnupg +``` + +### Git Not a Repository + +``` +Error: Not a git repository: /path/to/dir +``` + +**Solution:** Run `flowspec gpg setup` inside a git repository or specify `--project-root`: +```bash +cd /path/to/repo +flowspec gpg setup +``` + +### Commits Not Signed + +**Symptoms:** Commits don't show "Verified" badge + +**Diagnosis:** +```bash +# Check git config +git config --local commit.gpgsign +# Should output: true + +git config --local user.signingkey +# Should output: your fingerprint + +# Check GPG key exists +gpg --list-keys agent@flowspec.local +``` + +**Solution:** +```bash +# Reconfigure signing +flowspec gpg setup +``` + +### Keyring Access Error + +``` +Error: Failed to store fingerprint in keyring: ... +``` + +**Solution:** Ensure the system keyring is accessible. On Linux, you may need to install and configure a keyring backend: +```bash +sudo apt install gnome-keyring # Ubuntu/Debian +# or +sudo dnf install gnome-keyring # Fedora/RHEL +``` + +## Advanced Usage + +### Multiple Repositories + +Each repository requires its own `flowspec gpg setup` to enable signing. The same agent key is used across all repositories: + +```bash +# Repository A +cd /path/to/repo-a +flowspec gpg setup + +# Repository B +cd /path/to/repo-b +flowspec gpg setup + +# Both repos use the same agent key, but each has local git config +``` + +### Disable Signing for a Repository + +```bash +# Disable automatic signing +git config --local commit.gpgsign false + +# Or unset the config +git config --local --unset commit.gpgsign +git config --local --unset user.signingkey +``` + +### Export Public Key for Distribution + +```bash +# Export ASCII-armored public key +gpg --armor --export agent@flowspec.local > agent-public-key.asc + +# Export binary public key +gpg --export agent@flowspec.local > agent-public-key.gpg + +# Show fingerprint +gpg --fingerprint agent@flowspec.local +``` + +### Manual Key Deletion + +```bash +# Delete secret and public key +gpg --delete-secret-and-public-key agent@flowspec.local + +# Delete fingerprint from keyring +# (This is handled automatically by 'flowspec gpg rotate') +``` + +## Security Best Practices + +1. **Regular Rotation**: Rotate agent keys on a schedule appropriate for your security requirements +2. **Key Backup**: Consider backing up the private key to a secure location (encrypted) +3. **Audit Logs**: Monitor telemetry for signing activity +4. **Access Control**: Restrict access to the agent's keyring and GPG directory +5. **Verification**: Regularly verify that commits are being signed correctly +6. **Incident Response**: Have a plan for key rotation in case of compromise + +## Related Documentation + +- [Telemetry Guide](./telemetry-guide.md) - Telemetry integration with GPG fingerprints +- [Security Workflow](./security-workflow.md) - Security scanning and compliance +- [Git Hooks](../reference/git-hooks.md) - Custom hooks for commit signing + +## References + +- [GPG Manual](https://www.gnupg.org/documentation/manuals/gnupg/) +- [Git Signing Documentation](https://git-scm.com/book/en/v2/Git-Tools-Signing-Your-Work) +- [GitHub GPG Verification](https://docs.github.com/en/authentication/managing-commit-signature-verification) diff --git a/src/flowspec_cli/doctor.py b/src/flowspec_cli/doctor.py new file mode 100644 index 00000000..8552f7b7 --- /dev/null +++ b/src/flowspec_cli/doctor.py @@ -0,0 +1,487 @@ +"""Health check and diagnostics for flowspec setup. + +This module provides the `flowspec doctor` command to verify that the environment +is properly configured for flowspec development. + +Checks performed: +- flowspec CLI version (currently installed) +- Python version compatibility (requires 3.11+) +- Required tools installed (backlog.md, beads) +- Workflow configuration present and valid +- Agent files using correct naming convention (dot notation) +- Constitution file present (if configured) + +Example: + >>> from flowspec_cli.doctor import run_doctor, CheckStatus + >>> results = run_doctor(fix=False) + >>> failures = [r for r in results if r.status != CheckStatus.PASS] + >>> if not failures: + ... print("All checks passed!") +""" + +import shutil +import subprocess +import sys +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Any + +import yaml +from rich.console import Console +from rich.table import Table + +from flowspec_cli.workflow.validator import validate_workflow + +console = Console() + + +class CheckStatus(Enum): + """Status of a health check.""" + + PASS = "pass" + WARN = "warn" + FAIL = "fail" + + +@dataclass +class CheckResult: + """Result of a single health check. + + Attributes: + name: Display name of the check + status: Pass/warn/fail status + message: Human-readable message about the check result + fix_command: Optional command to fix the issue (for auto-fixable issues) + details: Additional context or details + """ + + name: str + status: CheckStatus + message: str + fix_command: str | None = None + details: dict[str, Any] | None = None + + +def get_flowspec_version() -> str: + """Get the currently installed flowspec version. + + Returns: + Version string (e.g., "0.4.004") or "unknown" + """ + try: + # Import version from package metadata + from importlib.metadata import version + + return version("flowspec-cli") + except Exception: + return "unknown" + + +def check_flowspec_version() -> CheckResult: + """Check flowspec CLI version. + + Returns: + CheckResult indicating version status + """ + current = get_flowspec_version() + if current == "unknown": + return CheckResult( + name="flowspec CLI", + status=CheckStatus.WARN, + message="Could not determine flowspec version (source checkout?)", + ) + + # Check against PyPI for latest version + try: + import httpx + + resp = httpx.get("https://pypi.org/pypi/flowspec-cli/json", timeout=5) + if resp.status_code == 200: + latest = resp.json()["info"]["version"] + if latest != current: + return CheckResult( + name="flowspec CLI", + status=CheckStatus.WARN, + message=f"v{current} installed, v{latest} available", + fix_command="pip install --upgrade flowspec-cli", + ) + except Exception: + pass + + return CheckResult( + name="flowspec CLI", + status=CheckStatus.PASS, + message=f"v{current} (up to date)", + ) + + +def check_python_version() -> CheckResult: + """Check Python version compatibility (requires 3.11+). + + Returns: + CheckResult indicating Python version status + """ + version_info = sys.version_info + major, minor = version_info[:2] + micro = version_info[2] if len(version_info) > 2 else 0 + version_str = f"{major}.{minor}.{micro}" + + if major < 3 or (major == 3 and minor < 11): + return CheckResult( + name="Python version", + status=CheckStatus.FAIL, + message=f"Python {version_str} (requires 3.11+)", + ) + + return CheckResult( + name="Python version", + status=CheckStatus.PASS, + message=f"Python {version_str}", + ) + + +def check_tool_installed(tool_name: str) -> CheckResult: + """Check if a CLI tool is installed and accessible. + + Args: + tool_name: Name of the tool to check (e.g., "backlog", "beads") + + Returns: + CheckResult indicating tool installation status + """ + tool_path = shutil.which(tool_name) + if tool_path is None: + return CheckResult( + name=f"{tool_name} CLI", + status=CheckStatus.FAIL, + message=f"{tool_name} not found in PATH", + fix_command=f"Install {tool_name} following the installation guide", + ) + + # Try to get version if possible + try: + result = subprocess.run( + [tool_name, "--version"], + capture_output=True, + text=True, + timeout=5, + ) + version_output = result.stdout.strip() or result.stderr.strip() + # Extract just the version number if present + version = version_output.split()[-1] if version_output else "installed" + except Exception: + version = "installed" + + return CheckResult( + name=f"{tool_name} CLI", + status=CheckStatus.PASS, + message=version, + ) + + +def check_workflow_config() -> CheckResult: + """Check if workflow configuration exists and is valid. + + Returns: + CheckResult indicating workflow configuration status + """ + project_root = Path.cwd() + + # Check for workflow config file + config_paths = [ + project_root / "flowspec_workflow.yml", + project_root / ".flowspec" / "workflow.yml", + ] + + config_path = None + for path in config_paths: + if path.exists(): + config_path = path + break + + if config_path is None: + return CheckResult( + name="Workflow config", + status=CheckStatus.WARN, + message="No workflow configuration found", + fix_command="Run: /flow:init", + ) + + # Try to load and validate the config + try: + with open(config_path) as f: + config_data = yaml.safe_load(f) + + # Validate using the workflow validator + validation_result = validate_workflow(config_data) + + if not validation_result.is_valid: + error_count = len(validation_result.errors) + warning_count = len(validation_result.warnings) + details = { + "errors": [str(e) for e in validation_result.errors], + "warnings": [str(w) for w in validation_result.warnings], + } + return CheckResult( + name="Workflow config", + status=CheckStatus.FAIL, + message=f"Invalid ({error_count} errors, {warning_count} warnings)", + fix_command="Run: flowspec workflow validate --verbose", + details=details, + ) + + # Valid config + warning_count = len(validation_result.warnings) + if warning_count > 0: + return CheckResult( + name="Workflow config", + status=CheckStatus.WARN, + message=f"Valid ({warning_count} warnings)", + details={"warnings": [str(w) for w in validation_result.warnings]}, + ) + + return CheckResult( + name="Workflow config", + status=CheckStatus.PASS, + message="Valid", + ) + + except yaml.YAMLError as e: + return CheckResult( + name="Workflow config", + status=CheckStatus.FAIL, + message=f"Invalid YAML: {e}", + ) + except Exception as e: + return CheckResult( + name="Workflow config", + status=CheckStatus.FAIL, + message=f"Error reading config: {e}", + ) + + +def check_agent_files() -> CheckResult: + """Check if agent files use correct naming convention. + + Only checks .github/agents/ for flow.*.agent.md vs flow-*.agent.md patterns. + Note: .claude/agents/ intentionally uses hyphens and is not checked. + + Returns: + CheckResult indicating agent file naming status + """ + project_root = Path.cwd() + github_agents_dir = project_root / ".github" / "agents" + + old_convention_files = [] + + if github_agents_dir.exists(): + for agent_file in github_agents_dir.glob("*.md"): + stem = agent_file.stem + # Skip README and underscore-prefixed files + if stem.startswith("_") or agent_file.name == "README.md": + continue + # Hyphenated names are old convention; dot-separated are new + if "-" in stem: + old_convention_files.append(str(agent_file.relative_to(project_root))) + + if old_convention_files: + return CheckResult( + name="Agent file naming", + status=CheckStatus.WARN, + message=f"{len(old_convention_files)} files using old hyphen naming", + fix_command="Run: flowspec upgrade-repo", + details={"files": old_convention_files}, + ) + + # Check if there are any agent files at all in .github/agents/ + has_agents = github_agents_dir.exists() and any( + f.suffix == ".md" + for f in github_agents_dir.glob("*.md") + if not f.name.startswith("_") and f.name != "README.md" + ) + + if not has_agents: + return CheckResult( + name="Agent file naming", + status=CheckStatus.WARN, + message="No agent files found in .github/agents/", + ) + + return CheckResult( + name="Agent file naming", + status=CheckStatus.PASS, + message="Using flow. prefix convention", + ) + + +def check_constitution() -> CheckResult: + """Check if constitution file exists. + + Returns: + CheckResult indicating constitution file status + """ + project_root = Path.cwd() + constitution_path = project_root / "memory" / "constitution.md" + + if not constitution_path.exists(): + return CheckResult( + name="Constitution", + status=CheckStatus.WARN, + message="Constitution not found", + fix_command="Run: /flow:init", + ) + + # Check if it's just the template (contains placeholders) + try: + content = constitution_path.read_text() + if "[PROJECT_NAME]" in content or "[PRINCIPLE_1_NAME]" in content: + return CheckResult( + name="Constitution", + status=CheckStatus.WARN, + message="Contains placeholders (not configured)", + fix_command="Run: /flow:init", + ) + except Exception: + pass + + return CheckResult( + name="Constitution", + status=CheckStatus.PASS, + message="Present", + ) + + +def run_all_checks() -> list[CheckResult]: + """Run all health checks. + + Returns: + List of CheckResult objects for all checks performed + """ + checks = [ + check_flowspec_version(), + check_python_version(), + check_tool_installed("backlog"), + check_tool_installed("beads"), + check_workflow_config(), + check_agent_files(), + check_constitution(), + ] + return checks + + +def display_results(results: list[CheckResult], verbose: bool = False) -> None: + """Display check results in a formatted table. + + Args: + results: List of CheckResult objects to display + verbose: If True, show additional details + """ + table = Table(show_header=False, box=None, padding=(0, 1)) + table.add_column("Status", width=3) + table.add_column("Check", style="bold") + table.add_column("Details") + + for result in results: + # Choose symbol and color based on status + if result.status == CheckStatus.PASS: + symbol = "[green]✅[/green]" + elif result.status == CheckStatus.WARN: + symbol = "[yellow]⚠️[/yellow]" + else: + symbol = "[red]❌[/red]" + + table.add_row(symbol, result.name, result.message) + + # Show fix commands for issues + if result.fix_command and result.status != CheckStatus.PASS: + table.add_row( + "", + "", + f"[dim cyan]→ Fix: {result.fix_command}[/dim cyan]", + ) + + # Show details in verbose mode + if verbose and result.details: + for key, value in result.details.items(): + if isinstance(value, list): + for item in value[:3]: # Show first 3 items + table.add_row( + "", + "", + f"[dim] {key}: {item}[/dim]", + ) + if len(value) > 3: + table.add_row( + "", + "", + f"[dim] ... and {len(value) - 3} more[/dim]", + ) + else: + table.add_row( + "", + "", + f"[dim] {key}: {value}[/dim]", + ) + + console.print() + console.print(table) + console.print() + + +def run_doctor(fix: bool = False, verbose: bool = False) -> list[CheckResult]: + """Run health checks and optionally attempt fixes. + + Args: + fix: If True, attempt to auto-fix fixable issues + verbose: If True, show additional details + + Returns: + List of CheckResult objects. Filter for failures: + ``issues = [r for r in results if r.status == CheckStatus.FAIL]`` + """ + console.print() + console.print("[bold]flowspec doctor[/bold]") + console.print() + + results = run_all_checks() + + # Display results + display_results(results, verbose=verbose) + + # Count issues + errors = sum(1 for r in results if r.status == CheckStatus.FAIL) + warnings = sum(1 for r in results if r.status == CheckStatus.WARN) + + # Summary + if errors == 0 and warnings == 0: + console.print("[green]✓ All checks passed![/green]") + else: + summary_parts = [] + if errors > 0: + summary_parts.append(f"[red]{errors} error(s)[/red]") + if warnings > 0: + summary_parts.append(f"[yellow]{warnings} warning(s)[/yellow]") + console.print(" ".join(summary_parts)) + + # Show suggestion to run with --fix if there are fixable issues + if not fix: + fixable = sum(1 for r in results if r.fix_command is not None) + if fixable > 0: + console.print() + console.print( + f"[dim]Hint: Run 'flowspec doctor --fix' to attempt " + f"automatic fixes ({fixable} fixable)[/dim]" + ) + + console.print() + + # Auto-fix handling (not implemented yet - would require complex logic) + if fix: + console.print( + "[yellow]Note: Auto-fix is not yet implemented. " + "Please run the suggested fix commands manually.[/yellow]" + ) + console.print() + + return results diff --git a/src/flowspec_cli/gpg_cli.py b/src/flowspec_cli/gpg_cli.py new file mode 100644 index 00000000..13968c1b --- /dev/null +++ b/src/flowspec_cli/gpg_cli.py @@ -0,0 +1,306 @@ +"""CLI commands for GPG signing management. + +This module provides user-facing commands for: +- Setting up agent GPG keys +- Viewing GPG key status +- Rotating keys +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Optional + +import typer +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +from .signing import ( + configure_git_signing, + delete_agent_key, + generate_agent_key, + get_key_fingerprint, + get_key_info, + is_git_signing_enabled, + key_exists, + GPGConfigurationError, + GPGError, + GPGKeyGenerationError, +) + +gpg_app = typer.Typer( + name="gpg", + help="Manage agent GPG commit signing", + add_completion=False, +) + +console = Console() + + +@gpg_app.command("setup") +def setup_command( + project_root: Optional[str] = typer.Option( + None, + "--project-root", + "-p", + help="Project directory to configure (default: current directory)", + ), + force: bool = typer.Option( + False, + "--force", + "-f", + help="Force key regeneration if key already exists", + ), +) -> None: + """Set up agent GPG commit signing. + + This command: + 1. Generates a new GPG key for the Flowspec agent (if needed) + 2. Configures git to sign commits with the agent key + 3. Displays the key fingerprint + + The key fingerprint is stored in the system keyring and can be + included in telemetry output. + + Examples: + flowspec gpg setup + flowspec gpg setup --project-root /path/to/repo + flowspec gpg setup --force # Regenerate key + """ + root = Path(project_root) if project_root else Path.cwd() + + try: + # Check if key exists + if key_exists() and not force: + console.print("[yellow]Agent GPG key already exists.[/yellow]") + console.print("[dim]Use --force to regenerate the key.[/dim]") + fingerprint = get_key_fingerprint() + else: + if force and key_exists(): + console.print("[yellow]Regenerating agent GPG key...[/yellow]") + delete_agent_key() + + # Generate new key + console.print("[cyan]Generating agent GPG key...[/cyan]") + fingerprint = generate_agent_key() + console.print("[green]✓[/green] GPG key generated") + + # Configure git + console.print("[cyan]Configuring git for commit signing...[/cyan]") + configure_git_signing(project_root=root) + console.print("[green]✓[/green] Git configured for signing") + + # Display summary + console.print() + panel_content = f"""[bold]Agent GPG Setup Complete[/bold] + +[cyan]Fingerprint:[/cyan] {fingerprint} + +All commits in this repository will now be signed by the agent. + +[dim]View status:[/dim] flowspec gpg status +[dim]Include in telemetry:[/dim] fingerprint is automatically included when signing is active +""" + console.print( + Panel( + panel_content, + title="✓ GPG Signing Enabled", + border_style="green", + padding=(1, 2), + ) + ) + + except GPGKeyGenerationError as e: + console.print(f"[red]✗ Key generation failed:[/red] {e}") + raise typer.Exit(1) + except GPGConfigurationError as e: + console.print(f"[red]✗ Git configuration failed:[/red] {e}") + raise typer.Exit(1) + except GPGError as e: + console.print(f"[red]✗ GPG error:[/red] {e}") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]✗ Unexpected error:[/red] {e}") + raise typer.Exit(1) + + +@gpg_app.command("status") +def status_command( + project_root: Optional[str] = typer.Option( + None, + "--project-root", + "-p", + help="Project directory to check (default: current directory)", + ), + verbose: bool = typer.Option( + False, + "--verbose", + "-v", + help="Show detailed key information", + ), +) -> None: + """Show agent GPG signing status. + + Displays: + - Whether an agent GPG key exists + - Key fingerprint and details + - Git signing configuration status + + Examples: + flowspec gpg status + flowspec gpg status --verbose + flowspec gpg status --project-root /path/to/repo + """ + root = Path(project_root) if project_root else Path.cwd() + + # Build status table + table = Table(title="Agent GPG Signing Status", show_header=False, box=None) + table.add_column("Setting", style="cyan") + table.add_column("Value") + + # Check if key exists + if not key_exists(): + table.add_row("Status", "[red]Not configured[/red]") + console.print(table) + console.print() + console.print( + "[dim]Run 'flowspec gpg setup' to enable agent commit signing.[/dim]" + ) + return + + # Get key information + fingerprint = get_key_fingerprint() + key_info = get_key_info() + + table.add_row("Status", "[green]Configured[/green]") + table.add_row("Fingerprint", fingerprint or "[dim]unknown[/dim]") + + if verbose and key_info: + if uid := key_info.get("uid"): + table.add_row("Identity", uid) + if created := key_info.get("created"): + # Convert Unix timestamp to readable date + try: + from datetime import datetime + + dt = datetime.fromtimestamp(int(created)) + table.add_row("Created", dt.strftime("%Y-%m-%d %H:%M:%S")) + except (ValueError, OSError): + table.add_row("Created", created) + + # Check git configuration + git_enabled = is_git_signing_enabled(project_root=root) + if git_enabled: + table.add_row("Git Signing", "[green]Enabled[/green]") + else: + table.add_row("Git Signing", "[yellow]Not enabled in this repo[/yellow]") + + console.print(table) + + # Show next steps if git not configured + if not git_enabled: + console.print() + console.print( + "[yellow]Git commit signing is not enabled in this repository.[/yellow]" + ) + console.print( + "[dim]Run 'flowspec gpg setup' to configure this repository.[/dim]" + ) + + +@gpg_app.command("rotate") +def rotate_command( + project_root: Optional[str] = typer.Option( + None, + "--project-root", + "-p", + help="Project directory to reconfigure (default: current directory)", + ), + yes: bool = typer.Option( + False, + "--yes", + "-y", + help="Skip confirmation prompt", + ), +) -> None: + """Rotate the agent GPG key. + + This command: + 1. Deletes the existing agent GPG key + 2. Generates a new key + 3. Updates git configuration + + Key rotation is useful for security best practices or if a key is compromised. + + Examples: + flowspec gpg rotate + flowspec gpg rotate --yes # Skip confirmation + """ + root = Path(project_root) if project_root else Path.cwd() + + if not key_exists(): + console.print("[yellow]No agent GPG key found. Nothing to rotate.[/yellow]") + console.print("[dim]Run 'flowspec gpg setup' to create a new key.[/dim]") + return + + # Get old fingerprint for display + old_fingerprint = get_key_fingerprint() + + if not yes: + console.print( + "[yellow]⚠ This will delete the current agent GPG key and generate a new one.[/yellow]" + ) + console.print(f"[dim]Current fingerprint: {old_fingerprint}[/dim]") + console.print() + confirm = typer.confirm("Do you want to continue?", default=False) + if not confirm: + console.print("[dim]Cancelled.[/dim]") + raise typer.Exit(0) + + try: + # Delete old key + console.print("[cyan]Deleting old key...[/cyan]") + delete_agent_key() + console.print("[green]✓[/green] Old key deleted") + + # Generate new key + console.print("[cyan]Generating new key...[/cyan]") + new_fingerprint = generate_agent_key() + console.print("[green]✓[/green] New key generated") + + # Reconfigure git + console.print("[cyan]Updating git configuration...[/cyan]") + configure_git_signing(project_root=root) + console.print("[green]✓[/green] Git configuration updated") + + # Display summary + console.print() + panel_content = f"""[bold]Key Rotation Complete[/bold] + +[dim]Old fingerprint:[/dim] +{old_fingerprint} + +[cyan]New fingerprint:[/cyan] +{new_fingerprint} + +[dim]All future commits will be signed with the new key.[/dim] +""" + console.print( + Panel( + panel_content, + title="✓ Key Rotated", + border_style="green", + padding=(1, 2), + ) + ) + + except GPGError as e: + console.print(f"[red]✗ Key rotation failed:[/red] {e}") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]✗ Unexpected error:[/red] {e}") + raise typer.Exit(1) + + +__all__ = ["gpg_app"] diff --git a/src/flowspec_cli/signing.py b/src/flowspec_cli/signing.py new file mode 100644 index 00000000..dd1b2623 --- /dev/null +++ b/src/flowspec_cli/signing.py @@ -0,0 +1,332 @@ +"""GPG signing module for Flowspec agents. + +This module provides GPG key generation and management for agent commit signing. +Agents can generate their own GPG keys, configure git to use them, and include +the key fingerprint in telemetry output. +""" + +from __future__ import annotations + +import subprocess +from pathlib import Path +from typing import Optional + +import keyring + +# Constants +KEYRING_SERVICE = "flowspec-agent-gpg" +KEYRING_USERNAME = "agent-key-fingerprint" +GPG_KEY_NAME = "Flowspec Agent" +GPG_KEY_EMAIL = "agent@flowspec.local" + + +class GPGError(Exception): + """Base exception for GPG-related errors.""" + + pass + + +class GPGKeyGenerationError(GPGError): + """Raised when GPG key generation fails.""" + + pass + + +class GPGConfigurationError(GPGError): + """Raised when git configuration for GPG signing fails.""" + + pass + + +def _run_gpg_command( + args: list[str], input_data: Optional[str] = None +) -> tuple[str, str, int]: + """Run a GPG command and return stdout, stderr, and return code. + + Args: + args: Command arguments (excluding 'gpg' itself) + input_data: Optional input to pass via stdin + + Returns: + Tuple of (stdout, stderr, returncode) + """ + cmd = ["gpg"] + args + try: + result = subprocess.run( + cmd, + input=input_data, + text=True, + capture_output=True, + timeout=30, + ) + return result.stdout, result.stderr, result.returncode + except subprocess.TimeoutExpired: + raise GPGError("GPG command timed out") + except FileNotFoundError: + raise GPGError("GPG not found. Please install gnupg.") + + +def _run_git_command( + args: list[str], cwd: Optional[Path] = None +) -> tuple[str, str, int]: + """Run a git command and return stdout, stderr, and return code. + + Args: + args: Command arguments (excluding 'git' itself) + cwd: Optional working directory + + Returns: + Tuple of (stdout, stderr, returncode) + """ + cmd = ["git"] + args + try: + result = subprocess.run( + cmd, + cwd=cwd, + text=True, + capture_output=True, + timeout=10, + ) + return result.stdout, result.stderr, result.returncode + except subprocess.TimeoutExpired: + raise GPGConfigurationError("Git command timed out") + except FileNotFoundError: + raise GPGConfigurationError("Git not found. Please install git.") + + +def generate_agent_key() -> str: + """Generate a new GPG key for the Flowspec agent. + + Creates an RSA 4096-bit key with no expiration for: + - Name: "Flowspec Agent" + - Email: "agent@flowspec.local" + + The key fingerprint is stored in the system keyring for later retrieval. + + Returns: + The GPG key fingerprint (40-character hex string) + + Raises: + GPGKeyGenerationError: If key generation fails + """ + # Check if key already exists + existing = get_key_fingerprint() + if existing: + return existing + + # Generate key using batch mode + key_params = f""" +%no-protection +Key-Type: RSA +Key-Length: 4096 +Name-Real: {GPG_KEY_NAME} +Name-Email: {GPG_KEY_EMAIL} +Expire-Date: 0 +%commit +""".strip() + + stdout, stderr, returncode = _run_gpg_command( + ["--batch", "--status-fd", "1", "--generate-key"], input_data=key_params + ) + + if returncode != 0: + raise GPGKeyGenerationError(f"Failed to generate GPG key: {stderr}") + + # Extract fingerprint from the key-creation status output. + # Expected format: [GNUPG:] KEY_CREATED + fingerprint = None + for line in stdout.splitlines(): + if line.startswith("[GNUPG:] KEY_CREATED "): + parts = line.split() + if len(parts) >= 4: + fingerprint = parts[3] + break + + if not fingerprint: + raise GPGKeyGenerationError("Failed to extract fingerprint from generated key") + + # Store fingerprint in keyring + try: + keyring.set_password(KEYRING_SERVICE, KEYRING_USERNAME, fingerprint) + except Exception as e: + raise GPGKeyGenerationError(f"Failed to store fingerprint in keyring: {e}") + + return fingerprint + + +def configure_git_signing(project_root: Optional[Path] = None) -> None: + """Configure git to sign commits with the agent's GPG key. + + Sets the following local git config options: + - user.signingkey: The agent's GPG key fingerprint + - commit.gpgsign: true (enable automatic commit signing) + + Args: + project_root: Project directory for local git config (default: current directory) + + Raises: + GPGConfigurationError: If git configuration fails + GPGError: If no agent key exists (call generate_agent_key() first) + """ + fingerprint = get_key_fingerprint() + if not fingerprint: + raise GPGError("No agent GPG key found. Run generate_agent_key() first.") + + # Verify the secret key is still present in the GPG keyring + _, _, returncode = _run_gpg_command(["--list-secret-keys", GPG_KEY_EMAIL]) + if returncode != 0: + raise GPGError( + "Agent GPG secret key not found in keyring. Regenerate with generate_agent_key()." + ) + + cwd = project_root if project_root else Path.cwd() + + # Check if we're in a git repository + _, _, returncode = _run_git_command(["rev-parse", "--git-dir"], cwd=cwd) + if returncode != 0: + raise GPGConfigurationError(f"Not a git repository: {cwd}") + + # Set signing key + _, stderr, returncode = _run_git_command( + ["config", "--local", "user.signingkey", fingerprint], cwd=cwd + ) + if returncode != 0: + raise GPGConfigurationError(f"Failed to set user.signingkey: {stderr}") + + # Enable commit signing + _, stderr, returncode = _run_git_command( + ["config", "--local", "commit.gpgsign", "true"], cwd=cwd + ) + if returncode != 0: + raise GPGConfigurationError(f"Failed to set commit.gpgsign: {stderr}") + + +def get_key_fingerprint() -> Optional[str]: + """Retrieve the stored GPG key fingerprint from keyring. + + Returns: + The 40-character hex fingerprint, or None if not found + """ + try: + fingerprint = keyring.get_password(KEYRING_SERVICE, KEYRING_USERNAME) + return fingerprint + except Exception: + return None + + +def key_exists() -> bool: + """Check if an agent GPG key already exists. + + Returns: + True if a key fingerprint is stored in the keyring, False otherwise + """ + return get_key_fingerprint() is not None + + +def get_key_info() -> Optional[dict[str, str]]: + """Get detailed information about the agent's GPG key. + + Returns: + Dictionary with key details (fingerprint, uid, creation date), or None if no key exists + """ + fingerprint = get_key_fingerprint() + if not fingerprint: + return None + + stdout, stderr, returncode = _run_gpg_command( + ["--list-keys", "--with-colons", fingerprint] + ) + + if returncode != 0: + return None + + # Parse key information from colon-separated output + key_info = {"fingerprint": fingerprint} + + for line in stdout.splitlines(): + parts = line.split(":") + if line.startswith("pub:"): + # pub:u:4096:1:KEYID:CREATION:EXPIRATION + if len(parts) >= 6: + key_info["created"] = parts[5] + elif line.startswith("uid:"): + # uid:u::::CREATION::UID + if len(parts) >= 10: + key_info["uid"] = parts[9] + + return key_info + + +def is_git_signing_enabled(project_root: Optional[Path] = None) -> bool: + """Check if GPG commit signing is enabled in the git repository. + + Args: + project_root: Project directory to check (default: current directory) + + Returns: + True if commit.gpgsign is true and user.signingkey is set, False otherwise + """ + cwd = project_root if project_root else Path.cwd() + + # Check if we're in a git repository + _, _, returncode = _run_git_command(["rev-parse", "--git-dir"], cwd=cwd) + if returncode != 0: + return False + + # Check commit.gpgsign + stdout, _, returncode = _run_git_command( + ["config", "--local", "commit.gpgsign"], cwd=cwd + ) + if returncode != 0 or stdout.strip().lower() != "true": + return False + + # Check user.signingkey + stdout, _, returncode = _run_git_command( + ["config", "--local", "user.signingkey"], cwd=cwd + ) + if returncode != 0 or not stdout.strip(): + return False + + return True + + +def delete_agent_key() -> None: + """Delete the agent's GPG key from the keyring and GPG keychain. + + This is useful for key rotation. After deletion, call generate_agent_key() + to create a new key. + + Raises: + GPGError: If key deletion fails + """ + fingerprint = get_key_fingerprint() + if not fingerprint: + return # No key to delete + + # Delete from GPG keychain + _, stderr, returncode = _run_gpg_command( + ["--batch", "--yes", "--delete-secret-and-public-key", fingerprint] + ) + + if returncode != 0: + raise GPGError(f"Failed to delete GPG key: {stderr}") + + # Delete from keyring + try: + keyring.delete_password(KEYRING_SERVICE, KEYRING_USERNAME) + except Exception as e: + raise GPGError(f"Failed to delete fingerprint from keyring: {e}") + + +__all__ = [ + "generate_agent_key", + "configure_git_signing", + "get_key_fingerprint", + "key_exists", + "get_key_info", + "is_git_signing_enabled", + "delete_agent_key", + "GPGError", + "GPGKeyGenerationError", + "GPGConfigurationError", +] diff --git a/tests/test_doctor.py b/tests/test_doctor.py new file mode 100644 index 00000000..248de80d --- /dev/null +++ b/tests/test_doctor.py @@ -0,0 +1,286 @@ +"""Tests for flowspec doctor health check command.""" + +from unittest.mock import MagicMock, patch +import yaml + +from flowspec_cli.doctor import ( + CheckResult, + CheckStatus, + check_agent_files, + check_constitution, + check_flowspec_version, + check_python_version, + check_tool_installed, + check_workflow_config, + run_all_checks, +) + + +class TestCheckFlowspecVersion: + """Tests for flowspec version check.""" + + def test_returns_version(self): + """Should return current flowspec version.""" + result = check_flowspec_version() + assert result.name == "flowspec CLI" + assert result.status == CheckStatus.PASS + assert "v" in result.message or result.message == "unknown" + + +class TestCheckPythonVersion: + """Tests for Python version check.""" + + def test_compatible_version(self): + """Should pass for Python 3.11+.""" + # Current test is running on 3.11+ since that's a requirement + result = check_python_version() + assert result.name == "Python version" + assert result.status == CheckStatus.PASS + assert "Python" in result.message + + @patch("flowspec_cli.doctor.sys.version_info", (3, 10, 0)) + def test_incompatible_version(self): + """Should fail for Python < 3.11.""" + result = check_python_version() + assert result.status == CheckStatus.FAIL + assert "3.10" in result.message + assert "requires 3.11+" in result.message + + +class TestCheckToolInstalled: + """Tests for tool installation checks.""" + + @patch("flowspec_cli.doctor.shutil.which") + def test_tool_not_found(self, mock_which): + """Should fail when tool not in PATH.""" + mock_which.return_value = None + result = check_tool_installed("nonexistent-tool") + assert result.status == CheckStatus.FAIL + assert "not found in PATH" in result.message + assert result.fix_command is not None + + @patch("flowspec_cli.doctor.shutil.which") + @patch("flowspec_cli.doctor.subprocess.run") + def test_tool_found_with_version(self, mock_run, mock_which): + """Should pass when tool is found and can get version.""" + mock_which.return_value = "/usr/bin/backlog" + mock_run.return_value = MagicMock(stdout="backlog 1.28.1\n", stderr="") + result = check_tool_installed("backlog") + assert result.status == CheckStatus.PASS + assert "1.28.1" in result.message + + @patch("flowspec_cli.doctor.shutil.which") + @patch("flowspec_cli.doctor.subprocess.run") + def test_tool_found_version_fails(self, mock_run, mock_which): + """Should still pass if tool found but version check fails.""" + mock_which.return_value = "/usr/bin/tool" + mock_run.side_effect = Exception("Version check failed") + result = check_tool_installed("tool") + assert result.status == CheckStatus.PASS + assert "installed" in result.message + + +class TestCheckWorkflowConfig: + """Tests for workflow configuration check.""" + + def test_no_config_file(self, tmp_path, monkeypatch): + """Should warn when no workflow config exists.""" + monkeypatch.chdir(tmp_path) + result = check_workflow_config() + assert result.status == CheckStatus.WARN + assert "No workflow configuration found" in result.message + assert result.fix_command == "Run: /flow:init" + + def test_valid_config(self, tmp_path, monkeypatch): + """Should pass for valid workflow config.""" + monkeypatch.chdir(tmp_path) + config_path = tmp_path / "flowspec_workflow.yml" + valid_config = { + "states": ["To Do", "Done"], + "workflows": { + "complete": { + "input_states": ["To Do"], + "output_state": "Done", + } + }, + "transitions": [{"from": "To Do", "to": "Done", "via": "complete"}], + } + config_path.write_text(yaml.dump(valid_config)) + + result = check_workflow_config() + assert result.status == CheckStatus.PASS + assert "Valid" in result.message + + def test_invalid_yaml(self, tmp_path, monkeypatch): + """Should fail for invalid YAML.""" + monkeypatch.chdir(tmp_path) + config_path = tmp_path / "flowspec_workflow.yml" + config_path.write_text("invalid: yaml: content: [") + + result = check_workflow_config() + assert result.status == CheckStatus.FAIL + assert "Invalid YAML" in result.message + + def test_config_with_errors(self, tmp_path, monkeypatch): + """Should fail for config with validation errors.""" + monkeypatch.chdir(tmp_path) + config_path = tmp_path / "flowspec_workflow.yml" + # Config missing initial state "To Do" + invalid_config = { + "states": ["In Progress", "Done"], + "workflows": {}, + "transitions": [], + } + config_path.write_text(yaml.dump(invalid_config)) + + result = check_workflow_config() + assert result.status == CheckStatus.FAIL + assert "Invalid" in result.message + assert result.details is not None + assert "errors" in result.details + + def test_config_with_warnings(self, tmp_path, monkeypatch): + """Should warn for config with validation warnings only.""" + monkeypatch.chdir(tmp_path) + config_path = tmp_path / "flowspec_workflow.yml" + # Config with warnings but no errors + config_with_warnings = { + "states": ["To Do", "In Progress"], # No terminal states (warning) + "workflows": { + "start": { + "input_states": ["To Do"], + "output_state": "In Progress", + } + }, + "transitions": [{"from": "To Do", "to": "In Progress", "via": "start"}], + } + config_path.write_text(yaml.dump(config_with_warnings)) + + result = check_workflow_config() + # Should pass with warnings (warnings don't make config invalid) + assert result.status in (CheckStatus.PASS, CheckStatus.WARN) + + +class TestCheckAgentFiles: + """Tests for agent file naming check.""" + + def test_no_agent_files(self, tmp_path, monkeypatch): + """Should warn when no agent files found.""" + monkeypatch.chdir(tmp_path) + result = check_agent_files() + assert result.status == CheckStatus.WARN + assert "No agent files found" in result.message + + def test_old_hyphen_convention(self, tmp_path, monkeypatch): + """Should warn for files using old hyphen naming.""" + monkeypatch.chdir(tmp_path) + agents_dir = tmp_path / ".github" / "agents" + agents_dir.mkdir(parents=True) + (agents_dir / "backend-engineer.md").write_text("# Backend Engineer") + (agents_dir / "frontend-engineer.md").write_text("# Frontend Engineer") + + result = check_agent_files() + assert result.status == CheckStatus.WARN + assert "old hyphen naming" in result.message + assert result.fix_command == "Run: flowspec upgrade-repo" + assert result.details is not None + assert len(result.details["files"]) == 2 + + def test_new_dot_convention(self, tmp_path, monkeypatch): + """Should pass for files using new dot naming in .github/agents/.""" + monkeypatch.chdir(tmp_path) + agents_dir = tmp_path / ".github" / "agents" + agents_dir.mkdir(parents=True) + (agents_dir / "flow.assess.agent.md").write_text("# Assess") + (agents_dir / "flow.specify.agent.md").write_text("# Specify") + + result = check_agent_files() + assert result.status == CheckStatus.PASS + + def test_mixed_convention(self, tmp_path, monkeypatch): + """Should warn for mixed old and new naming.""" + monkeypatch.chdir(tmp_path) + agents_dir = tmp_path / ".github" / "agents" + agents_dir.mkdir(parents=True) + (agents_dir / "backend-engineer.md").write_text("# Old") + (agents_dir / "frontend.engineer.md").write_text("# New") + + result = check_agent_files() + assert result.status == CheckStatus.WARN + assert "old hyphen naming" in result.message + # Should only report the old convention file + assert len(result.details["files"]) == 1 + + def test_skips_special_files(self, tmp_path, monkeypatch): + """Should skip README and underscore-prefixed files.""" + monkeypatch.chdir(tmp_path) + agents_dir = tmp_path / ".github" / "agents" + agents_dir.mkdir(parents=True) + (agents_dir / "README.md").write_text("# README") + (agents_dir / "_template.md").write_text("# Template") + (agents_dir / "backend.engineer.md").write_text("# Backend Engineer") + + result = check_agent_files() + assert result.status == CheckStatus.PASS + + +class TestCheckConstitution: + """Tests for constitution file check.""" + + def test_no_constitution(self, tmp_path, monkeypatch): + """Should warn when constitution not found.""" + monkeypatch.chdir(tmp_path) + result = check_constitution() + assert result.status == CheckStatus.WARN + assert "Constitution not found" in result.message + assert result.fix_command == "Run: /flow:init" + + def test_constitution_with_placeholders(self, tmp_path, monkeypatch): + """Should warn for constitution with placeholders.""" + monkeypatch.chdir(tmp_path) + memory_dir = tmp_path / "memory" + memory_dir.mkdir() + constitution = memory_dir / "constitution.md" + constitution.write_text("# [PROJECT_NAME]\n\nPrinciple: [PRINCIPLE_1_NAME]") + + result = check_constitution() + assert result.status == CheckStatus.WARN + assert "placeholders" in result.message + assert result.fix_command == "Run: /flow:init" + + def test_valid_constitution(self, tmp_path, monkeypatch): + """Should pass for properly configured constitution.""" + monkeypatch.chdir(tmp_path) + memory_dir = tmp_path / "memory" + memory_dir.mkdir() + constitution = memory_dir / "constitution.md" + constitution.write_text("# My Project\n\nPrinciple: Security First") + + result = check_constitution() + assert result.status == CheckStatus.PASS + assert "Present" in result.message + + +class TestRunAllChecks: + """Tests for run_all_checks function.""" + + def test_returns_all_check_results(self): + """Should return results for all checks.""" + results = run_all_checks() + assert len(results) == 7 # 7 checks defined + assert all(isinstance(r, CheckResult) for r in results) + + def test_check_names(self): + """Should include all expected check names.""" + results = run_all_checks() + names = {r.name for r in results} + expected = { + "flowspec CLI", + "Python version", + "backlog CLI", + "beads CLI", + "Workflow config", + "Agent file naming", + "Constitution", + } + assert names == expected diff --git a/tests/test_signing.py b/tests/test_signing.py new file mode 100644 index 00000000..dd0d0e55 --- /dev/null +++ b/tests/test_signing.py @@ -0,0 +1,411 @@ +"""Tests for GPG signing module. + +This module tests the GPG key generation, git configuration, and key management +functionality for agent commit signing. +""" + +from __future__ import annotations + +import subprocess +from unittest.mock import patch + +import pytest + +from flowspec_cli.signing import ( + GPGConfigurationError, + GPGError, + GPGKeyGenerationError, + configure_git_signing, + delete_agent_key, + generate_agent_key, + get_key_fingerprint, + get_key_info, + is_git_signing_enabled, + key_exists, +) + + +@pytest.fixture +def mock_keyring(): + """Mock the keyring module.""" + with patch("flowspec_cli.signing.keyring") as mock: + yield mock + + +@pytest.fixture +def mock_gpg_commands(): + """Mock GPG command execution.""" + with patch("flowspec_cli.signing._run_gpg_command") as mock: + yield mock + + +@pytest.fixture +def mock_git_commands(): + """Mock git command execution.""" + with patch("flowspec_cli.signing._run_git_command") as mock: + yield mock + + +class TestKeyGeneration: + """Tests for GPG key generation.""" + + def test_generate_agent_key_success(self, mock_keyring, mock_gpg_commands): + """Test successful key generation.""" + mock_keyring.get_password.return_value = None # no existing key + # Mock GPG generate-key with --status-fd output + mock_gpg_commands.return_value = ( + "[GNUPG:] KEY_CREATED B ABCD1234ABCD1234ABCD1234ABCD1234ABCD1234\n", + "", + 0, + ) + + fingerprint = generate_agent_key() + + assert fingerprint == "ABCD1234ABCD1234ABCD1234ABCD1234ABCD1234" + mock_keyring.set_password.assert_called_once_with( + "flowspec-agent-gpg", + "agent-key-fingerprint", + "ABCD1234ABCD1234ABCD1234ABCD1234ABCD1234", + ) + + def test_generate_agent_key_already_exists(self, mock_keyring, mock_gpg_commands): + """Test key generation when key already exists.""" + # Mock existing key + mock_keyring.get_password.return_value = "EXISTING1234" + + fingerprint = generate_agent_key() + + # Should return existing fingerprint without generating new key + assert fingerprint == "EXISTING1234" + mock_gpg_commands.assert_not_called() + + def test_generate_agent_key_generation_fails(self, mock_keyring, mock_gpg_commands): + """Test key generation failure.""" + mock_keyring.get_password.return_value = None + mock_gpg_commands.return_value = ("", "GPG error", 1) + + with pytest.raises(GPGKeyGenerationError, match="Failed to generate GPG key"): + generate_agent_key() + + def test_generate_agent_key_fingerprint_extraction_fails( + self, mock_keyring, mock_gpg_commands + ): + """Test failure when fingerprint cannot be extracted.""" + mock_keyring.get_password.return_value = None + # Status output missing KEY_CREATED line + mock_gpg_commands.return_value = ("invalid output\n", "", 0) + + with pytest.raises( + GPGKeyGenerationError, match="Failed to extract fingerprint" + ): + generate_agent_key() + + def test_generate_agent_key_keyring_storage_fails( + self, mock_keyring, mock_gpg_commands + ): + """Test failure when storing fingerprint in keyring fails.""" + mock_keyring.get_password.return_value = None + mock_keyring.set_password.side_effect = Exception("Keyring error") + # Status output with fingerprint + mock_gpg_commands.return_value = ( + "[GNUPG:] KEY_CREATED B ABCD1234\n", + "", + 0, + ) + + with pytest.raises( + GPGKeyGenerationError, match="Failed to store fingerprint in keyring" + ): + generate_agent_key() + + +class TestGitConfiguration: + """Tests for git signing configuration.""" + + def test_configure_git_signing_success( + self, mock_keyring, mock_git_commands, mock_gpg_commands, tmp_path + ): + """Test successful git configuration.""" + mock_keyring.get_password.return_value = "ABCD1234" + mock_gpg_commands.return_value = ("", "", 0) # secret key present + mock_git_commands.side_effect = [ + ("", "", 0), # rev-parse (check if git repo) + ("", "", 0), # config user.signingkey + ("", "", 0), # config commit.gpgsign + ] + + configure_git_signing(project_root=tmp_path) + + # Verify git commands were called correctly + assert mock_git_commands.call_count == 3 + assert mock_git_commands.call_args_list[1][0][0] == [ + "config", + "--local", + "user.signingkey", + "ABCD1234", + ] + assert mock_git_commands.call_args_list[2][0][0] == [ + "config", + "--local", + "commit.gpgsign", + "true", + ] + + def test_configure_git_signing_no_key(self, mock_keyring, tmp_path): + """Test git configuration without existing key.""" + mock_keyring.get_password.return_value = None + + with pytest.raises(GPGError, match="No agent GPG key found"): + configure_git_signing(project_root=tmp_path) + + def test_configure_git_signing_not_git_repo( + self, mock_keyring, mock_git_commands, mock_gpg_commands, tmp_path + ): + """Test git configuration in non-git directory.""" + mock_keyring.get_password.return_value = "ABCD1234" + mock_gpg_commands.return_value = ("", "", 0) # secret key present + mock_git_commands.return_value = ("", "not a git repository", 1) + + with pytest.raises(GPGConfigurationError, match="Not a git repository"): + configure_git_signing(project_root=tmp_path) + + def test_configure_git_signing_key_config_fails( + self, mock_keyring, mock_git_commands, mock_gpg_commands, tmp_path + ): + """Test git configuration when setting signingkey fails.""" + mock_keyring.get_password.return_value = "ABCD1234" + mock_gpg_commands.return_value = ("", "", 0) # secret key present + mock_git_commands.side_effect = [ + ("", "", 0), # rev-parse success + ("", "config error", 1), # config user.signingkey fails + ] + + with pytest.raises( + GPGConfigurationError, match="Failed to set user.signingkey" + ): + configure_git_signing(project_root=tmp_path) + + def test_configure_git_signing_gpgsign_config_fails( + self, mock_keyring, mock_git_commands, mock_gpg_commands, tmp_path + ): + """Test git configuration when setting commit.gpgsign fails.""" + mock_keyring.get_password.return_value = "ABCD1234" + mock_gpg_commands.return_value = ("", "", 0) # secret key present + mock_git_commands.side_effect = [ + ("", "", 0), # rev-parse success + ("", "", 0), # config user.signingkey success + ("", "config error", 1), # config commit.gpgsign fails + ] + + with pytest.raises(GPGConfigurationError, match="Failed to set commit.gpgsign"): + configure_git_signing(project_root=tmp_path) + + +class TestKeyRetrieval: + """Tests for key fingerprint retrieval.""" + + def test_get_key_fingerprint_success(self, mock_keyring): + """Test successful fingerprint retrieval.""" + mock_keyring.get_password.return_value = "ABCD1234" + + fingerprint = get_key_fingerprint() + + assert fingerprint == "ABCD1234" + mock_keyring.get_password.assert_called_once_with( + "flowspec-agent-gpg", "agent-key-fingerprint" + ) + + def test_get_key_fingerprint_not_found(self, mock_keyring): + """Test fingerprint retrieval when not found.""" + mock_keyring.get_password.return_value = None + + fingerprint = get_key_fingerprint() + + assert fingerprint is None + + def test_get_key_fingerprint_keyring_error(self, mock_keyring): + """Test fingerprint retrieval when keyring raises error.""" + mock_keyring.get_password.side_effect = Exception("Keyring error") + + fingerprint = get_key_fingerprint() + + assert fingerprint is None + + def test_key_exists_true(self, mock_keyring): + """Test key_exists when key is present.""" + mock_keyring.get_password.return_value = "ABCD1234" + + assert key_exists() is True + + def test_key_exists_false(self, mock_keyring): + """Test key_exists when key is not present.""" + mock_keyring.get_password.return_value = None + + assert key_exists() is False + + +class TestKeyInfo: + """Tests for detailed key information retrieval.""" + + def test_get_key_info_success(self, mock_keyring, mock_gpg_commands): + """Test successful key info retrieval.""" + mock_keyring.get_password.return_value = "ABCD1234" + gpg_output = """pub:u:4096:1:KEYID:1234567890:0 +uid:u::::1234567890::HASH::Flowspec Agent +""" + mock_gpg_commands.return_value = (gpg_output, "", 0) + + info = get_key_info() + + assert info is not None + assert info["fingerprint"] == "ABCD1234" + assert info["created"] == "1234567890" + assert info["uid"] == "Flowspec Agent " + + def test_get_key_info_no_key(self, mock_keyring): + """Test key info retrieval when no key exists.""" + mock_keyring.get_password.return_value = None + + info = get_key_info() + + assert info is None + + def test_get_key_info_gpg_fails(self, mock_keyring, mock_gpg_commands): + """Test key info retrieval when GPG command fails.""" + mock_keyring.get_password.return_value = "ABCD1234" + mock_gpg_commands.return_value = ("", "error", 1) + + info = get_key_info() + + assert info is None + + +class TestGitSigningStatus: + """Tests for checking git signing status.""" + + def test_is_git_signing_enabled_true(self, mock_git_commands, tmp_path): + """Test when git signing is enabled.""" + mock_git_commands.side_effect = [ + ("", "", 0), # rev-parse success + ("true\n", "", 0), # commit.gpgsign is true + ("ABCD1234\n", "", 0), # user.signingkey is set + ] + + assert is_git_signing_enabled(project_root=tmp_path) is True + + def test_is_git_signing_enabled_false_not_repo(self, mock_git_commands, tmp_path): + """Test when not a git repository.""" + mock_git_commands.return_value = ("", "not a repo", 1) + + assert is_git_signing_enabled(project_root=tmp_path) is False + + def test_is_git_signing_enabled_false_gpgsign_off( + self, mock_git_commands, tmp_path + ): + """Test when commit.gpgsign is not true.""" + mock_git_commands.side_effect = [ + ("", "", 0), # rev-parse success + ("false\n", "", 0), # commit.gpgsign is false + ] + + assert is_git_signing_enabled(project_root=tmp_path) is False + + def test_is_git_signing_enabled_false_no_signingkey( + self, mock_git_commands, tmp_path + ): + """Test when user.signingkey is not set.""" + mock_git_commands.side_effect = [ + ("", "", 0), # rev-parse success + ("true\n", "", 0), # commit.gpgsign is true + ("", "", 1), # user.signingkey not set + ] + + assert is_git_signing_enabled(project_root=tmp_path) is False + + +class TestKeyDeletion: + """Tests for key deletion (rotation).""" + + def test_delete_agent_key_success(self, mock_keyring, mock_gpg_commands): + """Test successful key deletion.""" + mock_keyring.get_password.return_value = "ABCD1234" + mock_gpg_commands.return_value = ("", "", 0) + + delete_agent_key() + + mock_gpg_commands.assert_called_once() + assert "--delete-secret-and-public-key" in mock_gpg_commands.call_args[0][0] + mock_keyring.delete_password.assert_called_once_with( + "flowspec-agent-gpg", "agent-key-fingerprint" + ) + + def test_delete_agent_key_no_key(self, mock_keyring, mock_gpg_commands): + """Test key deletion when no key exists.""" + mock_keyring.get_password.return_value = None + + delete_agent_key() + + # Should return early without calling GPG + mock_gpg_commands.assert_not_called() + mock_keyring.delete_password.assert_not_called() + + def test_delete_agent_key_gpg_fails(self, mock_keyring, mock_gpg_commands): + """Test key deletion when GPG command fails.""" + mock_keyring.get_password.return_value = "ABCD1234" + mock_gpg_commands.return_value = ("", "delete error", 1) + + with pytest.raises(GPGError, match="Failed to delete GPG key"): + delete_agent_key() + + def test_delete_agent_key_keyring_fails(self, mock_keyring, mock_gpg_commands): + """Test key deletion when keyring deletion fails.""" + mock_keyring.get_password.return_value = "ABCD1234" + mock_gpg_commands.return_value = ("", "", 0) + mock_keyring.delete_password.side_effect = Exception("Keyring error") + + with pytest.raises(GPGError, match="Failed to delete fingerprint from keyring"): + delete_agent_key() + + +class TestCommandExecution: + """Tests for command execution helpers.""" + + def test_run_gpg_command_timeout(self, mock_keyring): + """Test GPG command timeout handling.""" + with patch("flowspec_cli.signing.subprocess.run") as mock_run: + mock_run.side_effect = subprocess.TimeoutExpired("gpg", 30) + + with pytest.raises(GPGError, match="GPG command timed out"): + from flowspec_cli.signing import _run_gpg_command + + _run_gpg_command(["--version"]) + + def test_run_gpg_command_not_found(self, mock_keyring): + """Test GPG command when GPG is not installed.""" + with patch("flowspec_cli.signing.subprocess.run") as mock_run: + mock_run.side_effect = FileNotFoundError() + + with pytest.raises(GPGError, match="GPG not found"): + from flowspec_cli.signing import _run_gpg_command + + _run_gpg_command(["--version"]) + + def test_run_git_command_timeout(self, mock_keyring): + """Test git command timeout handling.""" + with patch("flowspec_cli.signing.subprocess.run") as mock_run: + mock_run.side_effect = subprocess.TimeoutExpired("git", 10) + + with pytest.raises(GPGConfigurationError, match="Git command timed out"): + from flowspec_cli.signing import _run_git_command + + _run_git_command(["status"]) + + def test_run_git_command_not_found(self, mock_keyring): + """Test git command when git is not installed.""" + with patch("flowspec_cli.signing.subprocess.run") as mock_run: + mock_run.side_effect = FileNotFoundError() + + with pytest.raises(GPGConfigurationError, match="Git not found"): + from flowspec_cli.signing import _run_git_command + + _run_git_command(["status"])