diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..5967508 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,137 @@ +name: CI/CD Pipeline + +on: + push: + branches: [ main, develop ] + pull_request: + branches: [ main, develop ] + +jobs: + test: + name: Test Python ${{ matrix.python-version }} + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.8', '3.9', '3.10', '3.11'] + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Cache pip packages + uses: actions/cache@v3 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install pytest pytest-cov flake8 black + + - name: Install package + run: | + pip install -e . + + - name: Lint with flake8 + run: | + # Stop build if there are Python syntax errors or undefined names + flake8 src/ --count --select=E9,F63,F7,F82 --show-source --statistics + # Exit-zero treats all errors as warnings + flake8 src/ --count --exit-zero --max-complexity=10 --max-line-length=100 --statistics + + - name: Check code formatting with black + run: | + black --check src/ tests/ + + - name: Run tests with pytest + run: | + pytest tests/ -v --cov=humanoid_planner --cov-report=xml --cov-report=html + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + file: ./coverage.xml + flags: unittests + name: codecov-umbrella + fail_ci_if_error: false + + build: + name: Build and verify package + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install build dependencies + run: | + python -m pip install --upgrade pip + pip install build twine + + - name: Build package + run: | + python -m build + + - name: Check package + run: | + twine check dist/* + + - name: Upload artifacts + uses: actions/upload-artifact@v3 + with: + name: dist + path: dist/ + + docker: + name: Build Docker image + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Build Docker image + uses: docker/build-push-action@v4 + with: + context: . + push: false + tags: humanoid-motion-planning:latest + cache-from: type=gha + cache-to: type=gha,mode=max + + docs: + name: Build documentation + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install sphinx sphinx-rtd-theme + + - name: Build documentation + run: | + cd docs + make html || echo "Documentation build not configured yet" diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..890b323 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,55 @@ +# Pre-commit hooks for code quality +# Install with: pre-commit install + +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-added-large-files + args: ['--maxkb=1000'] + - id: check-merge-conflict + - id: check-json + - id: check-toml + - id: detect-private-key + + - repo: https://github.com/psf/black + rev: 23.3.0 + hooks: + - id: black + language_version: python3.10 + args: ['--line-length=100'] + + - repo: https://github.com/PyCQA/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + args: ['--max-line-length=100', '--extend-ignore=E203,W503'] + + - repo: https://github.com/PyCQA/isort + rev: 5.12.0 + hooks: + - id: isort + args: ['--profile=black', '--line-length=100'] + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.3.0 + hooks: + - id: mypy + additional_dependencies: [types-PyYAML] + args: ['--ignore-missing-imports'] + + - repo: https://github.com/asottile/pyupgrade + rev: v3.4.0 + hooks: + - id: pyupgrade + args: ['--py38-plus'] + + - repo: https://github.com/pycqa/pydocstyle + rev: 6.3.0 + hooks: + - id: pydocstyle + args: ['--convention=google'] + exclude: 'tests/.*' diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..b68133d --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,260 @@ +# Contributing to Humanoid Motion Planning + +First off, thank you for considering contributing to this project! 🎉 + +## Table of Contents + +- [Code of Conduct](#code-of-conduct) +- [Getting Started](#getting-started) +- [Development Setup](#development-setup) +- [How to Contribute](#how-to-contribute) +- [Style Guidelines](#style-guidelines) +- [Testing](#testing) +- [Documentation](#documentation) +- [Pull Request Process](#pull-request-process) + +## Code of Conduct + +This project and everyone participating in it is governed by our Code of Conduct. By participating, you are expected to uphold this code. + +## Getting Started + +1. Fork the repository on GitHub +2. Clone your fork locally: + ```bash + git clone https://github.com/your-username/humanoid-motion-planning.git + cd humanoid-motion-planning + ``` + +## Development Setup + +### Using Python Virtual Environment + +```bash +# Create virtual environment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Install dependencies +pip install -r requirements.txt +pip install -e ".[dev]" + +# Install pre-commit hooks +pip install pre-commit +pre-commit install +``` + +### Using Docker + +```bash +# Build and run development container +docker-compose up -d dev +docker-compose exec dev bash +``` + +## How to Contribute + +### Reporting Bugs + +Before creating bug reports, please check existing issues. When you create a bug report, include as many details as possible: + +- Use a clear and descriptive title +- Describe the exact steps to reproduce the problem +- Provide specific examples +- Describe the behavior you observed and what you expected +- Include code samples and error messages +- Note your environment (OS, Python version, etc.) + +### Suggesting Enhancements + +Enhancement suggestions are tracked as GitHub issues. When creating an enhancement suggestion: + +- Use a clear and descriptive title +- Provide a detailed description of the proposed feature +- Explain why this enhancement would be useful +- Include code examples if applicable + +### Code Contributions + +1. **Find or create an issue** - Make sure there's an issue for what you're working on +2. **Create a branch** - Branch from `develop` (not `main`): + ```bash + git checkout -b feature/your-feature-name develop + ``` +3. **Make your changes** - Follow the style guidelines +4. **Write tests** - Ensure your changes are tested +5. **Run tests** - Make sure all tests pass: + ```bash + pytest tests/ -v + ``` +6. **Commit your changes** - Use clear commit messages: + ```bash + git commit -m "Add feature: brief description" + ``` +7. **Push and create PR** - Push to your fork and create a pull request + +## Style Guidelines + +### Python Style + +We follow PEP 8 with some modifications: + +- Line length: 100 characters +- Use Black for formatting +- Use type hints where possible +- Write docstrings for all public functions/classes (Google style) + +```python +def compute_zmp(forces: np.ndarray, positions: np.ndarray) -> np.ndarray: + """ + Compute Zero Moment Point from ground reaction forces. + + Args: + forces: Ground reaction forces (N x 3) + positions: Contact positions (N x 3) + + Returns: + zmp: Zero Moment Point (2,) + """ + # Implementation +``` + +### Code Formatting + +Run these before committing: + +```bash +# Format code +black src/ tests/ + +# Check linting +flake8 src/ tests/ + +# Sort imports +isort src/ tests/ +``` + +Or use pre-commit: + +```bash +pre-commit run --all-files +``` + +## Testing + +### Writing Tests + +- Place tests in `tests/` directory +- Name test files `test_*.py` +- Use descriptive test names: `test_compute_zmp_with_equal_forces` +- Test edge cases and error conditions +- Aim for >90% code coverage + +### Running Tests + +```bash +# Run all tests +pytest tests/ -v + +# Run with coverage +pytest tests/ --cov=humanoid_planner --cov-report=html + +# Run specific test file +pytest tests/test_zmp_constraint.py -v + +# Run specific test +pytest tests/test_zmp_constraint.py::TestZMPConstraint::test_compute_zmp -v +``` + +## Documentation + +### Code Documentation + +- Use Google-style docstrings +- Document all parameters, return values, and exceptions +- Include examples for complex functions + +### README and Docs + +- Update README.md if you add features +- Add algorithm documentation in `docs/` +- Include usage examples + +## Pull Request Process + +1. **Update documentation** - Ensure README and docs are updated +2. **Add tests** - New features must have tests +3. **Pass CI** - All CI checks must pass +4. **Update CHANGELOG** - Add your changes to CHANGELOG.md +5. **Request review** - Request review from maintainers +6. **Address feedback** - Make requested changes +7. **Squash commits** - Keep history clean (if requested) + +### PR Title Format + +Use conventional commits format: + +- `feat: Add new trajectory optimizer` +- `fix: Correct ZMP calculation bug` +- `docs: Update API documentation` +- `test: Add tests for kinematics` +- `refactor: Improve support polygon computation` +- `chore: Update dependencies` + +### PR Description Template + +```markdown +## Description +Brief description of changes + +## Type of Change +- [ ] Bug fix +- [ ] New feature +- [ ] Breaking change +- [ ] Documentation update + +## Testing +Describe testing performed + +## Checklist +- [ ] Code follows style guidelines +- [ ] Self-review completed +- [ ] Comments added for complex code +- [ ] Documentation updated +- [ ] Tests added/updated +- [ ] All tests pass +- [ ] No breaking changes (or documented) +``` + +## Project Structure + +``` +humanoid-motion-planning/ +├── src/humanoid_planner/ # Main package +│ ├── motion_planner.py # Core motion planning +│ ├── zmp_constraint.py # ZMP constraints +│ ├── kinematics.py # FK/IK solvers +│ ├── perception.py # Perception modules +│ └── ... +├── tests/ # Test files +├── config/ # Configuration files +├── scripts/ # Example scripts +├── docs/ # Documentation +└── examples/ # Example notebooks +``` + +## Recognition + +Contributors will be recognized in: +- README.md contributors section +- Release notes +- GitHub contributors page + +## Questions? + +Feel free to: +- Open an issue for questions +- Join discussions in GitHub Discussions +- Contact maintainers + +Thank you for contributing! 🚀 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9094376 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,46 @@ +# Use official Python runtime as base image +FROM python:3.10-slim + +# Set working directory +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + cmake \ + git \ + libgl1-mesa-glx \ + libglib2.0-0 \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first for better caching +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Install development dependencies +RUN pip install --no-cache-dir \ + pytest \ + pytest-cov \ + black \ + flake8 \ + jupyter \ + ipython + +# Copy the rest of the application +COPY . . + +# Install the package +RUN pip install -e . + +# Create directories for data and results +RUN mkdir -p /app/data /app/results /app/logs + +# Set environment variables +ENV PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/app/src:$PYTHONPATH + +# Expose port for Jupyter +EXPOSE 8888 + +# Default command +CMD ["/bin/bash"] diff --git a/config/perception_config.yaml b/config/perception_config.yaml new file mode 100644 index 0000000..132df47 --- /dev/null +++ b/config/perception_config.yaml @@ -0,0 +1,145 @@ +# Perception Configuration +# Parameters for object detection, environment state estimation, and workspace analysis + +perception: + # Object detection settings + object_detection: + enabled: true + detection_rate: 30 # Hz + detection_range: 3.0 # meters + confidence_threshold: 0.6 + + # Camera configuration + camera: + rgb_topic: "/camera/color/image_raw" + depth_topic: "/camera/depth/image_raw" + camera_info_topic: "/camera/color/camera_info" + frame_id: "camera_link" + + # Intrinsics (example values) + fx: 525.0 + fy: 525.0 + cx: 319.5 + cy: 239.5 + width: 640 + height: 480 + + # Detection classes + object_classes: + - "target" + - "obstacle" + - "surface" + - "container" + - "tool" + + # Model settings + model: + type: "yolo" # or "mask_rcnn", "detectron2" + weights_path: "models/yolo_weights.pt" + device: "cuda" # or "cpu" + batch_size: 1 + + # Point cloud processing + point_cloud: + enabled: true + topic: "/camera/depth/points" + frame_id: "camera_link" + downsample_voxel_size: 0.01 # meters + + # Segmentation + segmentation: + method: "euclidean_clustering" + cluster_tolerance: 0.02 + min_cluster_size: 100 + max_cluster_size: 25000 + + # Plane detection (for surfaces) + plane_detection: + enabled: true + distance_threshold: 0.01 + max_iterations: 1000 + + # Environment state estimation + environment: + update_rate: 10 # Hz + + # Workspace bounds + workspace: + min_bounds: [-1.5, -1.5, 0.0] + max_bounds: [1.5, 1.5, 2.5] + + # Object tracking + tracking: + enabled: true + max_tracking_distance: 0.3 # meters + tracking_history: 30 # frames + + # Obstacle expansion (for safety) + obstacle_padding: 0.05 # meters + + # Workspace analysis + workspace_analysis: + enabled: true + + # Reachability map + reachability: + resolution: 0.05 # meters + num_samples: 5000 + cache_results: true + cache_path: "cache/reachability_map.npy" + + # Collision checking + collision: + enabled: true + check_rate: 100 # Hz + min_clearance: 0.05 # meters + + # Goal prediction + goal_prediction: + enabled: true + + # Grasp planning + grasp: + approach_distance: 0.15 # meters + retreat_distance: 0.2 # meters + grasp_offset: 0.05 # meters into object + + # Approach directions (preferences) + preferred_approaches: + - [0, 0, -1] # From above + - [0, -1, 0] # From front + - [1, 0, 0] # From side + + # Via point generation + via_points: + enabled: true + num_via_points: 3 + obstacle_clearance: 0.2 # meters + + # Goal quality evaluation + quality: + distance_weight: 0.3 + reachability_weight: 0.4 + clearance_weight: 0.3 + +# Visualization settings +visualization: + enabled: true + + # Markers + markers: + detected_objects: true + support_polygon: true + zmp: true + com: true + trajectory: true + reachability_map: false + + # Colors (RGBA) + colors: + target: [0.0, 1.0, 0.0, 0.8] + obstacle: [1.0, 0.0, 0.0, 0.6] + support_polygon: [0.0, 0.0, 1.0, 0.5] + zmp: [1.0, 0.0, 0.0, 1.0] + com: [0.0, 1.0, 1.0, 1.0] + trajectory: [1.0, 1.0, 0.0, 0.8] diff --git a/config/robot_config.yaml b/config/robot_config.yaml new file mode 100644 index 0000000..f82b4dd --- /dev/null +++ b/config/robot_config.yaml @@ -0,0 +1,123 @@ +# Robot Configuration +# Defines robot-specific parameters for humanoid motion planning + +robot: + name: "humanoid_robot" + num_joints: 32 # Total DOF + + # Joint limits (radians) + joint_limits: + # Left arm (7 DOF) + left_arm: + shoulder_pitch: [-1.57, 1.57] + shoulder_roll: [-1.57, 1.57] + shoulder_yaw: [-1.57, 1.57] + elbow: [0.0, 2.62] + wrist_pitch: [-1.57, 1.57] + wrist_roll: [-1.57, 1.57] + wrist_yaw: [-1.57, 1.57] + + # Right arm (7 DOF) + right_arm: + shoulder_pitch: [-1.57, 1.57] + shoulder_roll: [-1.57, 1.57] + shoulder_yaw: [-1.57, 1.57] + elbow: [0.0, 2.62] + wrist_pitch: [-1.57, 1.57] + wrist_roll: [-1.57, 1.57] + wrist_yaw: [-1.57, 1.57] + + # Torso (3 DOF) + torso: + pitch: [-0.5, 0.5] + roll: [-0.3, 0.3] + yaw: [-0.785, 0.785] + + # Left leg (6 DOF) + left_leg: + hip_pitch: [-1.57, 1.57] + hip_roll: [-0.5, 0.5] + hip_yaw: [-0.785, 0.785] + knee: [0.0, 2.35] + ankle_pitch: [-0.785, 0.785] + ankle_roll: [-0.5, 0.5] + + # Right leg (6 DOF) + right_leg: + hip_pitch: [-1.57, 1.57] + hip_roll: [-0.5, 0.5] + hip_yaw: [-0.785, 0.785] + knee: [0.0, 2.35] + ankle_pitch: [-0.785, 0.785] + ankle_roll: [-0.5, 0.5] + + # Velocity limits (rad/s) + velocity_limits: + arm_joints: 2.0 + torso_joints: 1.0 + leg_joints: 1.5 + + # Acceleration limits (rad/s²) + acceleration_limits: + arm_joints: 5.0 + torso_joints: 3.0 + leg_joints: 4.0 + + # Mass properties + mass: + total: 60.0 # kg + torso: 25.0 + head: 5.0 + left_arm: 4.0 + right_arm: 4.0 + left_leg: 11.0 + right_leg: 11.0 + + # Link dimensions (meters) + dimensions: + torso: + height: 0.6 + width: 0.4 + depth: 0.3 + + upper_arm: 0.28 + forearm: 0.26 + hand: + length: 0.15 + width: 0.08 + + thigh: 0.4 + shin: 0.42 + foot: + length: 0.26 + width: 0.12 + height: 0.08 + + # End-effector properties + end_effectors: + left_hand: + max_reach: 0.8 # meters from shoulder + gripper_length: 0.15 + grasp_force: 50.0 # Newtons + + right_hand: + max_reach: 0.8 + gripper_length: 0.15 + grasp_force: 50.0 + + # Center of Mass (default standing pose) + default_com: + height: 0.95 # meters above ground + position: [0.0, 0.0, 0.95] + +# Base configuration +base: + height: 1.0 # Standing height (meters) + foot_separation: 0.2 # Lateral distance between feet (meters) + +# URDF/Model paths +model: + urdf_path: "robots/humanoid.urdf" + mesh_directory: "robots/meshes/" + use_collision_geometry: true + use_visual_geometry: true diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..46dff37 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,51 @@ +version: '3.8' + +services: + humanoid-planner: + build: + context: . + dockerfile: Dockerfile + image: humanoid-motion-planning:latest + container_name: humanoid-planner + volumes: + - .:/app + - ./data:/app/data + - ./results:/app/results + - ./logs:/app/logs + environment: + - PYTHONUNBUFFERED=1 + - DISPLAY=${DISPLAY} + ports: + - "8888:8888" # Jupyter + stdin_open: true + tty: true + command: bash + + jupyter: + build: + context: . + dockerfile: Dockerfile + image: humanoid-motion-planning:latest + container_name: humanoid-jupyter + volumes: + - .:/app + - ./data:/app/data + - ./results:/app/results + ports: + - "8888:8888" + command: jupyter notebook --ip=0.0.0.0 --port=8888 --no-browser --allow-root + + # Development environment with hot-reload + dev: + build: + context: . + dockerfile: Dockerfile + image: humanoid-motion-planning:latest + container_name: humanoid-dev + volumes: + - .:/app + environment: + - PYTHONUNBUFFERED=1 + stdin_open: true + tty: true + command: bash diff --git a/examples/complete_demo.py b/examples/complete_demo.py new file mode 100644 index 0000000..01e5b4a --- /dev/null +++ b/examples/complete_demo.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +""" +Complete End-to-End Reaching Demo + +Demonstrates the full pipeline: +1. Environment perception and object detection +2. Goal prediction and grasp planning +3. Trajectory optimization with ZMP constraints +4. Stability verification +5. Visualization +""" + +import numpy as np +import argparse +import matplotlib.pyplot as plt +from humanoid_planner import ( + ObjectDetector, EnvironmentState, GoalPredictor, + WorkspaceAnalyzer, ObjectType, + TrajectoryOptimizer, ZMPConstraint, + SupportPolygonCalculator, StabilityAnalyzer, + ForwardKinematics +) + + +def setup_environment(): + """Create a demo environment with objects.""" + detector = ObjectDetector(detection_range=3.0) + + # Add target object + target = detector.add_object( + position=np.array([0.6, 0.2, 1.1]), + object_type=ObjectType.TARGET, + dimensions=np.array([0.08, 0.08, 0.15]), + confidence=0.95 + ) + print(f"✓ Target detected at: {target.position}") + + # Add obstacles + obstacle1 = detector.add_object( + position=np.array([0.4, 0.0, 0.9]), + object_type=ObjectType.OBSTACLE, + dimensions=np.array([0.15, 0.15, 0.3]), + confidence=0.88 + ) + print(f"✓ Obstacle 1 at: {obstacle1.position}") + + obstacle2 = detector.add_object( + position=np.array([0.5, -0.3, 0.8]), + object_type=ObjectType.OBSTACLE, + dimensions=np.array([0.2, 0.2, 0.2]), + confidence=0.82 + ) + print(f"✓ Obstacle 2 at: {obstacle2.position}") + + # Add surface + surface = detector.add_object( + position=np.array([0.6, 0.0, 0.7]), + object_type=ObjectType.SURFACE, + dimensions=np.array([1.0, 0.8, 0.05]), + confidence=0.99 + ) + print(f"✓ Surface at: {surface.position}") + + return detector + + +def predict_grasp(detector, predictor): + """Predict optimal grasp position.""" + target = detector.get_objects_by_type(ObjectType.TARGET)[0] + obstacles = detector.get_objects_by_type(ObjectType.OBSTACLE) + + # Predict grasp position + grasp_pos, grasp_orient = predictor.predict_grasp_position( + target.position, + target.dimensions, + approach_direction=np.array([0, 0, -1]) # From above + ) + print(f"\n✓ Predicted grasp position: {grasp_pos}") + + # Generate via points + start_pos = np.array([0.0, 0.0, 1.2]) + via_points = predictor.predict_via_points( + start_pos, grasp_pos, obstacles, num_via_points=3 + ) + print(f"✓ Generated {len(via_points)} via points") + + return grasp_pos, via_points + + +def plan_trajectory(num_joints=7): + """Plan joint space trajectory.""" + optimizer = TrajectoryOptimizer(num_joints=num_joints, dt=0.1) + + # Simplified joint space planning + q_start = np.zeros(num_joints) + q_goal = np.array([0.3, 0.5, -0.4, 0.8, 0.2, -0.3, 0.1]) + + print("\n✓ Planning trajectory...") + trajectory = optimizer.optimize_trajectory( + q_start=q_start, + q_goal=q_goal, + num_waypoints=25, + q_limits=(np.ones(num_joints) * -np.pi, np.ones(num_joints) * np.pi), + velocity_limit=2.0 + ) + + print(f"✓ Trajectory planned with {len(trajectory)} waypoints") + print(f" Duration: {len(trajectory) * 0.1:.2f} seconds") + + return trajectory, optimizer + + +def verify_stability(trajectory): + """Verify ZMP stability along trajectory.""" + # Setup stability analysis + zmp_constraint = ZMPConstraint(margin=0.02) + support_calc = SupportPolygonCalculator(foot_size=(0.25, 0.12)) + stability_analyzer = StabilityAnalyzer() + + # Define foot poses (double support) + left_foot = np.array([0.0, 0.1, 0.0, 0.0, 0.0, 0.0]) + right_foot = np.array([0.0, -0.1, 0.0, 0.0, 0.0, 0.0]) + + support_polygon = support_calc.compute_double_support_polygon( + left_foot, right_foot + ) + + print("\n✓ Verifying stability...") + print(f" Support polygon area: {support_calc.compute_polygon_area(support_polygon):.4f} m²") + + # Simulate CoM trajectory (simplified) + violations = 0 + margins = [] + + for i, q in enumerate(trajectory): + # Simplified CoM position + com_pos = np.array([0.0, 0.0, 0.95]) + com_acc = np.zeros(3) + + zmp = zmp_constraint.compute_zmp_from_com(com_pos, com_acc, height=0.95) + + stable = zmp_constraint.is_stable(zmp, support_polygon) + margin = zmp_constraint.compute_stability_margin(zmp, support_polygon) + + if not stable: + violations += 1 + margins.append(margin) + + print(f" ZMP violations: {violations}/{len(trajectory)}") + print(f" Min stability margin: {min(margins):.4f} m") + print(f" Avg stability margin: {np.mean(margins):.4f} m") + + return support_polygon, margins + + +def visualize_results(trajectory, optimizer, support_polygon, margins): + """Visualize trajectory and stability metrics.""" + fig, axes = plt.subplots(2, 2, figsize=(15, 10)) + + # Plot 1: Joint trajectories + time = np.arange(len(trajectory)) * 0.1 + for joint_idx in range(min(7, trajectory.shape[1])): + axes[0, 0].plot(time, trajectory[:, joint_idx], label=f'Joint {joint_idx+1}') + + axes[0, 0].set_xlabel('Time (s)') + axes[0, 0].set_ylabel('Joint Angle (rad)') + axes[0, 0].set_title('Joint Trajectories') + axes[0, 0].legend(loc='upper right', fontsize=8) + axes[0, 0].grid(True) + + # Plot 2: Velocity profile + velocities = optimizer.compute_velocity_profile(trajectory) + time_vel = np.arange(len(velocities)) * 0.1 + max_vels = np.max(np.abs(velocities), axis=1) + + axes[0, 1].plot(time_vel, max_vels, 'b-', linewidth=2) + axes[0, 1].axhline(y=2.0, color='r', linestyle='--', label='Velocity Limit') + axes[0, 1].set_xlabel('Time (s)') + axes[0, 1].set_ylabel('Max Joint Velocity (rad/s)') + axes[0, 1].set_title('Velocity Profile') + axes[0, 1].legend() + axes[0, 1].grid(True) + + # Plot 3: Support polygon + polygon_closed = np.vstack([support_polygon, support_polygon[0]]) + axes[1, 0].plot(polygon_closed[:, 0], polygon_closed[:, 1], 'b-', linewidth=2) + axes[1, 0].fill(support_polygon[:, 0], support_polygon[:, 1], alpha=0.3, color='blue') + axes[1, 0].plot(0, 0, 'ro', markersize=10, label='ZMP (simplified)') + axes[1, 0].set_xlabel('X (m)') + axes[1, 0].set_ylabel('Y (m)') + axes[1, 0].set_title('Support Polygon') + axes[1, 0].legend() + axes[1, 0].grid(True) + axes[1, 0].axis('equal') + + # Plot 4: Stability margins + time_margin = np.arange(len(margins)) * 0.1 + axes[1, 1].plot(time_margin, margins, 'g-', linewidth=2) + axes[1, 1].axhline(y=0, color='r', linestyle='--', label='Stability Threshold') + axes[1, 1].set_xlabel('Time (s)') + axes[1, 1].set_ylabel('Stability Margin (m)') + axes[1, 1].set_title('Stability Margins Over Time') + axes[1, 1].legend() + axes[1, 1].grid(True) + + plt.tight_layout() + plt.savefig('/tmp/reaching_demo_results.png', dpi=150, bbox_inches='tight') + print("\n✓ Visualization saved to /tmp/reaching_demo_results.png") + + return fig + + +def main(): + """Main demo function.""" + parser = argparse.ArgumentParser(description='Complete reaching task demo') + parser.add_argument('--visualize', action='store_true', help='Show visualizations') + args = parser.parse_args() + + print("=" * 70) + print("HUMANOID REACHING TASK - COMPLETE DEMO") + print("=" * 70) + + # Step 1: Setup environment + print("\n[1/5] Setting up environment...") + detector = setup_environment() + + # Step 2: Predict grasp + print("\n[2/5] Predicting grasp position...") + predictor = GoalPredictor() + grasp_pos, via_points = predict_grasp(detector, predictor) + + # Step 3: Plan trajectory + print("\n[3/5] Planning trajectory...") + trajectory, optimizer = plan_trajectory(num_joints=7) + + # Step 4: Verify stability + print("\n[4/5] Verifying stability...") + support_polygon, margins = verify_stability(trajectory) + + # Step 5: Visualize + print("\n[5/5] Generating visualization...") + if args.visualize: + fig = visualize_results(trajectory, optimizer, support_polygon, margins) + plt.show() + else: + visualize_results(trajectory, optimizer, support_polygon, margins) + + # Summary + print("\n" + "=" * 70) + print("DEMO COMPLETED SUCCESSFULLY!") + print("=" * 70) + print("\nSummary:") + print(f" • Objects detected: {len(detector.detected_objects)}") + print(f" • Trajectory waypoints: {len(trajectory)}") + print(f" • Execution time: {len(trajectory) * 0.1:.2f} s") + print(f" • Min stability margin: {min(margins):.4f} m") + print(f" • ZMP violations: {sum(1 for m in margins if m < 0)}") + print("\n✓ All systems operational!") + print("=" * 70) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt index 5677699..77d1116 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ numpy>=1.21.0 scipy>=1.7.0 matplotlib>=3.4.0 pyyaml>=5.4.0 +dataclasses>=0.6;python_version<"3.7" diff --git a/src/humanoid_planner/__init__.py b/src/humanoid_planner/__init__.py index 56f5153..73401ad 100644 --- a/src/humanoid_planner/__init__.py +++ b/src/humanoid_planner/__init__.py @@ -6,9 +6,12 @@ - SupportPolygonCalculator: Support polygon computation - TrajectoryOptimizer: Trajectory optimization - ForwardKinematics, InverseKinematics: Robot kinematics +- StabilityAnalyzer: Stability analysis and CoM tracking +- Perception: Object detection and workspace analysis Optional modules (require Drake): - MotionPlanner: Full motion planning with Drake simulation +- DrakeTrajectoryOptimizer: Advanced trajectory optimization with Drake """ __version__ = "0.1.0" @@ -18,6 +21,11 @@ from .support_polygon import SupportPolygonCalculator from .trajectory_optimizer import TrajectoryOptimizer from .kinematics import ForwardKinematics, InverseKinematics +from .stability_analysis import StabilityAnalyzer, CenterOfMassTracker +from .perception import ( + ObjectDetector, EnvironmentState, WorkspaceAnalyzer, + GoalPredictor, DetectedObject, ObjectType +) __all__ = [ "ZMPConstraint", @@ -25,12 +33,21 @@ "TrajectoryOptimizer", "ForwardKinematics", "InverseKinematics", + "StabilityAnalyzer", + "CenterOfMassTracker", + "ObjectDetector", + "EnvironmentState", + "WorkspaceAnalyzer", + "GoalPredictor", + "DetectedObject", + "ObjectType", ] # Try to import MotionPlanner (requires Drake) try: - from .motion_planner import MotionPlanner - __all__.append("MotionPlanner") + from .motion_planner import MotionPlanner, DrakeTrajectoryOptimizer + __all__.extend(["MotionPlanner", "DrakeTrajectoryOptimizer"]) except ImportError: MotionPlanner = None + DrakeTrajectoryOptimizer = None pass diff --git a/src/humanoid_planner/motion_planner.py b/src/humanoid_planner/motion_planner.py index c322356..4083dc4 100644 --- a/src/humanoid_planner/motion_planner.py +++ b/src/humanoid_planner/motion_planner.py @@ -12,12 +12,17 @@ """ import numpy as np -from typing import Optional +from typing import Optional, Tuple, List, Dict +from .zmp_constraint import ZMPConstraint +from .support_polygon import SupportPolygonCalculator +from .trajectory_optimizer import TrajectoryOptimizer +from .stability_analysis import StabilityAnalyzer try: from pydrake.all import ( MultibodyPlant, Parser, DiagramBuilder, Simulator, - MathematicalProgram, Solve, eq, le, ge + MathematicalProgram, Solve, eq, le, ge, AddMultibodyPlantSceneGraph, + SnoptSolver, IpoptSolver ) DRAKE_AVAILABLE = True except ImportError: @@ -28,28 +33,405 @@ class MotionPlanner: """ - Main motion planner for humanoid reaching tasks. + Main motion planner for humanoid reaching tasks with ZMP constraints. - Requires Drake for full simulation functionality. + Integrates Drake optimization framework for trajectory generation. """ - def __init__(self, urdf_path: Optional[str] = None): - """Initialize motion planner.""" + def __init__(self, + urdf_path: Optional[str] = None, + time_step: float = 0.01, + zmp_margin: float = 0.02): + """ + Initialize motion planner. + + Args: + urdf_path: Path to robot URDF file + time_step: Simulation time step + zmp_margin: Safety margin for ZMP constraints + """ + self.urdf_path = urdf_path + self.time_step = time_step + + # Initialize sub-modules + self.zmp_constraint = ZMPConstraint(margin=zmp_margin) + self.support_calc = SupportPolygonCalculator() + self.trajectory_opt = TrajectoryOptimizer(num_joints=12, dt=time_step) + self.stability_analyzer = StabilityAnalyzer(zmp_margin=zmp_margin) + + # Drake components (if available) + self.plant = None + self.plant_context = None + + if DRAKE_AVAILABLE and urdf_path is not None: + self._initialize_drake(urdf_path) + + def _initialize_drake(self, urdf_path: str): + """ + Initialize Drake MultibodyPlant from URDF. + + Args: + urdf_path: Path to URDF file + """ if not DRAKE_AVAILABLE: - raise ImportError( - "MotionPlanner requires Drake. Install with: pip install pydrake\n" - "For planning without simulation, use ZMPConstraint, " - "SupportPolygonCalculator, and TrajectoryOptimizer directly." + return + + try: + builder = DiagramBuilder() + self.plant, scene_graph = AddMultibodyPlantSceneGraph( + builder, time_step=self.time_step ) + + parser = Parser(self.plant) + parser.AddModelFromFile(urdf_path) + + self.plant.Finalize() + + diagram = builder.Build() + self.plant_context = diagram.CreateDefaultContext() + + print(f"Drake plant initialized with {self.plant.num_positions()} DOFs") + + except Exception as e: + print(f"Warning: Failed to initialize Drake plant: {e}") + self.plant = None + + def plan_reaching_task(self, + target_pose: np.ndarray, + start_config: Optional[np.ndarray] = None, + foot_poses: Optional[Tuple[np.ndarray, np.ndarray]] = None, + max_planning_time: float = 5.0, + num_waypoints: int = 20) -> Dict: + """ + Plan a reaching trajectory with ZMP constraints. - self.urdf_path = urdf_path - # Drake initialization would go here + Args: + target_pose: Target end-effector pose [x, y, z, qx, qy, qz, qw] + start_config: Starting joint configuration + foot_poses: (left_foot, right_foot) poses for stability + max_planning_time: Maximum planning time (seconds) + num_waypoints: Number of trajectory waypoints + + Returns: + result: Dictionary with trajectory and metrics + """ + # Default starting configuration + if start_config is None: + start_config = np.zeros(12) # Assuming 12-DOF robot + + # Default foot poses (double support) + if foot_poses is None: + left_foot = np.array([0.0, 0.1, 0.0, 0.0, 0.0, 0.0]) + right_foot = np.array([0.0, -0.1, 0.0, 0.0, 0.0, 0.0]) + foot_poses = (left_foot, right_foot) + + # Compute support polygon + support_polygon = self.support_calc.compute_double_support_polygon( + foot_poses[0], foot_poses[1] + ) + + # Use inverse kinematics to find goal configuration + target_pos = target_pose[:3] + goal_config = self._solve_ik_for_target(target_pos, start_config) + + if goal_config is None: + print("Warning: IK failed, using heuristic goal") + goal_config = start_config + np.random.uniform(-0.3, 0.3, len(start_config)) + + # Optimize trajectory + trajectory = self.trajectory_opt.optimize_trajectory( + q_start=start_config, + q_goal=goal_config, + num_waypoints=num_waypoints, + q_limits=self._get_joint_limits(len(start_config)), + velocity_limit=1.0 + ) + + # Verify ZMP constraints + zmp_violations = self._verify_zmp_constraints(trajectory, support_polygon) + + # Compute metrics + metrics = self._compute_trajectory_metrics(trajectory, support_polygon) + + result = { + 'trajectory': trajectory, + 'goal_config': goal_config, + 'support_polygon': support_polygon, + 'zmp_violations': zmp_violations, + 'metrics': metrics, + 'success': zmp_violations == 0 + } + + return result + + def _solve_ik_for_target(self, + target_pos: np.ndarray, + q_init: np.ndarray) -> Optional[np.ndarray]: + """ + Solve inverse kinematics for target position. + + Args: + target_pos: Target position (3,) + q_init: Initial guess + + Returns: + q_solution: Joint configuration or None + """ + # Simplified IK - in practice would use Drake's IK or custom solver + from scipy.optimize import minimize + + def cost(q): + # Simplified FK - just use a simple chain model + pos = self._compute_simplified_fk(q) + return np.linalg.norm(pos - target_pos) ** 2 + + q_limits = self._get_joint_limits(len(q_init)) + bounds = [(q_limits[0][i], q_limits[1][i]) for i in range(len(q_init))] + + result = minimize(cost, q_init, method='SLSQP', bounds=bounds) + + if result.success and cost(result.x) < 0.01: # 10cm tolerance + return result.x + + return None + + def _compute_simplified_fk(self, q: np.ndarray) -> np.ndarray: + """ + Simplified forward kinematics for reaching arm. + + Args: + q: Joint angles + + Returns: + pos: End-effector position (3,) + """ + # Simplified model - just sum up link contributions + # In practice, would use proper DH parameters or Drake + link_lengths = np.ones(min(6, len(q))) * 0.3 # 30cm links + + x = np.sum(link_lengths[:len(q)] * np.cos(q[:len(link_lengths)])) + y = np.sum(link_lengths[:len(q)] * np.sin(q[:len(link_lengths)])) + z = 0.8 + np.sum(q[len(link_lengths):] * 0.1) # Torso height + contribution + + return np.array([x, y, z]) + + def _get_joint_limits(self, num_joints: int) -> Tuple[np.ndarray, np.ndarray]: + """ + Get joint limits. + + Args: + num_joints: Number of joints + + Returns: + q_min, q_max: Joint limits + """ + q_min = -np.pi * np.ones(num_joints) + q_max = np.pi * np.ones(num_joints) + + return q_min, q_max + + def _verify_zmp_constraints(self, + trajectory: np.ndarray, + support_polygon: np.ndarray) -> int: + """ + Verify ZMP constraints along trajectory. + + Args: + trajectory: Joint trajectory (T x num_joints) + support_polygon: Support polygon vertices + + Returns: + num_violations: Number of ZMP violations + """ + num_violations = 0 + + for q in trajectory: + # Compute CoM (simplified) + com_pos = self._compute_com_position(q) + com_acc = np.zeros(3) # Simplified - no dynamics + + # Compute ZMP + zmp = self.zmp_constraint.compute_zmp_from_com( + com_pos, com_acc, height=com_pos[2] + ) + + # Check stability + if not self.zmp_constraint.is_stable(zmp, support_polygon): + num_violations += 1 + + return num_violations + + def _compute_com_position(self, q: np.ndarray) -> np.ndarray: + """ + Compute center of mass position. + + Args: + q: Joint configuration + + Returns: + com: CoM position (3,) + """ + # Simplified CoM computation + # In practice, would use proper multi-body dynamics + + if self.plant is not None and DRAKE_AVAILABLE: + # Use Drake for accurate CoM + try: + self.plant.SetPositions(self.plant_context, q) + com = self.plant.CalcCenterOfMassPosition(self.plant_context) + return com + except: + pass + + # Fallback: simplified model + return np.array([0.0, 0.0, 0.9]) # Fixed height approximation + + def _compute_trajectory_metrics(self, + trajectory: np.ndarray, + support_polygon: np.ndarray) -> Dict: + """ + Compute trajectory quality metrics. + + Args: + trajectory: Joint trajectory + support_polygon: Support polygon + + Returns: + metrics: Dictionary of metrics + """ + # Velocity profile + velocities = self.trajectory_opt.compute_velocity_profile(trajectory) + max_velocity = np.max(np.abs(velocities)) + avg_velocity = np.mean(np.abs(velocities)) + + # Acceleration profile + accelerations = self.trajectory_opt.compute_acceleration_profile(trajectory) + max_acceleration = np.max(np.abs(accelerations)) + + # Smoothness (jerk) + if len(trajectory) >= 4: + jerk = np.diff(accelerations, axis=0) + smoothness = np.mean(np.abs(jerk)) + else: + smoothness = 0.0 + + # Execution time + execution_time = len(trajectory) * self.time_step + + # Stability margins + stability_margins = [] + for q in trajectory: + com_pos = self._compute_com_position(q) + margin = self.stability_analyzer.compute_static_stability_margin( + com_pos, support_polygon + ) + stability_margins.append(margin) + + min_stability_margin = np.min(stability_margins) + avg_stability_margin = np.mean(stability_margins) + + return { + 'execution_time': execution_time, + 'max_velocity': max_velocity, + 'avg_velocity': avg_velocity, + 'max_acceleration': max_acceleration, + 'smoothness': smoothness, + 'min_stability_margin': min_stability_margin, + 'avg_stability_margin': avg_stability_margin, + 'num_waypoints': len(trajectory) + } + + def execute_trajectory(self, trajectory: np.ndarray): + """ + Execute planned trajectory. + + This is a placeholder for actual robot execution. + In practice, would send commands to robot controller. + + Args: + trajectory: Joint trajectory to execute + """ + print(f"Executing trajectory with {len(trajectory)} waypoints") + print(f"Duration: {len(trajectory) * self.time_step:.2f} seconds") + + # In real system: send to robot controller + # For now, just validate + if self.plant is not None: + print("Trajectory validated with Drake plant") + + +class DrakeTrajectoryOptimizer: + """ + Drake-based trajectory optimization with direct collocation. + + This provides more sophisticated optimization than the basic optimizer. + """ + + def __init__(self, plant: Optional['MultibodyPlant'] = None): + """ + Initialize Drake optimizer. + + Args: + plant: Drake MultibodyPlant + """ + if not DRAKE_AVAILABLE: + raise ImportError("DrakeTrajectoryOptimizer requires pydrake") + + self.plant = plant + + def optimize_with_constraints(self, + start_state: np.ndarray, + goal_state: np.ndarray, + num_timesteps: int = 20, + constraints: Optional[List] = None) -> Optional[np.ndarray]: + """ + Optimize trajectory using Drake's mathematical program. + + Args: + start_state: Starting state + goal_state: Goal state + num_timesteps: Number of timesteps + constraints: List of constraint functions + + Returns: + trajectory: Optimized trajectory or None + """ + prog = MathematicalProgram() + + # Decision variables: joint positions at each timestep + q = [] + for i in range(num_timesteps): + qi = prog.NewContinuousVariables(len(start_state), f"q_{i}") + q.append(qi) + + # Start and goal constraints + prog.AddLinearEqualityConstraint(q[0] == start_state) + prog.AddLinearEqualityConstraint(q[-1] == goal_state) + + # Smoothness cost (minimize acceleration) + for i in range(num_timesteps - 2): + accel = q[i] - 2*q[i+1] + q[i+2] + prog.AddQuadraticCost(accel.dot(accel)) + + # Add custom constraints + if constraints is not None: + for constraint_fn in constraints: + for i in range(num_timesteps): + prog.AddConstraint(constraint_fn, q[i]) + + # Solve + solver = SnoptSolver() if SnoptSolver.is_available() else IpoptSolver() + result = solver.Solve(prog) - def plan_reaching_task(self, target_pose, **kwargs): - """Plan reaching trajectory.""" - raise NotImplementedError("Full implementation requires Drake") + if result.is_success(): + trajectory = np.array([result.GetSolution(qi) for qi in q]) + return trajectory + else: + print(f"Optimization failed: {result.get_solution_result()}") + return None # Placeholder for when Drake is not available if not DRAKE_AVAILABLE: MotionPlanner = None + DrakeTrajectoryOptimizer = None diff --git a/src/humanoid_planner/perception.py b/src/humanoid_planner/perception.py new file mode 100644 index 0000000..5ca3fcd --- /dev/null +++ b/src/humanoid_planner/perception.py @@ -0,0 +1,567 @@ +""" +Perception Module for Humanoid Motion Planning + +Provides interfaces for object detection, environment state estimation, +and workspace analysis for reaching tasks. +""" + +import numpy as np +from typing import List, Tuple, Optional, Dict +from dataclasses import dataclass +from enum import Enum + + +class ObjectType(Enum): + """Types of objects that can be detected.""" + UNKNOWN = 0 + TARGET = 1 + OBSTACLE = 2 + GRIPPER = 3 + SURFACE = 4 + + +@dataclass +class DetectedObject: + """Represents a detected object in the environment.""" + + id: int + object_type: ObjectType + position: np.ndarray # 3D position [x, y, z] + orientation: Optional[np.ndarray] = None # Quaternion [qx, qy, qz, qw] + dimensions: Optional[np.ndarray] = None # [length, width, height] + confidence: float = 1.0 + timestamp: float = 0.0 + + def to_dict(self) -> dict: + """Convert to dictionary.""" + return { + 'id': self.id, + 'type': self.object_type.name, + 'position': self.position.tolist(), + 'orientation': self.orientation.tolist() if self.orientation is not None else None, + 'dimensions': self.dimensions.tolist() if self.dimensions is not None else None, + 'confidence': self.confidence, + 'timestamp': self.timestamp + } + + +class ObjectDetector: + """Interface for object detection in the robot's workspace.""" + + def __init__(self, + detection_range: float = 2.0, + confidence_threshold: float = 0.5): + """ + Initialize object detector. + + Args: + detection_range: Maximum detection distance (meters) + confidence_threshold: Minimum confidence for detection + """ + self.detection_range = detection_range + self.confidence_threshold = confidence_threshold + self.detected_objects = [] + self.next_id = 0 + + def detect_objects(self, + sensor_data: Optional[dict] = None) -> List[DetectedObject]: + """ + Detect objects from sensor data. + + This is a placeholder interface. In practice, this would integrate + with actual perception systems (camera, depth sensors, etc.). + + Args: + sensor_data: Dictionary containing sensor readings + + Returns: + objects: List of detected objects + """ + # Placeholder implementation - returns simulated objects + if sensor_data is None: + return self.detected_objects + + # In real implementation, this would process camera/sensor data + # and use CV/ML models for detection + return self._process_sensor_data(sensor_data) + + def _process_sensor_data(self, sensor_data: dict) -> List[DetectedObject]: + """ + Process raw sensor data to detect objects. + + Args: + sensor_data: Raw sensor data + + Returns: + objects: List of detected objects + """ + objects = [] + + # Example: Process point cloud or RGB-D data + if 'point_cloud' in sensor_data: + objects.extend(self._detect_from_point_cloud(sensor_data['point_cloud'])) + + if 'rgb_image' in sensor_data and 'depth_image' in sensor_data: + objects.extend(self._detect_from_rgbd( + sensor_data['rgb_image'], + sensor_data['depth_image'] + )) + + # Filter by confidence and range + filtered_objects = [ + obj for obj in objects + if obj.confidence >= self.confidence_threshold + and np.linalg.norm(obj.position) <= self.detection_range + ] + + return filtered_objects + + def _detect_from_point_cloud(self, point_cloud: np.ndarray) -> List[DetectedObject]: + """ + Detect objects from point cloud data. + + Args: + point_cloud: Point cloud (N x 3) + + Returns: + objects: Detected objects + """ + # Placeholder - would use clustering/segmentation + objects = [] + + # Simple clustering by spatial proximity + if len(point_cloud) > 0: + # Could use DBSCAN or region growing + pass + + return objects + + def _detect_from_rgbd(self, + rgb_image: np.ndarray, + depth_image: np.ndarray) -> List[DetectedObject]: + """ + Detect objects from RGB-D images. + + Args: + rgb_image: RGB image + depth_image: Depth image + + Returns: + objects: Detected objects + """ + # Placeholder - would use CNN-based detection + objects = [] + + # In practice: use YOLO, Mask R-CNN, or similar + # for object detection and segmentation + + return objects + + def add_object(self, + position: np.ndarray, + object_type: ObjectType = ObjectType.TARGET, + dimensions: Optional[np.ndarray] = None, + confidence: float = 1.0) -> DetectedObject: + """ + Manually add a detected object. + + Args: + position: Object position (3,) + object_type: Type of object + dimensions: Object dimensions [length, width, height] + confidence: Detection confidence + + Returns: + obj: Created object + """ + obj = DetectedObject( + id=self.next_id, + object_type=object_type, + position=position.copy(), + dimensions=dimensions.copy() if dimensions is not None else None, + confidence=confidence + ) + + self.next_id += 1 + self.detected_objects.append(obj) + + return obj + + def get_objects_by_type(self, object_type: ObjectType) -> List[DetectedObject]: + """ + Get all detected objects of a specific type. + + Args: + object_type: Type to filter by + + Returns: + objects: Filtered list of objects + """ + return [obj for obj in self.detected_objects if obj.object_type == object_type] + + def get_nearest_object(self, + reference_point: np.ndarray, + object_type: Optional[ObjectType] = None) -> Optional[DetectedObject]: + """ + Get nearest object to a reference point. + + Args: + reference_point: Reference position (3,) + object_type: Optional type filter + + Returns: + nearest_obj: Nearest object or None + """ + objects = self.detected_objects + if object_type is not None: + objects = self.get_objects_by_type(object_type) + + if len(objects) == 0: + return None + + distances = [np.linalg.norm(obj.position - reference_point) for obj in objects] + min_idx = np.argmin(distances) + + return objects[min_idx] + + +class EnvironmentState: + """Represents the current state of the robot's environment.""" + + def __init__(self): + """Initialize environment state.""" + self.objects: List[DetectedObject] = [] + self.robot_position: np.ndarray = np.zeros(3) + self.robot_orientation: np.ndarray = np.array([0, 0, 0, 1]) # Quaternion + self.workspace_bounds: Tuple[np.ndarray, np.ndarray] = ( + np.array([-1.0, -1.0, 0.0]), + np.array([1.0, 1.0, 2.0]) + ) + self.timestamp: float = 0.0 + + def update(self, + objects: List[DetectedObject], + robot_position: Optional[np.ndarray] = None, + robot_orientation: Optional[np.ndarray] = None, + timestamp: Optional[float] = None): + """ + Update environment state. + + Args: + objects: List of detected objects + robot_position: Robot base position + robot_orientation: Robot base orientation + timestamp: Current timestamp + """ + self.objects = objects + + if robot_position is not None: + self.robot_position = robot_position.copy() + + if robot_orientation is not None: + self.robot_orientation = robot_orientation.copy() + + if timestamp is not None: + self.timestamp = timestamp + + def get_obstacles_in_region(self, + center: np.ndarray, + radius: float) -> List[DetectedObject]: + """ + Get all obstacles within a spherical region. + + Args: + center: Center of region (3,) + radius: Radius of region (meters) + + Returns: + obstacles: Objects within region + """ + obstacles = [] + + for obj in self.objects: + if obj.object_type == ObjectType.OBSTACLE: + dist = np.linalg.norm(obj.position - center) + if dist <= radius: + obstacles.append(obj) + + return obstacles + + def is_position_free(self, + position: np.ndarray, + min_clearance: float = 0.1) -> bool: + """ + Check if a position is free of obstacles. + + Args: + position: Position to check (3,) + min_clearance: Minimum clearance required (meters) + + Returns: + free: True if position is free + """ + for obj in self.objects: + if obj.object_type == ObjectType.OBSTACLE: + dist = np.linalg.norm(obj.position - position) + + # Account for object dimensions + if obj.dimensions is not None: + obj_radius = np.max(obj.dimensions) / 2 + else: + obj_radius = 0.05 # Default radius + + if dist < obj_radius + min_clearance: + return False + + return True + + +class WorkspaceAnalyzer: + """Analyzes robot workspace for reachability and collision.""" + + def __init__(self, + resolution: float = 0.1, + workspace_bounds: Optional[Tuple[np.ndarray, np.ndarray]] = None): + """ + Initialize workspace analyzer. + + Args: + resolution: Grid resolution for discretization (meters) + workspace_bounds: (min_bounds, max_bounds) as 3D vectors + """ + self.resolution = resolution + + if workspace_bounds is None: + self.workspace_bounds = ( + np.array([-1.0, -1.0, 0.0]), + np.array([1.0, 1.0, 2.0]) + ) + else: + self.workspace_bounds = workspace_bounds + + self.reachability_map = None + + def compute_reachability_map(self, + kinematics_solver, + num_samples: int = 1000) -> np.ndarray: + """ + Compute reachability map for robot end-effector. + + Args: + kinematics_solver: Forward kinematics solver + num_samples: Number of random configurations to sample + + Returns: + reachability_map: 3D grid of reachability scores + """ + min_bounds, max_bounds = self.workspace_bounds + grid_size = ((max_bounds - min_bounds) / self.resolution).astype(int) + + reachability_map = np.zeros(grid_size) + + # Sample random joint configurations + for _ in range(num_samples): + # Generate random joint angles (would use actual joint limits) + q = np.random.uniform(-np.pi, np.pi, kinematics_solver.num_joints) + + # Compute forward kinematics + pos, _ = kinematics_solver.compute_fk(q) + + # Map to grid cell + grid_idx = ((pos - min_bounds) / self.resolution).astype(int) + + # Check bounds + if (grid_idx >= 0).all() and (grid_idx < grid_size).all(): + reachability_map[tuple(grid_idx)] += 1 + + # Normalize + if reachability_map.max() > 0: + reachability_map = reachability_map / reachability_map.max() + + self.reachability_map = reachability_map + return reachability_map + + def is_reachable(self, + position: np.ndarray, + threshold: float = 0.1) -> bool: + """ + Check if a position is reachable. + + Args: + position: Target position (3,) + threshold: Minimum reachability score + + Returns: + reachable: True if position is reachable + """ + if self.reachability_map is None: + # No map computed yet + return True + + min_bounds, _ = self.workspace_bounds + grid_idx = ((position - min_bounds) / self.resolution).astype(int) + + # Check bounds + if (grid_idx < 0).any() or (grid_idx >= self.reachability_map.shape).any(): + return False + + return self.reachability_map[tuple(grid_idx)] >= threshold + + def find_nearest_reachable_position(self, + target: np.ndarray, + threshold: float = 0.1) -> Optional[np.ndarray]: + """ + Find nearest reachable position to target. + + Args: + target: Desired position (3,) + threshold: Minimum reachability score + + Returns: + position: Nearest reachable position or None + """ + if self.reachability_map is None: + return target + + min_bounds, max_bounds = self.workspace_bounds + grid_idx = ((target - min_bounds) / self.resolution).astype(int) + + # Search in expanding radius + max_search_radius = 10 # grid cells + + for radius in range(max_search_radius): + # Check all cells at this radius + for dx in range(-radius, radius + 1): + for dy in range(-radius, radius + 1): + for dz in range(-radius, radius + 1): + if abs(dx) == radius or abs(dy) == radius or abs(dz) == radius: + idx = grid_idx + np.array([dx, dy, dz]) + + if (idx >= 0).all() and (idx < self.reachability_map.shape).all(): + if self.reachability_map[tuple(idx)] >= threshold: + # Convert back to world coordinates + pos = min_bounds + idx * self.resolution + return pos + + return None + + +class GoalPredictor: + """Predicts optimal goal positions for reaching tasks.""" + + def __init__(self): + """Initialize goal predictor.""" + self.history = [] + + def predict_grasp_position(self, + object_position: np.ndarray, + object_dimensions: Optional[np.ndarray] = None, + approach_direction: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]: + """ + Predict optimal grasp position for an object. + + Args: + object_position: Object center position (3,) + object_dimensions: Object dimensions [length, width, height] + approach_direction: Preferred approach direction (3,), normalized + + Returns: + grasp_position: Predicted grasp position (3,) + grasp_orientation: Predicted grasp orientation as quaternion (4,) + """ + # Default: approach from above + if approach_direction is None: + approach_direction = np.array([0, 0, -1]) + + approach_direction = approach_direction / np.linalg.norm(approach_direction) + + # Compute grasp position with offset + grasp_offset = 0.1 # 10cm offset for gripper + + if object_dimensions is not None: + # Adjust offset based on object size + grasp_offset = object_dimensions[2] / 2 + 0.05 + + grasp_position = object_position - approach_direction * grasp_offset + + # Compute orientation (simplified - gripper aligned with approach) + # In practice, would consider object geometry and gripper constraints + grasp_orientation = np.array([0, 0, 0, 1]) # Identity quaternion + + return grasp_position, grasp_orientation + + def predict_via_points(self, + start_position: np.ndarray, + goal_position: np.ndarray, + obstacles: List[DetectedObject], + num_via_points: int = 3) -> List[np.ndarray]: + """ + Predict intermediate via points to avoid obstacles. + + Args: + start_position: Starting position (3,) + goal_position: Goal position (3,) + obstacles: List of obstacles + num_via_points: Number of via points to generate + + Returns: + via_points: List of intermediate positions + """ + via_points = [] + + # Linear interpolation as baseline + t_values = np.linspace(0, 1, num_via_points + 2)[1:-1] + + for t in t_values: + point = start_position + t * (goal_position - start_position) + + # Check for obstacles and adjust + for obstacle in obstacles: + dist = np.linalg.norm(point - obstacle.position) + min_clearance = 0.2 # 20cm clearance + + if dist < min_clearance: + # Push point away from obstacle + direction = (point - obstacle.position) / dist + point = obstacle.position + direction * min_clearance + + via_points.append(point) + + return via_points + + def evaluate_goal_quality(self, + goal_position: np.ndarray, + environment: EnvironmentState, + robot_position: np.ndarray) -> float: + """ + Evaluate quality of a goal position. + + Args: + goal_position: Candidate goal position (3,) + environment: Current environment state + robot_position: Current robot position (3,) + + Returns: + quality: Quality score (0 to 1, higher is better) + """ + quality = 1.0 + + # Penalize distance + distance = np.linalg.norm(goal_position - robot_position) + quality *= np.exp(-distance / 2.0) # Exponential decay + + # Penalize proximity to obstacles + for obj in environment.objects: + if obj.object_type == ObjectType.OBSTACLE: + dist = np.linalg.norm(goal_position - obj.position) + min_clearance = 0.3 + + if dist < min_clearance: + quality *= (dist / min_clearance) ** 2 + + # Check workspace bounds + min_bounds, max_bounds = environment.workspace_bounds + if not ((goal_position >= min_bounds).all() and (goal_position <= max_bounds).all()): + quality *= 0.1 # Heavily penalize out-of-bounds + + return quality diff --git a/src/humanoid_planner/stability_analysis.py b/src/humanoid_planner/stability_analysis.py new file mode 100644 index 0000000..c53051a --- /dev/null +++ b/src/humanoid_planner/stability_analysis.py @@ -0,0 +1,365 @@ +""" +Stability Analysis Module + +Provides comprehensive stability analysis for humanoid robots including: +- Center of Mass (CoM) computation +- ZMP calculation and verification +- Static stability margins +- Dynamic stability criteria +""" + +import numpy as np +from typing import Tuple, Optional, List +from .zmp_constraint import ZMPConstraint +from .support_polygon import SupportPolygonCalculator + + +class StabilityAnalyzer: + """Analyzes robot stability using ZMP and CoM metrics.""" + + def __init__(self, + gravity: float = 9.81, + zmp_margin: float = 0.02): + """ + Initialize stability analyzer. + + Args: + gravity: Gravitational acceleration (m/s^2) + zmp_margin: Safety margin from support polygon edge (meters) + """ + self.gravity = gravity + self.zmp_constraint = ZMPConstraint(margin=zmp_margin, gravity=gravity) + + def compute_center_of_mass(self, + link_positions: np.ndarray, + link_masses: np.ndarray) -> np.ndarray: + """ + Compute robot's center of mass. + + Args: + link_positions: Positions of robot links (N x 3) + link_masses: Mass of each link (N,) + + Returns: + com: Center of mass position (3,) [x, y, z] + """ + total_mass = np.sum(link_masses) + if total_mass < 1e-6: + return np.zeros(3) + + com = np.sum(link_positions * link_masses[:, np.newaxis], axis=0) / total_mass + return com + + def compute_com_jacobian(self, + link_jacobians: List[np.ndarray], + link_masses: np.ndarray) -> np.ndarray: + """ + Compute Jacobian of center of mass. + + Args: + link_jacobians: List of link Jacobians (each 6 x num_joints) + link_masses: Mass of each link (N,) + + Returns: + J_com: CoM Jacobian (3 x num_joints) + """ + total_mass = np.sum(link_masses) + if total_mass < 1e-6: + return np.zeros((3, link_jacobians[0].shape[1])) + + # Linear velocity part of Jacobian + J_com = np.zeros((3, link_jacobians[0].shape[1])) + + for i, (J_link, mass) in enumerate(zip(link_jacobians, link_masses)): + # Add weighted contribution of each link + J_com += mass * J_link[:3, :] / total_mass + + return J_com + + def compute_static_stability_margin(self, + com_pos: np.ndarray, + support_polygon: np.ndarray) -> float: + """ + Compute static stability margin. + + The margin is the minimum distance from CoM projection to support polygon edges. + + Args: + com_pos: Center of mass position (3,) + support_polygon: Support polygon vertices (N x 2) + + Returns: + margin: Stability margin (positive = stable) + """ + com_2d = com_pos[:2] + return self.zmp_constraint.compute_stability_margin(com_2d, support_polygon) + + def compute_dynamic_stability(self, + com_pos: np.ndarray, + com_vel: np.ndarray, + com_acc: np.ndarray, + support_polygon: np.ndarray) -> Tuple[bool, float]: + """ + Compute dynamic stability using ZMP criterion. + + Args: + com_pos: CoM position (3,) + com_vel: CoM velocity (3,) + com_acc: CoM acceleration (3,) + support_polygon: Support polygon vertices (N x 2) + + Returns: + stable: True if dynamically stable + margin: Stability margin (distance to polygon edge) + """ + # Compute ZMP from CoM dynamics + height = com_pos[2] + zmp = self.zmp_constraint.compute_zmp_from_com(com_pos, com_acc, height) + + # Check if ZMP is within support polygon + stable = self.zmp_constraint.is_stable(zmp, support_polygon) + margin = self.zmp_constraint.compute_stability_margin(zmp, support_polygon) + + return stable, margin + + def compute_capture_point(self, + com_pos: np.ndarray, + com_vel: np.ndarray) -> np.ndarray: + """ + Compute capture point (also called extrapolated CoM). + + The capture point is where the robot would need to step to come to rest. + + Args: + com_pos: CoM position (3,) + com_vel: CoM velocity (3,) + + Returns: + capture_point: Capture point position (2,) [x, y] + """ + height = com_pos[2] + omega = np.sqrt(self.gravity / height) # Natural frequency + + # Capture point formula: CP = CoM + CoM_vel / omega + cp_x = com_pos[0] + com_vel[0] / omega + cp_y = com_pos[1] + com_vel[1] / omega + + return np.array([cp_x, cp_y]) + + def check_trajectory_stability(self, + com_trajectory: np.ndarray, + com_velocities: np.ndarray, + com_accelerations: np.ndarray, + support_polygons: List[np.ndarray]) -> Tuple[bool, List[float]]: + """ + Check stability along entire trajectory. + + Args: + com_trajectory: CoM positions over time (T x 3) + com_velocities: CoM velocities over time (T x 3) + com_accelerations: CoM accelerations over time (T x 3) + support_polygons: Support polygons at each timestep (list of N x 2 arrays) + + Returns: + all_stable: True if stable throughout trajectory + margins: Stability margins at each timestep + """ + margins = [] + all_stable = True + + for i in range(len(com_trajectory)): + com_pos = com_trajectory[i] + com_vel = com_velocities[i] if i < len(com_velocities) else np.zeros(3) + com_acc = com_accelerations[i] if i < len(com_accelerations) else np.zeros(3) + support_poly = support_polygons[min(i, len(support_polygons) - 1)] + + stable, margin = self.compute_dynamic_stability( + com_pos, com_vel, com_acc, support_poly + ) + + margins.append(margin) + if not stable: + all_stable = False + + return all_stable, margins + + def compute_tipping_axis(self, + com_pos: np.ndarray, + support_polygon: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: + """ + Compute the support polygon edge closest to tipping. + + Args: + com_pos: CoM position (3,) + support_polygon: Support polygon vertices (N x 2) + + Returns: + edge_start: Start of critical edge (2,) or None + edge_end: End of critical edge (2,) or None + """ + com_2d = com_pos[:2] + + # Find edge with minimum distance to CoM projection + min_dist = float('inf') + critical_edge = None + + for i in range(len(support_polygon)): + p1 = support_polygon[i] + p2 = support_polygon[(i + 1) % len(support_polygon)] + + dist = self.zmp_constraint._point_to_line_distance(com_2d, p1, p2) + + if dist < min_dist: + min_dist = dist + critical_edge = (p1, p2) + + if critical_edge is None: + return None, None + + return critical_edge[0], critical_edge[1] + + def compute_stability_metrics(self, + com_pos: np.ndarray, + com_vel: np.ndarray, + com_acc: np.ndarray, + support_polygon: np.ndarray) -> dict: + """ + Compute comprehensive stability metrics. + + Args: + com_pos: CoM position (3,) + com_vel: CoM velocity (3,) + com_acc: CoM acceleration (3,) + support_polygon: Support polygon vertices (N x 2) + + Returns: + metrics: Dictionary of stability metrics + """ + # Static stability + static_margin = self.compute_static_stability_margin(com_pos, support_polygon) + + # Dynamic stability + dynamic_stable, dynamic_margin = self.compute_dynamic_stability( + com_pos, com_vel, com_acc, support_polygon + ) + + # Capture point + capture_point = self.compute_capture_point(com_pos, com_vel) + + # ZMP + height = com_pos[2] + zmp = self.zmp_constraint.compute_zmp_from_com(com_pos, com_acc, height) + + # Tipping axis + edge_start, edge_end = self.compute_tipping_axis(com_pos, support_polygon) + + return { + 'static_margin': static_margin, + 'dynamic_margin': dynamic_margin, + 'is_stable': dynamic_stable, + 'zmp': zmp, + 'capture_point': capture_point, + 'com_height': com_pos[2], + 'com_2d': com_pos[:2], + 'tipping_edge_start': edge_start, + 'tipping_edge_end': edge_end + } + + +class CenterOfMassTracker: + """Tracks center of mass state over time.""" + + def __init__(self, dt: float = 0.01): + """ + Initialize CoM tracker. + + Args: + dt: Time step (seconds) + """ + self.dt = dt + self.history = [] + + def update(self, com_pos: np.ndarray, timestamp: Optional[float] = None): + """ + Update CoM state. + + Args: + com_pos: Current CoM position (3,) + timestamp: Optional timestamp + """ + if timestamp is None: + if len(self.history) > 0: + timestamp = self.history[-1]['timestamp'] + self.dt + else: + timestamp = 0.0 + + self.history.append({ + 'timestamp': timestamp, + 'position': com_pos.copy(), + }) + + def get_velocity(self, window_size: int = 5) -> np.ndarray: + """ + Estimate CoM velocity using finite differences. + + Args: + window_size: Number of samples for smoothing + + Returns: + velocity: Estimated velocity (3,) + """ + if len(self.history) < 2: + return np.zeros(3) + + # Use last few samples for smoother estimate + n = min(window_size, len(self.history)) + positions = np.array([self.history[-i]['position'] for i in range(n, 0, -1)]) + times = np.array([self.history[-i]['timestamp'] for i in range(n, 0, -1)]) + + # Linear regression for velocity + dt = times[-1] - times[0] + if dt < 1e-6: + return np.zeros(3) + + velocity = (positions[-1] - positions[0]) / dt + return velocity + + def get_acceleration(self, window_size: int = 5) -> np.ndarray: + """ + Estimate CoM acceleration using finite differences. + + Args: + window_size: Number of samples for smoothing + + Returns: + acceleration: Estimated acceleration (3,) + """ + if len(self.history) < 3: + return np.zeros(3) + + # Use last few samples + n = min(window_size, len(self.history)) + positions = np.array([self.history[-i]['position'] for i in range(n, 0, -1)]) + + # Second-order finite difference + if n >= 3: + acc = (positions[-1] - 2*positions[-2] + positions[-3]) / (self.dt ** 2) + return acc + + return np.zeros(3) + + def clear(self): + """Clear history.""" + self.history = [] + + def get_trajectory(self) -> np.ndarray: + """ + Get full CoM trajectory. + + Returns: + trajectory: Array of positions (T x 3) + """ + if len(self.history) == 0: + return np.array([]) + + return np.array([h['position'] for h in self.history]) diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..3178303 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,393 @@ +""" +Integration tests for end-to-end motion planning scenarios. +""" + +import pytest +import numpy as np +from humanoid_planner import ( + ZMPConstraint, SupportPolygonCalculator, TrajectoryOptimizer, + ForwardKinematics, InverseKinematics, StabilityAnalyzer, + ObjectDetector, EnvironmentState, GoalPredictor, ObjectType +) + + +class TestEndToEndReachingTask: + """Test complete reaching task pipeline.""" + + def test_simple_reaching_task(self): + """Test basic reaching task without obstacles.""" + # Setup environment + detector = ObjectDetector() + target_obj = detector.add_object( + position=np.array([0.5, 0.3, 1.2]), + object_type=ObjectType.TARGET + ) + + env = EnvironmentState() + env.update(detector.detected_objects) + + # Setup robot + num_joints = 7 # Arm only + optimizer = TrajectoryOptimizer(num_joints=num_joints, dt=0.1) + + # Plan trajectory + q_start = np.zeros(num_joints) + q_goal = np.ones(num_joints) * 0.3 # Simplified goal + + trajectory = optimizer.optimize_trajectory( + q_start=q_start, + q_goal=q_goal, + num_waypoints=10, + velocity_limit=1.0 + ) + + # Verify trajectory + assert trajectory is not None + assert len(trajectory) == 10 + assert np.allclose(trajectory[0], q_start, atol=1e-3) + assert np.allclose(trajectory[-1], q_goal, atol=1e-3) + + def test_reaching_with_stability_verification(self): + """Test reaching with ZMP stability verification.""" + # Setup stability components + zmp_constraint = ZMPConstraint(margin=0.02) + support_calc = SupportPolygonCalculator() + stability_analyzer = StabilityAnalyzer() + + # Define foot poses + left_foot = np.array([0.0, 0.1, 0.0, 0.0, 0.0, 0.0]) + right_foot = np.array([0.0, -0.1, 0.0, 0.0, 0.0, 0.0]) + + # Compute support polygon + support_polygon = support_calc.compute_double_support_polygon( + left_foot, right_foot + ) + + # Simulate CoM trajectory + com_trajectory = np.array([ + [0.0, 0.0, 0.9], + [0.02, 0.0, 0.9], + [0.04, 0.0, 0.9], + [0.06, 0.0, 0.9], + [0.08, 0.0, 0.9] + ]) + + com_velocities = np.diff(com_trajectory, axis=0) / 0.1 + com_accelerations = np.diff(com_velocities, axis=0) / 0.1 + + # Pad to match trajectory length + com_velocities = np.vstack([com_velocities, com_velocities[-1:]]) + com_accelerations = np.vstack([com_accelerations, com_accelerations[-1:], com_accelerations[-1:]]) + + support_polygons = [support_polygon] * len(com_trajectory) + + # Check stability + all_stable, margins = stability_analyzer.check_trajectory_stability( + com_trajectory, com_velocities, com_accelerations, support_polygons + ) + + # With small CoM movements, should be stable + assert all_stable or len([m for m in margins if m > 0]) >= len(margins) * 0.8 + + def test_ik_to_trajectory_pipeline(self): + """Test IK solving followed by trajectory optimization.""" + # Setup kinematics + dh_params = np.array([ + [0.3, 0, 0, 0], + [0.3, 0, 0, 0], + [0.2, 0, 0, 0] + ]) + + fk = ForwardKinematics(dh_params) + ik = InverseKinematics(fk) + + # Target position + target_pos = np.array([0.6, 0.1, 0.0]) + + # Solve IK + q_init = np.zeros(3) + q_goal = ik.solve_ik(target_pos, q_init=q_init, max_iter=50) + + if q_goal is not None: + # Verify IK solution + achieved_pos, _ = fk.compute_fk(q_goal) + error = np.linalg.norm(achieved_pos - target_pos) + + # Should be close to target + assert error < 0.05 # 5cm tolerance + + # Plan trajectory + optimizer = TrajectoryOptimizer(num_joints=3, dt=0.1) + trajectory = optimizer.optimize_trajectory( + q_start=q_init, + q_goal=q_goal, + num_waypoints=15 + ) + + # Verify trajectory feasibility + assert trajectory is not None + assert len(trajectory) == 15 + + def test_goal_prediction_and_planning(self): + """Test goal prediction integrated with planning.""" + # Setup perception + detector = ObjectDetector() + target = detector.add_object( + position=np.array([0.5, 0.0, 1.0]), + object_type=ObjectType.TARGET, + dimensions=np.array([0.1, 0.1, 0.2]) + ) + + # Add obstacle + obstacle = detector.add_object( + position=np.array([0.3, 0.0, 1.0]), + object_type=ObjectType.OBSTACLE, + dimensions=np.array([0.15, 0.15, 0.15]) + ) + + env = EnvironmentState() + env.update(detector.detected_objects) + + # Predict grasp position + predictor = GoalPredictor() + grasp_pos, grasp_orient = predictor.predict_grasp_position( + target.position, target.dimensions + ) + + # Verify grasp position is reasonable + assert grasp_pos.shape == (3,) + dist_to_target = np.linalg.norm(grasp_pos - target.position) + assert dist_to_target < 0.3 # Should be close to object + + # Generate via points avoiding obstacle + start_pos = np.array([0.0, 0.0, 0.5]) + via_points = predictor.predict_via_points( + start_pos, grasp_pos, [obstacle], num_via_points=2 + ) + + # Verify via points + assert len(via_points) == 2 + + # Check that via points have clearance from obstacle + for point in via_points: + dist_to_obs = np.linalg.norm(point - obstacle.position) + # Should maintain some clearance + assert dist_to_obs > 0.1 + + +class TestMultiModuleIntegration: + """Test integration between multiple modules.""" + + def test_zmp_and_support_polygon_integration(self): + """Test ZMP constraint with support polygon computation.""" + # Create support polygon + support_calc = SupportPolygonCalculator(foot_size=(0.25, 0.12)) + left_foot = np.array([0.0, 0.1, 0.0, 0.0, 0.0, 0.0]) + right_foot = np.array([0.0, -0.1, 0.0, 0.0, 0.0, 0.0]) + + polygon = support_calc.compute_double_support_polygon(left_foot, right_foot) + + # Test ZMP stability + zmp_constraint = ZMPConstraint(margin=0.02) + + # Test various ZMP positions + test_positions = [ + np.array([0.0, 0.0]), # Center - should be stable + np.array([0.5, 0.5]), # Far outside - should be unstable + ] + + for zmp in test_positions: + stable = zmp_constraint.is_stable(zmp, polygon) + margin = zmp_constraint.compute_stability_margin(zmp, polygon) + + # Center should be stable with positive margin + if np.allclose(zmp, [0.0, 0.0]): + assert stable + assert margin > 0 + + def test_kinematics_and_trajectory_optimization(self): + """Test FK/IK with trajectory optimization.""" + # Setup kinematics + dh_params = np.array([ + [0.3, 0, 0, 0], + [0.3, 0, 0, 0], + [0.2, 0, 0, 0], + [0.15, 0, 0, 0] + ]) + + fk = ForwardKinematics(dh_params) + ik = InverseKinematics(fk) + + # Generate start and goal configurations + q_start = np.array([0.0, 0.0, 0.0, 0.0]) + + # Compute target from a known configuration + q_target_config = np.array([0.5, 0.3, -0.2, 0.1]) + target_pos, _ = fk.compute_fk(q_target_config) + + # Solve IK + q_goal = ik.solve_ik(target_pos, q_init=q_start, max_iter=100) + + if q_goal is not None: + # Optimize trajectory + optimizer = TrajectoryOptimizer(num_joints=4, dt=0.1) + + trajectory = optimizer.optimize_trajectory( + q_start=q_start, + q_goal=q_goal, + num_waypoints=20, + velocity_limit=2.0 + ) + + # Verify trajectory + assert trajectory is not None + assert len(trajectory) == 20 + + # Check velocity constraints + velocities = optimizer.compute_velocity_profile(trajectory) + max_vel = np.max(np.abs(velocities)) + assert max_vel <= 2.0 + 0.1 # Small tolerance + + # Check smoothness + if len(trajectory) >= 3: + accelerations = optimizer.compute_acceleration_profile(trajectory) + assert accelerations is not None + + def test_perception_to_planning_pipeline(self): + """Test complete perception to planning pipeline.""" + # 1. Perception: Detect objects + detector = ObjectDetector(detection_range=2.0) + + target = detector.add_object( + np.array([0.6, 0.2, 1.0]), + ObjectType.TARGET, + dimensions=np.array([0.08, 0.08, 0.15]), + confidence=0.95 + ) + + obstacle1 = detector.add_object( + np.array([0.4, 0.1, 0.8]), + ObjectType.OBSTACLE, + dimensions=np.array([0.2, 0.2, 0.3]), + confidence=0.88 + ) + + # 2. Environment state + env = EnvironmentState() + env.update(detector.detected_objects, robot_position=np.array([0.0, 0.0, 0.0])) + + # 3. Goal prediction + predictor = GoalPredictor() + + # Predict grasp + grasp_pos, grasp_orient = predictor.predict_grasp_position( + target.position, target.dimensions + ) + + # Evaluate goal quality + quality = predictor.evaluate_goal_quality( + grasp_pos, env, np.array([0.0, 0.0, 0.5]) + ) + + assert 0.0 <= quality <= 1.0 + + # 4. Plan trajectory (simplified) + optimizer = TrajectoryOptimizer(num_joints=7, dt=0.1) + + # Simplified: just plan in joint space + q_start = np.zeros(7) + q_goal = np.ones(7) * 0.3 # Approximate goal + + trajectory = optimizer.optimize_trajectory( + q_start, q_goal, num_waypoints=15 + ) + + assert trajectory is not None + assert len(trajectory) == 15 + + # 5. Verify stability (simplified) + support_calc = SupportPolygonCalculator() + left_foot = np.array([0.0, 0.1, 0.0, 0.0, 0.0, 0.0]) + right_foot = np.array([0.0, -0.1, 0.0, 0.0, 0.0, 0.0]) + + support_polygon = support_calc.compute_double_support_polygon( + left_foot, right_foot + ) + + # Check that support polygon is reasonable + area = support_calc.compute_polygon_area(support_polygon) + assert area > 0.01 # Should have some area + + +class TestPerformanceBenchmarks: + """Performance benchmarks for key operations.""" + + def test_trajectory_optimization_speed(self): + """Benchmark trajectory optimization speed.""" + import time + + optimizer = TrajectoryOptimizer(num_joints=12, dt=0.1) + + q_start = np.zeros(12) + q_goal = np.ones(12) * 0.5 + + start_time = time.time() + + trajectory = optimizer.optimize_trajectory( + q_start, q_goal, + num_waypoints=20, + velocity_limit=1.0 + ) + + elapsed = time.time() - start_time + + # Should complete in reasonable time (< 5 seconds) + assert elapsed < 5.0 + assert trajectory is not None + + def test_support_polygon_computation_speed(self): + """Benchmark support polygon computation.""" + import time + + calc = SupportPolygonCalculator() + left_foot = np.array([0.0, 0.1, 0.0, 0.0, 0.0, 0.0]) + right_foot = np.array([0.0, -0.1, 0.0, 0.0, 0.0, 0.0]) + + start_time = time.time() + + for _ in range(100): + polygon = calc.compute_double_support_polygon(left_foot, right_foot) + + elapsed = time.time() - start_time + + # Should be fast (< 1 second for 100 iterations) + assert elapsed < 1.0 + + def test_ik_convergence_speed(self): + """Benchmark IK solver convergence.""" + import time + + dh_params = np.array([ + [0.3, 0, 0, 0], + [0.3, 0, 0, 0], + [0.2, 0, 0, 0] + ]) + + fk = ForwardKinematics(dh_params) + ik = InverseKinematics(fk) + + target_pos = np.array([0.5, 0.2, 0.0]) + q_init = np.zeros(3) + + start_time = time.time() + + q_solution = ik.solve_ik(target_pos, q_init=q_init, max_iter=100) + + elapsed = time.time() - start_time + + # Should converge quickly (< 0.5 seconds) + assert elapsed < 0.5 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/test_perception.py b/tests/test_perception.py new file mode 100644 index 0000000..8973bc6 --- /dev/null +++ b/tests/test_perception.py @@ -0,0 +1,299 @@ +""" +Unit tests for perception module. +""" + +import pytest +import numpy as np +from humanoid_planner import ( + ObjectDetector, EnvironmentState, WorkspaceAnalyzer, + GoalPredictor, DetectedObject, ObjectType, ForwardKinematics +) + + +class TestObjectDetector: + """Test object detection functionality.""" + + def test_initialization(self): + """Test detector initialization.""" + detector = ObjectDetector(detection_range=3.0, confidence_threshold=0.7) + + assert detector.detection_range == 3.0 + assert detector.confidence_threshold == 0.7 + assert len(detector.detected_objects) == 0 + + def test_add_object(self): + """Test manually adding objects.""" + detector = ObjectDetector() + + position = np.array([0.5, 0.3, 1.0]) + obj = detector.add_object( + position=position, + object_type=ObjectType.TARGET, + confidence=0.95 + ) + + assert obj.id == 0 + assert obj.object_type == ObjectType.TARGET + assert np.allclose(obj.position, position) + assert obj.confidence == 0.95 + assert len(detector.detected_objects) == 1 + + def test_get_objects_by_type(self): + """Test filtering objects by type.""" + detector = ObjectDetector() + + detector.add_object(np.array([1.0, 0.0, 0.0]), ObjectType.TARGET) + detector.add_object(np.array([2.0, 0.0, 0.0]), ObjectType.OBSTACLE) + detector.add_object(np.array([3.0, 0.0, 0.0]), ObjectType.TARGET) + + targets = detector.get_objects_by_type(ObjectType.TARGET) + obstacles = detector.get_objects_by_type(ObjectType.OBSTACLE) + + assert len(targets) == 2 + assert len(obstacles) == 1 + + def test_get_nearest_object(self): + """Test finding nearest object.""" + detector = ObjectDetector() + + detector.add_object(np.array([1.0, 0.0, 0.0]), ObjectType.TARGET) + detector.add_object(np.array([2.0, 0.0, 0.0]), ObjectType.TARGET) + detector.add_object(np.array([0.5, 0.0, 0.0]), ObjectType.TARGET) + + reference = np.array([0.0, 0.0, 0.0]) + nearest = detector.get_nearest_object(reference, ObjectType.TARGET) + + assert nearest is not None + assert np.allclose(nearest.position, [0.5, 0.0, 0.0]) + + def test_detected_object_to_dict(self): + """Test object serialization.""" + obj = DetectedObject( + id=1, + object_type=ObjectType.TARGET, + position=np.array([0.5, 0.3, 1.0]), + confidence=0.9 + ) + + obj_dict = obj.to_dict() + + assert obj_dict['id'] == 1 + assert obj_dict['type'] == 'TARGET' + assert obj_dict['confidence'] == 0.9 + + +class TestEnvironmentState: + """Test environment state management.""" + + def test_initialization(self): + """Test environment initialization.""" + env = EnvironmentState() + + assert len(env.objects) == 0 + assert np.allclose(env.robot_position, [0.0, 0.0, 0.0]) + + def test_update(self): + """Test state update.""" + env = EnvironmentState() + + obj = DetectedObject( + id=0, + object_type=ObjectType.TARGET, + position=np.array([1.0, 0.0, 0.5]) + ) + + robot_pos = np.array([0.0, 0.0, 0.0]) + + env.update([obj], robot_position=robot_pos, timestamp=1.0) + + assert len(env.objects) == 1 + assert np.allclose(env.robot_position, robot_pos) + assert env.timestamp == 1.0 + + def test_get_obstacles_in_region(self): + """Test getting obstacles in region.""" + env = EnvironmentState() + + # Add obstacles at different distances + env.objects = [ + DetectedObject(0, ObjectType.OBSTACLE, np.array([0.5, 0.0, 0.0])), + DetectedObject(1, ObjectType.OBSTACLE, np.array([2.0, 0.0, 0.0])), + DetectedObject(2, ObjectType.TARGET, np.array([0.3, 0.0, 0.0])) # Not obstacle + ] + + center = np.array([0.0, 0.0, 0.0]) + obstacles = env.get_obstacles_in_region(center, radius=1.0) + + # Should only get the first obstacle (within 1m) + assert len(obstacles) == 1 + assert obstacles[0].id == 0 + + def test_is_position_free(self): + """Test collision-free position checking.""" + env = EnvironmentState() + + # Add obstacle + env.objects = [ + DetectedObject( + 0, ObjectType.OBSTACLE, + np.array([1.0, 0.0, 0.0]), + dimensions=np.array([0.1, 0.1, 0.1]) + ) + ] + + # Position far from obstacle - should be free + assert env.is_position_free(np.array([5.0, 0.0, 0.0]), min_clearance=0.1) + + # Position close to obstacle - should not be free + assert not env.is_position_free(np.array([1.0, 0.0, 0.0]), min_clearance=0.1) + + +class TestWorkspaceAnalyzer: + """Test workspace analysis.""" + + def test_initialization(self): + """Test analyzer initialization.""" + analyzer = WorkspaceAnalyzer(resolution=0.1) + + assert analyzer.resolution == 0.1 + assert analyzer.reachability_map is None + + def test_is_reachable_without_map(self): + """Test reachability check without map.""" + analyzer = WorkspaceAnalyzer() + + # Without computed map, should return True + position = np.array([0.5, 0.0, 1.0]) + assert analyzer.is_reachable(position) + + def test_compute_reachability_map(self): + """Test reachability map computation.""" + # Create simple FK solver + dh_params = np.array([ + [0.3, 0, 0, 0], + [0.3, 0, 0, 0], + [0.2, 0, 0, 0] + ]) + fk = ForwardKinematics(dh_params) + + analyzer = WorkspaceAnalyzer( + resolution=0.2, + workspace_bounds=( + np.array([0.0, -0.5, 0.0]), + np.array([1.0, 0.5, 1.0]) + ) + ) + + # Compute with small number of samples for speed + reachability_map = analyzer.compute_reachability_map(fk, num_samples=100) + + assert reachability_map is not None + assert reachability_map.shape == (5, 5, 5) # (1.0/0.2, 1.0/0.2, 1.0/0.2) + assert analyzer.reachability_map is not None + + +class TestGoalPredictor: + """Test goal prediction.""" + + def test_initialization(self): + """Test predictor initialization.""" + predictor = GoalPredictor() + + assert len(predictor.history) == 0 + + def test_predict_grasp_position(self): + """Test grasp position prediction.""" + predictor = GoalPredictor() + + object_pos = np.array([0.5, 0.0, 1.0]) + object_dims = np.array([0.1, 0.1, 0.2]) + + grasp_pos, grasp_orient = predictor.predict_grasp_position( + object_pos, object_dims + ) + + assert grasp_pos.shape == (3,) + assert grasp_orient.shape == (4,) + + # Grasp position should be offset from object + assert not np.allclose(grasp_pos, object_pos) + + def test_predict_via_points(self): + """Test via point generation.""" + predictor = GoalPredictor() + + start = np.array([0.0, 0.0, 1.0]) + goal = np.array([1.0, 0.0, 1.0]) + + # No obstacles + via_points = predictor.predict_via_points(start, goal, [], num_via_points=3) + + assert len(via_points) == 3 + + # Via points should be between start and goal + for point in via_points: + assert 0.0 < point[0] < 1.0 + + def test_predict_via_points_with_obstacles(self): + """Test via point generation avoiding obstacles.""" + predictor = GoalPredictor() + + start = np.array([0.0, 0.0, 1.0]) + goal = np.array([1.0, 0.0, 1.0]) + + # Add obstacle in the path + obstacle = DetectedObject( + 0, ObjectType.OBSTACLE, + np.array([0.5, 0.0, 1.0]), + dimensions=np.array([0.1, 0.1, 0.1]) + ) + + via_points = predictor.predict_via_points( + start, goal, [obstacle], num_via_points=3 + ) + + assert len(via_points) == 3 + + # Via points should be adjusted away from obstacle + for point in via_points: + dist = np.linalg.norm(point - obstacle.position) + # Should maintain some clearance + assert dist > 0.05 or not (0.4 < point[0] < 0.6) + + def test_evaluate_goal_quality(self): + """Test goal quality evaluation.""" + predictor = GoalPredictor() + env = EnvironmentState() + + robot_pos = np.array([0.0, 0.0, 0.0]) + goal_pos = np.array([0.5, 0.0, 1.0]) + + quality = predictor.evaluate_goal_quality(goal_pos, env, robot_pos) + + assert 0.0 <= quality <= 1.0 + + def test_goal_quality_with_obstacles(self): + """Test that obstacles reduce goal quality.""" + predictor = GoalPredictor() + env = EnvironmentState() + + robot_pos = np.array([0.0, 0.0, 0.0]) + goal_pos = np.array([0.5, 0.0, 1.0]) + + # Quality without obstacles + quality_no_obs = predictor.evaluate_goal_quality(goal_pos, env, robot_pos) + + # Add obstacle near goal + env.objects = [ + DetectedObject(0, ObjectType.OBSTACLE, np.array([0.5, 0.1, 1.0])) + ] + + # Quality with obstacle + quality_with_obs = predictor.evaluate_goal_quality(goal_pos, env, robot_pos) + + # Quality should be lower with obstacle nearby + assert quality_with_obs < quality_no_obs + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/test_stability_analysis.py b/tests/test_stability_analysis.py new file mode 100644 index 0000000..b762b2c --- /dev/null +++ b/tests/test_stability_analysis.py @@ -0,0 +1,251 @@ +""" +Unit tests for stability analysis module. +""" + +import pytest +import numpy as np +from humanoid_planner import StabilityAnalyzer, CenterOfMassTracker + + +class TestStabilityAnalyzer: + """Test stability analysis functionality.""" + + def test_compute_center_of_mass(self): + """Test CoM computation from link positions and masses.""" + analyzer = StabilityAnalyzer() + + # Three links in a line + link_positions = np.array([ + [0.0, 0.0, 0.0], + [1.0, 0.0, 0.0], + [2.0, 0.0, 0.0] + ]) + link_masses = np.array([10.0, 10.0, 10.0]) + + com = analyzer.compute_center_of_mass(link_positions, link_masses) + + # CoM should be at center + assert np.allclose(com, [1.0, 0.0, 0.0], atol=1e-6) + + def test_compute_center_of_mass_unequal_masses(self): + """Test CoM with unequal masses.""" + analyzer = StabilityAnalyzer() + + link_positions = np.array([ + [0.0, 0.0, 0.0], + [1.0, 0.0, 0.0] + ]) + link_masses = np.array([30.0, 10.0]) + + com = analyzer.compute_center_of_mass(link_positions, link_masses) + + # Weighted average: (30*0 + 10*1) / 40 = 0.25 + assert np.allclose(com, [0.25, 0.0, 0.0], atol=1e-6) + + def test_static_stability_margin(self): + """Test static stability margin calculation.""" + analyzer = StabilityAnalyzer() + + com_pos = np.array([0.0, 0.0, 1.0]) + support_polygon = np.array([ + [0.2, 0.2], + [0.2, -0.2], + [-0.2, -0.2], + [-0.2, 0.2] + ]) + + margin = analyzer.compute_static_stability_margin(com_pos, support_polygon) + + # CoM at center, margin should be ~0.2m (distance to edge) + assert margin > 0.0 + assert margin < 0.3 + + def test_dynamic_stability(self): + """Test dynamic stability calculation.""" + analyzer = StabilityAnalyzer() + + com_pos = np.array([0.0, 0.0, 1.0]) + com_vel = np.array([0.1, 0.0, 0.0]) + com_acc = np.array([0.0, 0.0, 0.0]) + + support_polygon = np.array([ + [0.2, 0.2], + [0.2, -0.2], + [-0.2, -0.2], + [-0.2, 0.2] + ]) + + stable, margin = analyzer.compute_dynamic_stability( + com_pos, com_vel, com_acc, support_polygon + ) + + assert isinstance(stable, bool) + assert isinstance(margin, float) + + def test_capture_point(self): + """Test capture point calculation.""" + analyzer = StabilityAnalyzer() + + com_pos = np.array([0.0, 0.0, 1.0]) + com_vel = np.array([0.5, 0.0, 0.0]) # Moving forward + + cp = analyzer.compute_capture_point(com_pos, com_vel) + + # Capture point should be ahead of CoM + assert cp[0] > com_pos[0] + assert cp.shape == (2,) + + def test_trajectory_stability(self): + """Test stability checking along trajectory.""" + analyzer = StabilityAnalyzer() + + # Simple trajectory + com_trajectory = np.array([ + [0.0, 0.0, 1.0], + [0.05, 0.0, 1.0], + [0.1, 0.0, 1.0] + ]) + + com_velocities = np.array([ + [0.5, 0.0, 0.0], + [0.5, 0.0, 0.0] + ]) + + com_accelerations = np.array([ + [0.0, 0.0, 0.0] + ]) + + support_polygon = np.array([ + [0.2, 0.2], + [0.2, -0.2], + [-0.2, -0.2], + [-0.2, 0.2] + ]) + + support_polygons = [support_polygon] * 3 + + all_stable, margins = analyzer.check_trajectory_stability( + com_trajectory, com_velocities, com_accelerations, support_polygons + ) + + assert isinstance(all_stable, bool) + assert len(margins) == len(com_trajectory) + + def test_stability_metrics(self): + """Test comprehensive stability metrics.""" + analyzer = StabilityAnalyzer() + + com_pos = np.array([0.0, 0.0, 1.0]) + com_vel = np.array([0.1, 0.0, 0.0]) + com_acc = np.array([0.0, 0.0, 0.0]) + + support_polygon = np.array([ + [0.2, 0.2], + [0.2, -0.2], + [-0.2, -0.2], + [-0.2, 0.2] + ]) + + metrics = analyzer.compute_stability_metrics( + com_pos, com_vel, com_acc, support_polygon + ) + + # Check all expected keys + assert 'static_margin' in metrics + assert 'dynamic_margin' in metrics + assert 'is_stable' in metrics + assert 'zmp' in metrics + assert 'capture_point' in metrics + assert 'com_height' in metrics + assert np.allclose(metrics['com_height'], 1.0) + + +class TestCenterOfMassTracker: + """Test CoM tracking functionality.""" + + def test_initialization(self): + """Test tracker initialization.""" + tracker = CenterOfMassTracker(dt=0.01) + + assert tracker.dt == 0.01 + assert len(tracker.history) == 0 + + def test_update(self): + """Test CoM state update.""" + tracker = CenterOfMassTracker() + + com_pos = np.array([0.0, 0.0, 1.0]) + tracker.update(com_pos) + + assert len(tracker.history) == 1 + assert np.allclose(tracker.history[0]['position'], com_pos) + + def test_velocity_estimation(self): + """Test velocity estimation from history.""" + tracker = CenterOfMassTracker(dt=0.1) + + # Add positions with constant velocity + for i in range(5): + com_pos = np.array([i * 0.1, 0.0, 1.0]) + tracker.update(com_pos, timestamp=i * 0.1) + + velocity = tracker.get_velocity() + + # Velocity should be ~1.0 m/s in x direction + assert velocity.shape == (3,) + assert np.allclose(velocity[0], 1.0, atol=0.1) + assert np.allclose(velocity[1], 0.0, atol=0.1) + + def test_acceleration_estimation(self): + """Test acceleration estimation.""" + tracker = CenterOfMassTracker(dt=0.1) + + # Add positions with constant acceleration + for i in range(10): + t = i * 0.1 + # x = 0.5 * a * t^2, with a = 1.0 + x = 0.5 * 1.0 * t**2 + com_pos = np.array([x, 0.0, 1.0]) + tracker.update(com_pos, timestamp=t) + + acceleration = tracker.get_acceleration() + + # Should estimate acceleration ~1.0 + assert acceleration.shape == (3,) + # Finite differences have numerical error + assert np.abs(acceleration[0] - 1.0) < 0.5 + + def test_get_trajectory(self): + """Test getting full trajectory.""" + tracker = CenterOfMassTracker() + + positions = [ + np.array([0.0, 0.0, 1.0]), + np.array([0.1, 0.0, 1.0]), + np.array([0.2, 0.0, 1.0]) + ] + + for pos in positions: + tracker.update(pos) + + trajectory = tracker.get_trajectory() + + assert trajectory.shape == (3, 3) + assert np.allclose(trajectory, positions) + + def test_clear(self): + """Test clearing history.""" + tracker = CenterOfMassTracker() + + tracker.update(np.array([0.0, 0.0, 1.0])) + tracker.update(np.array([0.1, 0.0, 1.0])) + + assert len(tracker.history) == 2 + + tracker.clear() + + assert len(tracker.history) == 0 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])