diff --git a/rock/actions/sandbox/response.py b/rock/actions/sandbox/response.py index 1e44d26017..771a17f40b 100644 --- a/rock/actions/sandbox/response.py +++ b/rock/actions/sandbox/response.py @@ -16,6 +16,8 @@ class State(str, Enum): PENDING = "pending" RUNNING = "running" STOPPED = "stopped" + ARCHIVING = "archiving" + ARCHIVED = "archived" DELETED = "deleted" @@ -54,6 +56,7 @@ class SandboxStatusResponse(BaseModel): start_time: str | None = None stop_time: str | None = None create_time: str | None = None + state_history: list[dict[str, str]] = [] class CommandResponse(BaseModel): diff --git a/rock/actions/sandbox/sandbox_info.py b/rock/actions/sandbox/sandbox_info.py index 33ee9ab1d5..50aa663e83 100644 --- a/rock/actions/sandbox/sandbox_info.py +++ b/rock/actions/sandbox/sandbox_info.py @@ -26,7 +26,9 @@ class SandboxInfo(TypedDict, total=False): start_time: str stop_time: str delete_time: str + archive_time: str extended_params: dict[str, str] + state_history: list[dict[str, str]] _SANDBOX_INFO_KEYS = frozenset(SandboxInfo.__annotations__.keys()) diff --git a/rock/admin/core/schema.py b/rock/admin/core/schema.py index d9e22c5579..c97ce1d26e 100644 --- a/rock/admin/core/schema.py +++ b/rock/admin/core/schema.py @@ -40,6 +40,8 @@ class SandboxRecord(Base): create_time = Column(String(64), nullable=False, default="") start_time = Column(String(64), nullable=True) stop_time = Column(String(64), nullable=True) + archive_time = Column(String(64), nullable=True) + delete_time = Column(String(64), nullable=True) host_name = Column(String(255), nullable=True) auth_token = Column(String(512), nullable=True) rock_authorization_encrypted = Column(String(1024), nullable=True) diff --git a/rock/admin/entrypoints/sandbox_api.py b/rock/admin/entrypoints/sandbox_api.py index 34c1ac0af5..a11f22bec0 100644 --- a/rock/admin/entrypoints/sandbox_api.py +++ b/rock/admin/entrypoints/sandbox_api.py @@ -4,7 +4,7 @@ from typing import Annotated, Any import httpx -from fastapi import 
@sandbox_router.post("/sandboxes/{sandbox_id}/archive")
@handle_exceptions(error_message="archive sandbox failed")
async def archive(
    sandbox_id: NonBlankStr,
    rock_authorization: str | None = Header(default=None, alias="X-Key"),
) -> RockResponse:
    # Archive endpoint: the caller's key arrives in the ``X-Key`` header and is
    # checked against the lifecycle archive config before anything is dispatched.
    # (Comments are used instead of a docstring so the route's generated
    # description stays exactly as it was.)
    caller_permitted = sandbox_manager.rock_config.lifecycle.archive.is_allowed(rock_authorization)
    if caller_permitted:
        # archive_sandbox only kicks the transition off; the response therefore
        # reports "archiving", not "archived".
        await sandbox_manager.archive_sandbox(sandbox_id)
        return RockResponse(result=f"{sandbox_id} archiving")
    raise BadRequestRockError("archive not allowed for this authorization key")
@@ class SandboxStartResponse(SandboxResponse): disk_limit_rootfs: str | None = None +class StateTransitionRecord(BaseModel): + from_state: str + to_state: str + event: str + timestamp: str + + # TODO: inherit from SandboxStartResponse class SandboxStatusResponse(BaseModel): sandbox_id: str = None @@ -36,6 +43,7 @@ class SandboxStatusResponse(BaseModel): start_time: str | None = None stop_time: str | None = None create_time: str | None = None + state_history: list[StateTransitionRecord] = [] @classmethod def from_sandbox_info(cls, sandbox_info: "SandboxInfo") -> "SandboxStatusResponse": @@ -53,6 +61,7 @@ def from_sandbox_info(cls, sandbox_info: "SandboxInfo") -> "SandboxStatusRespons cpus=sandbox_info.get("cpus"), memory=sandbox_info.get("memory"), disk_limit_rootfs=sandbox_info.get("disk_limit_rootfs"), + state_history=sandbox_info.get("state_history", []), ) diff --git a/rock/config.py b/rock/config.py index 1b9b0257b4..aa13628c91 100644 --- a/rock/config.py +++ b/rock/config.py @@ -172,12 +172,22 @@ class ArchiveAcrConfig: @dataclass class ArchiveConfig: + enabled: bool = False + allowed_keys: list[str] | None = None dir_storage: ArchiveDirStorageConfig = field(default_factory=ArchiveDirStorageConfig) acr: ArchiveAcrConfig = field(default_factory=ArchiveAcrConfig) - scan_interval_sec: int = 30 - timeout_sec: int = 1800 max_retries: int = 3 prefix: str = "rock-archives/" + max_image_push_size: str = "16g" + max_dir_upload_size: str = "16g" + + def is_allowed(self, rock_authorization: str | None) -> bool: + """Check if the caller is allowed to use archive. 
None = open to all.""" + if self.allowed_keys is None: + return True + if not isinstance(self.allowed_keys, list): + return False + return rock_authorization in self.allowed_keys def __post_init__(self): if isinstance(self.dir_storage, dict): @@ -188,6 +198,9 @@ def __post_init__(self): @dataclass class SandboxLifecycleConfig: + reconcile_interval_seconds: int = 30 + archive_timeout_seconds: int = 1800 + restore_timeout_seconds: int = 1800 archive: ArchiveConfig = field(default_factory=ArchiveConfig) default_startup_timeout_seconds: float = 600 @@ -595,4 +608,5 @@ async def update(self): f", image_registry_mirrors={self.image_registry_mirrors}" f", image_mirror_lookup_allowlist={self.image_mirror_lookup_allowlist}" f", instance_registry_mirrors={self.runtime.instance_registry_mirrors}" + f", lifecycle={self.lifecycle}" ) diff --git a/rock/deployments/config.py b/rock/deployments/config.py index 891d8422c3..6103520fd7 100644 --- a/rock/deployments/config.py +++ b/rock/deployments/config.py @@ -117,7 +117,9 @@ class DockerDeploymentConfig(DeploymentConfig): """Custom name for the container. If None, a random name will be generated.""" auto_delete_seconds: int | None = None - """If set, the container will be automatically deleted after container stopped.""" + """Per-sandbox auto-delete policy. 0 = --rm (immediate delete on stop); + >0 = delete after this many seconds idle; None = fall back to global + lifecycle.auto_delete_after_sec.""" type: Literal["docker"] = "docker" """Deployment type discriminator for serialization/deserialization and CLI parsing. 
def _container_exists(self) -> bool:
    """Report whether ``docker inspect`` succeeds for this deployment's container name.

    Runs synchronously with a 5 second timeout; output is captured and discarded.
    """
    inspect_cmd = ["docker", "inspect", self._container_name]
    completed = subprocess.run(inspect_cmd, capture_output=True, timeout=5)
    # Exit status 0 is the only "found" outcome.
    return not completed.returncode
+ status_path = PersistedServiceStatus.gen_service_status_path(self._container_name) + self._service_status.set_sandbox_id(self._container_name) + if os.path.exists(status_path): + with open(status_path) as f: + data = json.load(f) + for port_value, mapping in data.get("port_mapping", {}).items(): + self._service_status.add_port_mapping(int(port_value), mapping) + if self._config.port is None: + self._config.port = self._service_status.port_mapping.get(Port.PROXY) + + if self._config.port is None: + await self.do_port_mapping() + + # Build docker create command from archived image. + env_arg = ["-e", f"ROCK_TIME_ZONE={env_vars.ROCK_TIME_ZONE}"] + volume_args = self._prepare_volume_mounts() + if env_vars.ROCK_LOGGING_PATH: + log_file_path = f"{env_vars.ROCK_LOGGING_PATH}/{self._container_name}" + os.makedirs(log_file_path, exist_ok=True) + os.chmod(log_file_path, 0o777) + volume_args.extend(["-v", f"{log_file_path}:{env_vars.ROCK_LOGGING_PATH}"]) + env_arg.extend( + [ + "-e", + f"ROCK_LOGGING_PATH={env_vars.ROCK_LOGGING_PATH}", + "-e", + f"ROCK_LOGGING_LEVEL={env_vars.ROCK_LOGGING_LEVEL}", + ] + ) + volume_args.extend(self._prepare_timezone_mount()) + runtime_args = self._build_runtime_args() + + cmds = [ + "docker", + "create", + "--entrypoint", + "", + *env_arg, + *volume_args, + *runtime_args, + "-p", + f"{self._config.port}:{Port.PROXY}", + "-p", + f"{self._service_status.get_mapped_port(Port.SERVER)}:8080", + "-p", + f"{self._service_status.get_mapped_port(Port.SSH)}:22", + *self._memory(), + *self._cpus(), + *self._storage_opts(), + *self._config.docker_args, + "--name", + self._container_name, + image_ref, + *self._get_rocklet_start_cmd(), + ] + + cmd_str = shlex.join(cmds) + logger.info(f"Recreating container {self._container_name} from archived image {image_ref}") + logger.info(f"Command: {cmd_str!r}") + + await loop.run_in_executor(executor, self._docker_create, cmds) + try: + self._container_process = await loop.run_in_executor(executor, 
self._docker_start) + except Exception: + DockerUtil.remove_container_force(self._container_name) + raise + + if self._config.port is None: + self._config.port = self._service_status.port_mapping.get(Port.PROXY) + if self._config.port is None: + raise Exception(f"Cannot determine rocklet port for container {self._container_name}") + + logger.info(f"Starting runtime at {self._config.port}") + self._runtime = RemoteSandboxRuntime.from_config( + RemoteSandboxRuntimeConfig(port=self._config.port, timeout=self._runtime_timeout) + ) + self._runtime.set_executor(executor) + + with StageTimer("startup_timing", f"[{self._container_name}] Wait until alive", logger): + await self._wait_until_alive(timeout=self._config.startup_timeout) + + if self._config.enable_auto_clear: + self._check_stop_task = asyncio.create_task(self._check_stop()) + + logger.info(f"Container {self._container_name} recreated from {image_ref} successfully") + async def delete(self) -> None: """Remove the container via ``docker rm -f``. diff --git a/rock/sandbox/archive/abstract.py b/rock/sandbox/archive/abstract.py index cf39a620e6..549682d835 100644 --- a/rock/sandbox/archive/abstract.py +++ b/rock/sandbox/archive/abstract.py @@ -1,4 +1,10 @@ +from __future__ import annotations + from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from rock.config import ArchiveAcrConfig, ArchiveDirStorageConfig class AbstractDirStorage(ABC): @@ -8,6 +14,22 @@ class AbstractDirStorage(ABC): decides compression format and upload strategy internally. 
class ArchiveKeys:
    """Naming scheme for the artifacts produced when a sandbox is archived."""

    @staticmethod
    def dir_key(sandbox_id: str, prefix: str) -> str:
        """Return the storage key for the sandbox's archived directory tarball.

        *prefix* is prepended verbatim, so it has to carry its own trailing
        separator (for example ``"rock-archives/"``).
        """
        return "{}{}.tar.gz".format(prefix, sandbox_id)

    @staticmethod
    def image_ref(sandbox_id: str, registry_url: str, namespace: str) -> str:
        """Return the image reference for the sandbox's archived image.

        The repository name is fixed to ``sandbox_archived`` and the sandbox id
        is used as the tag.
        """
        return "{}/{}/sandbox_archived:{}".format(registry_url, namespace, sandbox_id)
@abstractmethod
async def _reconcile(self):
    """Periodic reconcile hook that concrete managers must implement.

    ``_setup_job_check_scheduler`` registers this coroutine as the
    ``reconcile`` job, triggered every ``self._reconcile_interval`` seconds
    (taken from ``rock_config.lifecycle.reconcile_interval_seconds``).
    """
    ...
async def start_archive(
    self,
    config: DeploymentConfig,
    host_ip: str | None,
    dir_storage_config: dict,
    image_storage_config: dict,
    archive_params: dict | None = None,
) -> None:
    """Default archive entry point for operators that cannot archive.

    This base implementation ignores all of its arguments and always raises.
    It pairs with ``supports_archive``, which returns ``False`` on this class;
    an operator that can archive overrides both.

    Raises:
        BadRequestRockError: always; the message names the concrete operator class.
    """
    # Imported inside the method rather than at module import time.
    from rock.sdk.common.exceptions import BadRequestRockError

    raise BadRequestRockError(f"archive not supported on {type(self).__name__}")
async def start_archive(
    self,
    config: DockerDeploymentConfig,
    host_ip: str | None,
    dir_storage_config: dict,
    image_storage_config: dict,
    archive_params: dict | None = None,
) -> None:
    """Start archiving a sandbox in the background and return immediately.

    A short-lived actor is created, pinned to *host_ip*, and handed to
    ``_run_archive_and_kill``, which runs the archive and then kills the actor.

    Args:
        config: deployment config used to create the archive actor. Its
            ``cpus`` and ``memory`` are overwritten in place (see note below).
        host_ip: node the actor is pinned to, or ``None`` for no pinning.
        dir_storage_config: passed through to the actor's ``archive`` call.
        image_storage_config: passed through to the actor's ``archive`` call.
        archive_params: optional extra parameters passed through unchanged.
    """
    async with self._ray_service.get_ray_rwlock().read_lock():
        # Request a small resource footprint for the archive actor.
        # NOTE(review): this mutates the caller's config object; confirm no
        # caller reuses `config` afterwards expecting the original sizes.
        config.cpus = 0.1
        config.memory = "256m"
        sandbox_actor = await self.create_actor(config, pin_to_host_ip=host_ip)
        archive_task = asyncio.create_task(
            self._run_archive_and_kill(sandbox_actor, dir_storage_config, image_storage_config, archive_params)
        )

    # The event loop only keeps a weak reference to tasks, so an otherwise
    # unreferenced fire-and-forget task can be garbage-collected before it
    # finishes. Hold a strong reference until the task is done.
    pending_tasks = getattr(self, "_archive_tasks", None)
    if pending_tasks is None:
        pending_tasks = self._archive_tasks = set()
    pending_tasks.add(archive_task)
    archive_task.add_done_callback(pending_tasks.discard)
def _make_dir_storage(config: dict):
    """Build a directory-storage backend from a plain config mapping.

    The optional ``"type"`` entry picks the backend: ``"s3"`` gives
    ``S3DirStorage``, anything else (including a missing entry) gives
    ``OssDirStorage``. Every other entry is forwarded as a keyword argument.
    The caller's mapping is left untouched.
    """
    backend_kwargs = {name: value for name, value in config.items() if name != "type"}
    backend_cls = S3DirStorage if config.get("type", "oss") == "s3" else OssDirStorage
    return backend_cls(**backend_kwargs)
async def archive(
    self,
    dir_storage_config: dict,
    image_storage_config: dict,
    archive_params: dict | None = None,
) -> None:
    """Async archive: commit+push image, then tar+upload log dir.

    Args:
        dir_storage_config: keyword arguments for the directory storage
            backend; an optional ``"type"`` entry selects the backend.
        image_storage_config: keyword arguments for ``DockerRegistryV2ImageStorage``.
        archive_params: optional settings. Recognised keys are
            ``archive_prefix`` (default ``"rock-archives/"``),
            ``acr_namespace`` (default ``"sandbox_archive"``),
            ``max_image_push_size`` and ``max_dir_upload_size``. An empty or
            missing size limit disables that check.

    Raises:
        RuntimeError: if the committed image or the log directory is larger
            than its configured limit.
    """
    sandbox_id = self._config.container_name
    dir_storage = _make_dir_storage(dir_storage_config)
    image_storage = DockerRegistryV2ImageStorage(**image_storage_config)
    archive_params = archive_params or {}
    prefix = archive_params.get("archive_prefix", "rock-archives/")
    acr_ns = archive_params.get("acr_namespace", "sandbox_archive")

    # Stage 1: commit the container to a local staging tag and push it.
    local_tag = f"archive-staging-{sandbox_id}:latest"
    await self._run_shell_command("docker", "commit", sandbox_id, local_tag)
    ref = ArchiveKeys.image_ref(sandbox_id, image_storage.registry_url, acr_ns)
    try:
        max_image = archive_params.get("max_image_push_size", "")
        if max_image:
            image_size = await self._get_image_size(local_tag)
            max_bytes = parse_size_to_bytes(max_image)
            if image_size > max_bytes:
                raise RuntimeError(
                    f"[{sandbox_id}] image size {image_size} bytes exceeds limit {max_image} ({max_bytes} bytes)"
                )
        await image_storage.push_from_local(local_tag, ref)
    finally:
        # The staging tag is removed on every exit path, including failures
        # while inspecting the image or parsing the limit.
        await self._run_shell_command("docker", "rmi", local_tag, check=False)

    # Stage 2: upload the log directory, if logging to disk is configured.
    log_root = env_vars.ROCK_LOGGING_PATH
    if not log_root:
        logger.info(f"[{sandbox_id}] ROCK_LOGGING_PATH not set, skipping log dir archive")
        return

    log_dir = f"{log_root}/{sandbox_id}"
    if not os.path.isdir(log_dir):
        logger.info(f"[{sandbox_id}] log dir {log_dir} not found, skipping log archive")
        return

    key = ArchiveKeys.dir_key(sandbox_id, prefix)
    try:
        max_dir = archive_params.get("max_dir_upload_size", "")
        if max_dir:
            dir_size = await self._get_dir_size(log_dir)
            max_bytes = parse_size_to_bytes(max_dir)
            if dir_size > max_bytes:
                raise RuntimeError(
                    f"[{sandbox_id}] log dir size {dir_size} bytes exceeds limit {max_dir} ({max_bytes} bytes)"
                )
        await dir_storage.upload_dir(log_dir, key)
    except Exception:
        # Any failure in the log stage rolls back the already-pushed image so
        # a failed archive does not leave an orphan in the registry.
        await image_storage.delete(ref)
        raise
    # Local logs are removed only after a successful upload.
    shutil.rmtree(log_dir, ignore_errors=True)
def _init_archive_storage(self, rock_config: RockConfig) -> None:
    """Create the archive storage clients when archiving is enabled.

    Does nothing when ``lifecycle.archive.enabled`` is falsy. Otherwise every
    one of the image-registry fields (``registry_url``, ``username``,
    ``password``) and directory-storage fields (``endpoint``,
    ``access_key_id``, ``access_key_secret``) must be set.

    Raises:
        RuntimeError: archiving is enabled but one of those fields is empty.
    """
    settings = rock_config.lifecycle.archive
    if settings.enabled:
        acr = settings.acr
        store = settings.dir_storage
        registry_fields = ("registry_url", "username", "password")
        storage_fields = ("endpoint", "access_key_id", "access_key_secret")
        fully_configured = all(getattr(acr, name) for name in registry_fields) and all(
            getattr(store, name) for name in storage_fields
        )
        if not fully_configured:
            raise RuntimeError("archive.enabled=true but image registry or dir_storage credentials are missing")
        self._dir_storage = AbstractDirStorage.from_config(store)
        self._image_storage = AbstractImageStorage.from_config(acr)
async def _try_advance_pending(self, sandbox_id: str, sm) -> dict | None:
    """Ask the operator for the sandbox status and advance a PENDING sandbox if it is up.

    When the operator reports RUNNING while the state machine is still
    PENDING, the ``alive`` event is sent. The sandbox timeout is refreshed
    whenever the operator reports PENDING or RUNNING.

    Returns:
        The operator's status mapping, or ``None`` if the operator has none.
    """
    info = await self._operator.get_status(sandbox_id=sandbox_id)
    if info is not None:
        reported_running = info.get("state") == State.RUNNING
        if sm.current_state.value == State.PENDING and reported_running:
            await sm.send("alive", sandbox_id=sandbox_id, meta_store=self._meta_store, sandbox_info=info)
        # The state is read again here because the transition above was
        # handed the same mapping.
        if info.get("state") in (State.PENDING, State.RUNNING):
            await self._refresh_timeout(sandbox_id)
    return info
is_alive: - await sm.send( - "alive", sandbox_id=sandbox_id, meta_store=self._meta_store, sandbox_info=operator_sandbox_info - ) - if operator_sandbox_info.get("state") in (State.PENDING, State.RUNNING): - await self._refresh_timeout(sandbox_id) + operator_sandbox_info = await self._try_advance_pending(sandbox_id, sm) + is_alive = operator_sandbox_info is not None and operator_sandbox_info.get("state") == State.RUNNING # compat with legacy get_status behavior by default (include_all_states == False), # raise 'not found' if not on pending or running status. if not include_all_states and sm.current_state.value not in (State.PENDING, State.RUNNING): raise BadRequestRockError(f"Sandbox {sandbox_id} not found") - if operator_sandbox_info is not None: + if operator_sandbox_info is not None and sm.current_state.value in (State.PENDING, State.RUNNING): sandbox_info = operator_sandbox_info else: sandbox_info = sm.sandbox_info @@ -319,6 +350,7 @@ async def get_status(self, sandbox_id, include_all_states: bool = False) -> Sand start_time=sandbox_info.get("start_time"), stop_time=sandbox_info.get("stop_time"), create_time=sandbox_info.get("create_time"), + state_history=sm.sandbox_info.get("state_history", []) if sm.sandbox_info else [], ) async def build_sandbox_info_from_redis(self, sandbox_id: str, deployment_info: SandboxInfo) -> SandboxInfo | None: @@ -408,6 +440,11 @@ async def _check_job_background(self): logger.error("check_job_background Exception", exc_info=e) continue + async def _reconcile(self) -> None: + """Reconcile intermediate states (PENDING, ARCHIVING) on short interval.""" + await self._reconcile_pending() + await self._reconcile_archiving() + async def get_sandbox_statistics(self, sandbox_id): actor_name = self.deployment_manager.get_actor_name(sandbox_id) sandbox_actor = await self._ray_service.async_ray_get_actor(actor_name, self._ray_namespace) @@ -436,6 +473,172 @@ def validate_sandbox_spec(self, runtime_config: RuntimeConfig, deployment_config 
async def archive_sandbox(self, sandbox_id: str) -> None:
    """Check archive preconditions, then fire the ``archive`` event on the sandbox state machine.

    Raises:
        BadRequestRockError: the operator is missing or does not support
            archiving, the archive storages are not configured, or the
            sandbox is unknown.
    """
    if not (self._operator and self._operator.supports_archive()):
        raise BadRequestRockError(f"archive not supported on {type(self._operator).__name__}")
    if not (self._dir_storage and self._image_storage):
        raise BadRequestRockError("archive not configured: missing storage credentials")

    machine = await self._get_current_statemachine(sandbox_id)
    if machine is None:
        raise BadRequestRockError(f"sandbox {sandbox_id} not found")

    cfg = self.rock_config.lifecycle.archive
    await machine.send(
        "archive",
        sandbox_id=sandbox_id,
        meta_store=self._meta_store,
        operator=self._operator,
        dir_storage=self._dir_storage,
        image_storage=self._image_storage,
        archive_params={
            "archive_prefix": cfg.prefix,
            "acr_namespace": cfg.acr.namespace,
            "max_image_push_size": cfg.max_image_push_size,
            "max_dir_upload_size": cfg.max_dir_upload_size,
        },
    )


async def restart_from_archived(self, sandbox_id: str) -> None:
    """Fire the ``restore`` event for a sandbox that is currently ARCHIVED.

    Returns silently when the sandbox is in any other state. The timeout
    info handed to the transition is derived from ``auto_clear_time`` of
    the stored spec snapshot.

    Raises:
        BadRequestRockError: the archive storages are not configured, the
            sandbox is unknown, or it has no spec snapshot to restore from.
    """
    if not (self._dir_storage and self._image_storage):
        raise BadRequestRockError("archive not configured: missing storage credentials")

    machine = await self._get_current_statemachine(sandbox_id)
    if machine is None:
        raise BadRequestRockError(f"sandbox {sandbox_id} not found")
    if machine.current_state.value != State.ARCHIVED:
        return

    spec_snapshot = (machine.sandbox_info or {}).get("spec")
    if not spec_snapshot:
        raise BadRequestRockError(f"sandbox {sandbox_id} has no spec snapshot; cannot restore")

    auto_clear_time = DockerDeploymentConfig(**spec_snapshot).auto_clear_time
    await machine.send(
        "restore",
        sandbox_id=sandbox_id,
        meta_store=self._meta_store,
        timeout_info=SandboxTimeoutHelper.make_timeout_info(auto_clear_time),
        operator=self._operator,
        dir_storage=self._dir_storage,
        image_storage=self._image_storage,
    )
async def _reconcile_pending(self) -> None:
    """Reconcile PENDING sandboxes: advance to RUNNING or time out a restore back to ARCHIVED.

    For every record the meta store lists as PENDING:

    1. If the record carries ``archive_time`` (it is being restored from an
       archive) and it entered PENDING at least ``restore_timeout_seconds``
       ago, fire ``restore_failed`` and move on to the next record.
    2. Otherwise probe the operator and fire ``alive`` when it reports RUNNING.

    A failure on one sandbox is logged and does not stop the scan. Task
    cancellation is propagated so the surrounding background loop can be
    shut down promptly.
    """
    try:
        pending_list = await self._meta_store.list_by("state", State.PENDING.value)
    except Exception as e:
        logger.warning(f"[reconcile_pending] list_by failed: {e}")
        return

    for info in pending_list:
        sandbox_id = info.get("sandbox_id", "")
        if not sandbox_id:
            continue
        try:
            # 1. Restore timeout (archive_time present = restoring from ARCHIVED)
            started_at = get_last_entered_at(info.get("state_history", []), "pending")
            if info.get("archive_time") and started_at:
                try:
                    started = datetime.datetime.fromisoformat(started_at.replace("Z", "+00:00"))
                    elapsed = (datetime.datetime.now(timezone.utc) - started).total_seconds()
                except (ValueError, TypeError):
                    # Unparseable or naive timestamp: treat as "just started" instead of failing.
                    elapsed = 0
                if elapsed >= self.rock_config.lifecycle.restore_timeout_seconds:
                    # Re-read the state: the listing may be stale by now.
                    sm = await self._get_current_statemachine(sandbox_id)
                    if sm and sm.current_state.value == State.PENDING:
                        await sm.send(
                            "restore_failed",
                            sandbox_id=sandbox_id,
                            meta_store=self._meta_store,
                            reason=f"timeout after {int(elapsed)}s",
                        )
                        logger.warning(f"[reconcile_pending] restore_failed: {sandbox_id} ({int(elapsed)}s)")
                    continue

            # 2. Try advance PENDING → RUNNING
            sm = await self._get_current_statemachine(sandbox_id)
            if sm is None:
                continue
            await self._try_advance_pending(sandbox_id, sm)

        except asyncio.CancelledError:
            # Never swallow cancellation: skipping to the next sandbox here would
            # keep a cancelled task running through the rest of the list.
            raise
        except Exception as e:
            logger.error(f"[reconcile_pending] {sandbox_id}: {e}", exc_info=True)
            continue
def get_last_entered_at(state_history: list[dict[str, str]], target_state: str) -> str:
    """Return the timestamp of the most recent transition into *target_state*.

    The history is scanned from newest to oldest. The result is ``""`` when
    no record has ``to_state == target_state``, or when the newest matching
    record has no ``timestamp`` key.
    """
    newest_match = next(
        (entry for entry in reversed(state_history) if entry.get("to_state") == target_state),
        None,
    )
    if newest_match is None:
        return ""
    return newest_match.get("timestamp", "")
get_iso8601_timestamp(), + } + ) + # Cap history to avoid unbounded growth in long-lived sandboxes + if len(history) > 100: + del history[:-100] + async def on_stop(self, sandbox_id: str, operator, meta_store, reason: StopReason = StopReason.MANUAL) -> None: logger.info(f"stop sandbox {sandbox_id} (reason={reason.value})") sandbox_info = self.sandbox_info or {} @@ -98,6 +131,8 @@ async def on_alive(self, sandbox_id: str, meta_store, sandbox_info: SandboxInfo) sandbox_info["state"] = RockState.RUNNING if not sandbox_info.get("start_time"): sandbox_info["start_time"] = get_iso8601_timestamp() + if self.sandbox_info and "state_history" in self.sandbox_info: + sandbox_info["state_history"] = self.sandbox_info["state_history"] await meta_store.update(sandbox_id, sandbox_info) async def on_restart(self, sandbox_id: str, operator, meta_store) -> None: @@ -136,6 +171,7 @@ async def on_restart(self, sandbox_id: str, operator, meta_store) -> None: new_info = dict(info) new_info["state"] = RockState.PENDING new_info.pop("stop_time", None) + new_info.pop("phases", None) await meta_store.update(sandbox_id, new_info) await meta_store.update_timeout(sandbox_id, timeout_info) @@ -145,6 +181,8 @@ async def on_delete( operator, meta_store, reason: DeleteReason = DeleteReason.MANUAL, + dir_storage=None, + image_storage=None, ) -> None: logger.info(f"delete sandbox {sandbox_id} (reason={reason.value})") sandbox_info = self.sandbox_info or {} @@ -168,11 +206,119 @@ async def on_delete( "rely on ContainerCleanupTask to reap docker container" ) + if sandbox_info.get("archive_time") and dir_storage and image_storage: + prefix = sandbox_info.get("archive_prefix", "rock-archives/") + acr_ns = sandbox_info.get("acr_namespace", "sandbox_archive") + key = ArchiveKeys.dir_key(sandbox_id, prefix) + ref = ArchiveKeys.image_ref(sandbox_id, image_storage.registry_url, acr_ns) + try: + await dir_storage.delete(key) + except Exception as e: + logger.warning(f"delete: cleanup archive dir {key} failed: 
async def on_archive(
    self,
    sandbox_id: str,
    meta_store,
    operator=None,
    dir_storage=None,
    image_storage=None,
    archive_params: dict | None = None,
) -> None:
    """Callback for the ``archive`` event (stopped → archiving).

    Stores the ARCHIVING state together with the archive prefix and registry
    namespace, then, when an operator is given, asks it to start the archive
    job for the stored spec.
    """
    logger.info(f"archive sandbox {sandbox_id}")
    params = archive_params or {}
    record = self.sandbox_info or {}
    record.update(
        state=RockState.ARCHIVING,
        archive_prefix=params.get("archive_prefix", "rock-archives/"),
        acr_namespace=params.get("acr_namespace", "sandbox_archive"),
    )
    await meta_store.archive(sandbox_id, record)
    self.sandbox_info = record

    if not operator:
        return
    deployment_config = DockerDeploymentConfig(**(record.get("spec") or {}))
    await operator.start_archive(
        config=deployment_config,
        host_ip=record.get("host_ip"),
        dir_storage_config=dir_storage.client_config,
        image_storage_config=image_storage.client_config,
        archive_params=params,
    )


async def on_archive_done(self, sandbox_id: str, meta_store) -> None:
    """Callback for ``archive_done`` (archiving → archived): store ARCHIVED and stamp ``archive_time``."""
    logger.info(f"archive done sandbox {sandbox_id}")
    record = self.sandbox_info or {}
    record.update(state=RockState.ARCHIVED, archive_time=get_iso8601_timestamp())
    await meta_store.archive(sandbox_id, record)
    self.sandbox_info = record


async def on_archive_failed(self, sandbox_id: str, meta_store, reason: str = "") -> None:
    """Callback for ``archive_failed`` (archiving → stopped): store STOPPED and drop any ``archive_time``."""
    logger.info(f"archive failed sandbox {sandbox_id}: {reason}")
    record = self.sandbox_info or {}
    record["state"] = RockState.STOPPED
    record.pop("archive_time", None)
    await meta_store.archive(sandbox_id, record)
    self.sandbox_info = record


async def on_restore(
    self,
    sandbox_id: str,
    meta_store,
    timeout_info: dict | None = None,
    operator=None,
    dir_storage=None,
    image_storage=None,
) -> None:
    """Callback for the ``restore`` event (archived → pending).

    Stores the PENDING state through ``meta_store.update`` (plus the timeout
    info when given), then, when an operator is given, asks it to start the
    restore job using the archive prefix and namespace kept on the record.
    """
    logger.info(f"restore sandbox {sandbox_id}")
    record = self.sandbox_info or {}
    record["state"] = RockState.PENDING
    record.pop("stop_time", None)
    await meta_store.update(sandbox_id, record)
    if timeout_info:
        await meta_store.update_timeout(sandbox_id, timeout_info)
    self.sandbox_info = record

    if not operator:
        return
    restore_params = {
        "archive_prefix": record.get("archive_prefix", "rock-archives/"),
        "acr_namespace": record.get("acr_namespace", "sandbox_archive"),
    }
    deployment_config = DockerDeploymentConfig(**(record.get("spec") or {}))
    await operator.start_restore(
        config=deployment_config,
        host_ip=record.get("host_ip"),
        dir_storage_config=dir_storage.client_config,
        image_storage_config=image_storage.client_config,
        archive_params=restore_params,
    )


async def on_restore_failed(self, sandbox_id: str, meta_store, reason: str = "") -> None:
    """Callback for ``restore_failed`` (pending → archived): put the record back to ARCHIVED."""
    logger.info(f"restore failed sandbox {sandbox_id}: {reason}")
    record = self.sandbox_info or {}
    record["state"] = RockState.ARCHIVED
    await meta_store.archive(sandbox_id, record)
    self.sandbox_info = record
async def archive(self):
    """Ask the server to archive this sandbox.

    Posts ``{"sandbox_id": ...}`` to ``{self._url}/archive`` and returns
    ``None`` when the response status is ``"Success"``.

    Raises:
        Exception: ``sandbox_id`` is not set, or the server did not report
            success. When the response carries a ``result``, its code is
            passed to ``raise_for_code`` first.
    """
    # NOTE(review): the admin route added alongside this method is
    # POST /sandboxes/{sandbox_id}/archive (sandbox_id in the path); confirm
    # that "{self._url}/archive" with a JSON body actually reaches it.
    if not self.sandbox_id:
        raise Exception("sandbox_id is not set, cannot archive")

    response = await HttpUtils.post(
        f"{self._url}/archive",
        self._build_headers(),
        {"sandbox_id": self.sandbox_id},
    )
    logging.debug(f"Archive sandbox response: {response}")
    if response.get("status") == "Success":
        return

    result = response.get("result", None)
    if result is not None:
        rock_response = SandboxResponse(**result)
        raise_for_code(rock_response.code, f"Failed to archive sandbox: {response}")
    raise Exception(f"Failed to archive sandbox: {response}")
against old DDL file + uv run python scripts/gen_ddl.py --alter-from HEAD~1 # diff against git commit/tag + uv run python scripts/gen_ddl.py --alter-from v1.3.0 # diff against git tag """ import argparse +import re import sys from sqlalchemy.schema import CreateIndex, CreateTable @@ -25,12 +29,20 @@ def get_dialect(name: str): sys.exit(1) -def gen_ddl(dialect) -> str: +def gen_ddl(dialect, table_filter: str | None = None) -> str: # Import here so all models are registered onto Base.metadata from rock.admin.core.schema import Base # noqa: F401 (side-effect: registers SandboxRecord) + tables = Base.metadata.sorted_tables + if table_filter: + names = {n.strip() for n in table_filter.split(",")} + tables = [t for t in tables if t.name in names] + if not tables: + print(f"No tables matched: {table_filter}", file=sys.stderr) + sys.exit(1) + lines: list[str] = [] - for table in Base.metadata.sorted_tables: + for table in tables: lines.append(str(CreateTable(table).compile(dialect=dialect)).strip() + ";") # table.indexes has set-like semantics; sort for deterministic output. for index in sorted(table.indexes, key=lambda idx: idx.name or ""): @@ -39,21 +51,120 @@ def gen_ddl(dialect) -> str: return "\n\n".join(lines) +def _parse_columns_from_sql(sql: str) -> dict[str, str]: + """Parse column definitions from a CREATE TABLE statement. Returns {column_name: full_definition}.""" + match = re.search(r"CREATE TABLE (\w+)\s*\((.*)\)", sql, re.DOTALL) + if not match: + return {} + body = match.group(2) + columns: dict[str, str] = {} + for line in body.split("\n"): + line = line.strip().rstrip(",") + if not line or line.startswith("PRIMARY KEY") or line.startswith("CONSTRAINT") or line.startswith(")"): + continue + parts = line.split() + if parts: + columns[parts[0].lower()] = line + return columns + + +def _parse_indexes_from_sql(sql: str) -> dict[str, str]: + """Parse CREATE INDEX statements. 
Returns {index_name: full_statement}.""" + indexes: dict[str, str] = {} + for match in re.finditer(r"(CREATE\s+(?:UNIQUE\s+)?INDEX\s+(\w+)\s+[^;]+);", sql, re.IGNORECASE): + indexes[match.group(2).lower()] = match.group(1).strip() + ";" + return indexes + + +def _read_old_sql(source: str) -> str: + """Read old DDL from a file path or git ref (commit/tag). + + If source looks like a file path (contains '/' or '.sql'), read it directly. + Otherwise treat it as a git ref and read sql/sandbox_record.sql from that ref. + """ + if "/" in source or source.endswith(".sql"): + with open(source) as f: + return f.read() + import subprocess + + # Try each known SQL path under the git ref + for sql_path in ["sql/sandbox_record.sql"]: + try: + return subprocess.check_output( + ["git", "show", f"{source}:{sql_path}"], stderr=subprocess.DEVNULL, text=True + ) + except subprocess.CalledProcessError: + continue + print(f"Cannot find DDL file in git ref '{source}'", file=sys.stderr) + sys.exit(1) + + +def gen_alter(dialect, source: str, table_filter: str | None = None) -> str: + """Compare old DDL (file or git ref) against current ORM and generate ALTER TABLE statements.""" + from rock.admin.core.schema import Base + + old_sql = _read_old_sql(source) + + old_columns = _parse_columns_from_sql(old_sql) + old_indexes = _parse_indexes_from_sql(old_sql) + + tables = Base.metadata.sorted_tables + if table_filter: + names = {n.strip() for n in table_filter.split(",")} + tables = [t for t in tables if t.name in names] + else: + table_match = re.search(r"CREATE TABLE (\w+)", old_sql) + if table_match: + target = table_match.group(1).lower() + tables = [t for t in tables if t.name == target] + + if not tables: + print("No matching table found", file=sys.stderr) + sys.exit(1) + + lines: list[str] = [] + for table in tables: + new_ddl = str(CreateTable(table).compile(dialect=dialect)).strip() + new_columns = _parse_columns_from_sql(new_ddl) + + for col_name, col_def in new_columns.items(): + if 
col_name not in old_columns: + lines.append(f"ALTER TABLE {table.name} ADD COLUMN {col_def};") + + for idx in sorted(table.indexes, key=lambda i: i.name or ""): + idx_name = (idx.name or "").lower() + if idx_name and idx_name not in old_indexes: + lines.append(str(CreateIndex(idx).compile(dialect=dialect)).strip() + ";") + + return "\n\n".join(lines) if lines else "-- No changes detected" + + def main() -> None: parser = argparse.ArgumentParser(description="Generate DDL from ORM schema") parser.add_argument("--dialect", default="postgresql", choices=["postgresql", "sqlite"]) + parser.add_argument("--table", default=None, help="Comma-separated table names to generate (default: all)") + parser.add_argument( + "--alter-from", + default=None, + dest="alter_from", + help="Old DDL file path or git ref (commit/tag) to diff against", + ) parser.add_argument("--out", default=None, help="Output file path (default: stdout)") args = parser.parse_args() dialect = get_dialect(args.dialect) - ddl = gen_ddl(dialect) + + if args.alter_from: + result = gen_alter(dialect, args.alter_from, table_filter=args.table) + else: + result = gen_ddl(dialect, table_filter=args.table) if args.out: with open(args.out, "w") as f: - f.write(ddl + "\n") + f.write(result + "\n") print(f"Written to {args.out}") else: - print(ddl) + print(result) if __name__ == "__main__": diff --git a/scripts/gen_statemachine_diagram.py b/scripts/gen_statemachine_diagram.py index c618c9be52..b018bdbf3f 100644 --- a/scripts/gen_statemachine_diagram.py +++ b/scripts/gen_statemachine_diagram.py @@ -4,9 +4,25 @@ uv run python scripts/gen_statemachine_diagram.py """ +import re + +import pydot from statemachine.contrib.diagram import DotGraphMachine from rock.sandbox.sandbox_statemachine import SandboxStateMachine -DotGraphMachine(SandboxStateMachine)().write_png("sandbox_statemachine.png") +INTERMEDIATE_STATES = {"pending", "archiving"} + +graph = DotGraphMachine(SandboxStateMachine)() +dot_src = graph.to_string() + +for 
state in INTERMEDIATE_STATES: + dot_src = re.sub( + rf"({state} \[.*?)fillcolor=white", + r'\1fillcolor="#FFF3CD", color="#856404"', + dot_src, + ) + +(styled_graph,) = pydot.graph_from_dot_data(dot_src) +styled_graph.write_png("sandbox_statemachine.png") print("Written to sandbox_statemachine.png") diff --git a/sql/sandbox_record.sql b/sql/sandbox_record.sql index b02bb565a0..0c3405d7a6 100644 --- a/sql/sandbox_record.sql +++ b/sql/sandbox_record.sql @@ -10,6 +10,8 @@ CREATE TABLE sandbox_record ( create_time VARCHAR(64) NOT NULL, start_time VARCHAR(64), stop_time VARCHAR(64), + archive_time VARCHAR(64), + delete_time VARCHAR(64), host_name VARCHAR(255), auth_token VARCHAR(512), rock_authorization_encrypted VARCHAR(1024), @@ -23,20 +25,20 @@ CREATE TABLE sandbox_record ( PRIMARY KEY (sandbox_id) ); -CREATE INDEX ix_sandbox_record_image ON sandbox_record (image); +CREATE INDEX ix_sandbox_record_cluster_name ON sandbox_record (cluster_name); -CREATE INDEX ix_sandbox_record_host_ip ON sandbox_record (host_ip); +CREATE INDEX ix_sandbox_record_create_user_gray_flag ON sandbox_record (create_user_gray_flag); -CREATE INDEX ix_sandbox_record_host_name ON sandbox_record (host_name); +CREATE INDEX ix_sandbox_record_experiment_id ON sandbox_record (experiment_id); -CREATE INDEX ix_sandbox_record_state ON sandbox_record (state); +CREATE INDEX ix_sandbox_record_host_ip ON sandbox_record (host_ip); -CREATE INDEX ix_sandbox_record_create_user_gray_flag ON sandbox_record (create_user_gray_flag); +CREATE INDEX ix_sandbox_record_host_name ON sandbox_record (host_name); -CREATE INDEX ix_sandbox_record_user_id ON sandbox_record (user_id); +CREATE INDEX ix_sandbox_record_image ON sandbox_record (image); CREATE INDEX ix_sandbox_record_namespace ON sandbox_record (namespace); -CREATE INDEX ix_sandbox_record_experiment_id ON sandbox_record (experiment_id); +CREATE INDEX ix_sandbox_record_state ON sandbox_record (state); -CREATE INDEX ix_sandbox_record_cluster_name ON sandbox_record 
(cluster_name); +CREATE INDEX ix_sandbox_record_user_id ON sandbox_record (user_id); diff --git a/test_archive_manual_e2e.py b/test_archive_manual_e2e.py new file mode 100644 index 0000000000..22151fe269 --- /dev/null +++ b/test_archive_manual_e2e.py @@ -0,0 +1,229 @@ +""" +Archive P0 手动 E2E 测试脚本(基于 SDK) + +覆盖场景: + A. stopped → archived(archive 全流程) + B. archived → restoring → running(restore 全流程) + C. archived → deleted(直接从 archived 删除) + +交互式流程:start → stop → archive → wait ARCHIVED → restart(restore) → wait RUNNING + → stop → archive → wait ARCHIVED → delete(from archived) + +前置条件: + 1. MinIO 容器运行中(端口 9000),已创建 bucket "rock-archive-test" + 2. Docker Registry 容器运行中(端口 5000,开启 delete) + 3. admin 服务运行中(端口 8080),已注入 archive storage + +启动命令: + uv run python test_archive_manual_e2e.py + +环境变量(可选): + ROCK_BASE_URL admin 地址,默认 http://localhost:8080 + ROCK_IMAGE 测试用镜像,默认 python:3.11 + ARCHIVE_TIMEOUT 等待 ARCHIVED 超时秒数,默认 300 +""" + +import asyncio +import logging +import os +import sys +import time + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", stream=sys.stdout) +logger = logging.getLogger(__name__) + +BASE_URL = os.getenv("ROCK_BASE_URL", "http://localhost:8080") +IMAGE = os.getenv("ROCK_IMAGE", "python:3.11") +ARCHIVE_TIMEOUT = int(os.getenv("ARCHIVE_TIMEOUT", "300")) +INTERACTIVE = os.getenv("INTERACTIVE", "0") == "1" + + +def banner(step: str, msg: str): + logger.info(f"\n{'=' * 70}\n [{step}] {msg}\n{'=' * 70}") + + +def pause(hint: str = ""): + if hint: + logger.info(f"\n 手动检查提示:\n{hint}") + if INTERACTIVE: + input("\n >>> 按 Enter 继续下一步...") + else: + logger.info(" (非交互模式,自动继续)") + + +async def wait_state(sandbox, target_state: str, timeout: int = ARCHIVE_TIMEOUT): + """轮询 get_status 直到 state 达到 target,或超时。""" + start = time.time() + while time.time() - start < timeout: + try: + status = await sandbox.get_status(include_all_states=True) + except Exception as e: + logger.info(f" get_status 暂时失败: {e}") + await 
async def wait_state_transition(sandbox, intermediate_state: str, final_state: str, timeout: int = ARCHIVE_TIMEOUT):
    """Poll ``get_status`` until *final_state* is reported.

    Returns:
        ``(status, saw_intermediate)``; the flag tells whether
        *intermediate_state* was observed on any poll before the final one.

    Raises:
        TimeoutError: *final_state* was not reported within *timeout* seconds.
    """
    began = time.time()
    saw_intermediate = False
    while time.time() - began < timeout:
        try:
            status = await sandbox.get_status(include_all_states=True)
        except Exception as e:
            # The status call may fail transiently while the sandbox changes state; retry.
            logger.info(f" get_status 暂时失败: {e}")
            await asyncio.sleep(3)
            continue

        current = status.state
        logger.info(f" 当前 state: {current}")
        saw_intermediate = saw_intermediate or current == intermediate_state
        if current == final_state:
            return status, saw_intermediate
        await asyncio.sleep(3)
    raise TimeoutError(f"等待 state={final_state} 超时({timeout}s)")
"触发 Archive (stopped → archiving → archived)") + await sandbox.archive() + logger.info(" archive 已触发,等待 ARCHIVED...") + + banner("4/11", f"等待扫描器推进到 ARCHIVED(超时 {ARCHIVE_TIMEOUT}s)") + status = await wait_state(sandbox, "archived") + logger.info(" ✓ stopped → archived 成功!") + + pause( + f" # MinIO 应有 tar.gz:\n" + f" python3 -c \"import boto3; s3=boto3.client('s3',endpoint_url='http://localhost:9000',aws_access_key_id='rockadmin',aws_secret_access_key='rockadmin123',region_name='us-east-1'); print([o['Key'] for o in s3.list_objects_v2(Bucket='rock-archive-test',Prefix='rock-archives/datalog-{sandbox_id}').get('Contents',[])])\"\n\n" + f" # Registry 应有 tag:\n" + f" curl -s http://localhost:5000/v2/rock-snapshots/archived-{sandbox_id}/tags/list" + ) + + # ═══════════════════════════════════════════════════════════════════ + # SCENARIO B: archived → restoring → running + # ═══════════════════════════════════════════════════════════════════ + + banner("5/11", "Restart(从 ARCHIVED 恢复 → RESTORING → RUNNING)") + logger.info(" 调用 restart → 内部 restore + docker start(异步)") + await sandbox.restart() + + banner("6/11", "等待经过 RESTORING 到达 RUNNING") + status, saw_restoring = await wait_state_transition(sandbox, "restoring", "running", timeout=180) + if saw_restoring: + logger.info(" ✓ 观察到 RESTORING 中间状态!") + else: + logger.warning(" ⚠ 未捕获到 RESTORING(可能转换太快),但最终到达 RUNNING") + logger.info(" ✓ archived → restoring → running 成功!") + + pause( + f" # 容器应重新运行:\n" + f" docker ps --format '{{{{.Names}}}} {{{{.Status}}}}' | grep {sandbox_id}\n\n" + f" # API 应返回 running:\n" + f" curl -s 'http://localhost:8080/apis/envs/sandbox/v1/get_status?sandbox_id={sandbox_id}&include_all_states=True' | python3 -m json.tool | grep state" + ) + + # ═══════════════════════════════════════════════════════════════════ + # SCENARIO C: archived → deleted (直接从 archived 删除) + # ═══════════════════════════════════════════════════════════════════ + + banner("7/11", "停止 Sandbox(为第二次 archive 做准备)") + await 
sandbox.stop() + await asyncio.sleep(3) + status = await sandbox.get_status(include_all_states=True) + assert status.state == "stopped", f"预期 stopped,实际 {status.state}" + logger.info(" ✓ stopped") + + banner("8/11", "第二次 Archive") + await sandbox.archive() + logger.info(" archive 已触发") + + banner("9/11", "等待第二次 ARCHIVED") + status = await wait_state(sandbox, "archived") + logger.info(" ✓ 再次到达 ARCHIVED!") + + banner("10/11", "从 ARCHIVED 直接 Delete(验证 archived → deleted)") + await sandbox.delete() + logger.info(" delete 已调用") + await asyncio.sleep(2) + + banner("11/11", "验证最终状态为 DELETED") + status = await sandbox.get_status(include_all_states=True) + logger.info(f" state: {status.state}") + assert status.state == "deleted", f"预期 deleted,实际 {status.state}" + logger.info(" ✓ archived → deleted 成功!") + + pause( + f" # 容器应已删除:\n" + f" docker ps -a | grep {sandbox_id} && echo 'FAIL' || echo 'OK: 已删除'\n\n" + f" # MinIO archive 对象应已清理:\n" + f" python3 -c \"import boto3; s3=boto3.client('s3',endpoint_url='http://localhost:9000',aws_access_key_id='rockadmin',aws_secret_access_key='rockadmin123',region_name='us-east-1'); objs=s3.list_objects_v2(Bucket='rock-archive-test',Prefix='rock-archives/datalog-{sandbox_id}').get('Contents',[]); print('FAIL: 未清理', [o['Key'] for o in objs]) if objs else print('OK: 已清理')\"\n\n" + f" # Registry tag 应已删除:\n" + f" curl -s http://localhost:5000/v2/rock-snapshots/archived-{sandbox_id}/tags/list" + ) + + logger.info("\n" + "=" * 70) + logger.info(" ALL SCENARIOS PASSED:") + logger.info(" A. stopped → archived ✓") + logger.info(" B. archived → restoring → running ✓") + logger.info(" C. 
@pytest.fixture
def sandbox_container():
    """Create a real docker container for archive testing.

    Yields:
        The container name, ``test-archive-e2e-<pid>``.

    The container is force-removed after the test finishes.
    """
    container_name = f"test-archive-e2e-{os.getpid()}"
    # Remove any stale container left by a crashed run whose PID was recycled;
    # otherwise `docker run --name` fails with a name conflict. This mirrors the
    # pre-clean step the other docker-backed fixtures perform.
    subprocess.run(["docker", "rm", "-f", container_name], capture_output=True)
    subprocess.run(
        ["docker", "run", "-d", "--name", container_name, "busybox:latest", "sleep", "3600"],
        check=True,
        capture_output=True,
    )
    yield container_name
    subprocess.run(["docker", "rm", "-f", container_name], capture_output=True)
@pytest.fixture
def actor(sandbox_container):
    """Build a SandboxActor without Ray by instantiating the class Ray wraps.

    ``__init__`` is bypassed via ``__new__``; only the attributes assigned below
    are populated, each with a mock.
    """
    from rock.sandbox.sandbox_actor import SandboxActor

    underlying_cls = SandboxActor.__ray_actor_class__

    fake_config = MagicMock(container_name=sandbox_container)
    fake_deployment = MagicMock(restart=AsyncMock(), restart_from_image=AsyncMock())

    instance = underlying_cls.__new__(underlying_cls)
    instance._deployment = fake_deployment
    instance._config = fake_config
    instance._setup_monitor = AsyncMock()
    instance._clean_container_background = MagicMock()
    return instance
monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", log_root) + + async def test_full_cycle(self, actor, dir_storage, image_storage, log_dir, sandbox_container, local_minio): + log_root, original_hashes = log_dir + + dir_cfg = dir_storage.client_config + img_cfg = image_storage.client_config + + # === ARCHIVE === + await actor.archive(dir_cfg, img_cfg) + + key = ArchiveKeys.dir_key(sandbox_container, "rock-archives/") + ref = ArchiveKeys.image_ref(sandbox_container, image_storage.registry_url, "sandbox_archive") + + assert await dir_storage.exists(key), "Archive dir object should exist in MinIO" + assert await image_storage.exists(ref), "Archive image should exist in registry" + + # === SIMULATE "LOCAL IS GONE" === + log_path = Path(log_root) / sandbox_container + import shutil + + shutil.rmtree(str(log_path), ignore_errors=True) + assert not log_path.exists() + + local_tag = f"archive-staging-{sandbox_container}:latest" + subprocess.run(["docker", "rmi", local_tag], capture_output=True) + subprocess.run(["docker", "rmi", ref], capture_output=True) + + # === RESTORE === + await actor.restore_and_start(dir_cfg, img_cfg) + + assert log_path.exists(), "Log dir should be restored" + for name, expected_hash in original_hashes.items(): + restored = (log_path / name).read_bytes() + actual_hash = hashlib.sha256(restored).hexdigest() + assert actual_hash == expected_hash, f"File {name} content mismatch after restore" + + result = subprocess.run(["docker", "image", "inspect", ref], capture_output=True) + assert result.returncode == 0, "Image should be pullable locally after restore" + + # Remote still intact + assert await dir_storage.exists(key), "Dir object should still exist after restore" + assert await image_storage.exists(ref), "Image should still exist after restore" + + # === DELETE CLEANUP === + await dir_storage.delete(key) + await image_storage.delete(ref) + + assert not await dir_storage.exists(key), "Dir object should be gone after delete" + assert not await 
class TestIdempotentRestore:
    """Verify that restoring when already restored is a no-op at the manager level."""

    async def test_restart_from_archived_skips_if_not_archived(self):
        """State machine guard: if state != ARCHIVED, skip entirely."""
        from rock.sandbox.sandbox_manager import SandboxManager

        m = MagicMock(spec=SandboxManager)
        m._operator = MagicMock()
        m._operator.restore_archive = AsyncMock()
        # The unit tests for restart_from_archived mock the operator entry point
        # as `start_restore`; track it too so the guard is checked under either name.
        m._operator.start_restore = AsyncMock()
        m._dir_storage = MagicMock()
        m._image_storage = MagicMock()

        sm = AsyncMock()
        sm.current_state.value = State.STOPPED
        sm.sandbox_info = {"sandbox_id": "sbx-x"}
        m._get_current_statemachine = AsyncMock(return_value=sm)

        m.restart_from_archived = SandboxManager.restart_from_archived.__get__(m, SandboxManager)
        await m.restart_from_archived("sbx-x")

        m._operator.restore_archive.assert_not_called()
        m._operator.start_restore.assert_not_called()
        # A restore is driven through the state machine, so this is the assertion
        # that cannot pass vacuously: no event may be sent for a non-ARCHIVED sandbox.
        sm.send.assert_not_called()
@pytest.fixture(scope="session")
def test_container():
    """Provide a running busybox container that commit tests can snapshot.

    Any container already using the name is force-removed before start, and the
    container is force-removed again once the session ends.
    """
    container = "test-archive-source"
    force_remove = ["docker", "rm", "-f", container]
    start = ["docker", "run", "-d", "--name", container, "busybox:latest", "sleep", "3600"]

    subprocess.run(force_remove, capture_output=True)
    subprocess.run(start, check=True, capture_output=True)
    yield container
    subprocess.run(force_remove, capture_output=True)
async def test_push_idempotent(self, image_storage, local_image):
    """Pushing the same local image to the same ref twice must succeed and leave the ref present."""
    ref = f"{image_storage.registry_url}/test-ns/idempotent:v1"
    try:
        await image_storage.push_from_local(local_image, ref)
        await image_storage.push_from_local(local_image, ref)
        assert await image_storage.exists(ref) is True
    finally:
        # The registry fixture is session-scoped: always remove the ref so a
        # failed assertion does not leave state behind for later tests.
        await image_storage.delete(ref)
async def test_download_target_exists_raises(self, s3_storage, sample_dir):
    """Downloading into an already-existing directory must raise ``FileExistsError``."""
    key = "test/exists-check.tar.gz"
    try:
        await s3_storage.upload_dir(sample_dir, key)

        with pytest.raises(FileExistsError):
            await s3_storage.download_to_dir(key, sample_dir)
    finally:
        # MinIO is session-scoped: remove the object even when the assertion
        # fails so the key does not linger for other tests.
        await s3_storage.delete(key)
@pytest.fixture(scope="session")
def local_snapshot_registry():
    """Start a local Docker registry with basic auth and delete support.

    Yields:
        ``(registry_url, username, password)`` where ``registry_url`` is
        ``localhost:<port>`` (no scheme).

    Raises:
        subprocess.CalledProcessError: If generating the htpasswd entry or
            starting the registry container fails.
        RuntimeError: If the registry does not answer ``/v2/`` with HTTP 200
            within the retry budget.
    """
    import base64
    import shutil

    container_name = "test-snapshot-registry"
    username, password = "testuser", "testpass"
    subprocess.run(["docker", "rm", "-f", container_name], capture_output=True)
    port = run_until_complete(find_free_port())

    auth_dir = tempfile.mkdtemp()
    try:
        htpasswd_file = os.path.join(auth_dir, "htpasswd")
        # check=True: a failed htpasswd run would otherwise write an empty auth
        # file and only surface later as a misleading "did not become ready".
        result = subprocess.run(
            ["docker", "run", "--rm", "httpd:2", "htpasswd", "-Bbn", username, password],
            check=True,
            capture_output=True,
            text=True,
        )
        with open(htpasswd_file, "w") as f:
            f.write(result.stdout)

        subprocess.run(
            [
                "docker",
                "run",
                "-d",
                "--name",
                container_name,
                "-p",
                f"{port}:5000",
                "-v",
                f"{htpasswd_file}:/auth/htpasswd",
                "-e",
                "REGISTRY_AUTH=htpasswd",
                "-e",
                "REGISTRY_AUTH_HTPASSWD_REALM=Registry Realm",
                "-e",
                "REGISTRY_AUTH_HTPASSWD_PATH=/auth/htpasswd",
                "-e",
                "REGISTRY_STORAGE_DELETE_ENABLED=true",
                "registry:2",
            ],
            check=True,
        )

        registry_url = f"localhost:{port}"
        # The credentials do not change between attempts; encode them once.
        credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
        for _ in range(30):
            req = urllib.request.Request(f"http://{registry_url}/v2/")
            req.add_header("Authorization", f"Basic {credentials}")
            try:
                with urllib.request.urlopen(req, timeout=1) as r:
                    if r.status == 200:
                        break
            except (urllib.error.URLError, ConnectionError, OSError):
                # Not reachable / not authorised yet: fall through and retry.
                pass
            # Sleep after every unsuccessful attempt, including non-200 replies.
            time.sleep(0.5)
        else:
            raise RuntimeError(f"Registry at {registry_url} did not become ready")

        yield registry_url, username, password
    finally:
        # Runs on normal teardown and when setup fails part-way, so neither the
        # container nor the temporary auth directory is leaked.
        subprocess.run(["docker", "rm", "-f", container_name], capture_output=True)
        shutil.rmtree(auth_dir, ignore_errors=True)
@pytest.mark.asyncio
async def test_archive_forbidden(sandbox_app):
    """Unauthorised key: handle_exceptions returns HTTP 200 with status=Failed.

    Also checks that the ``X-Key`` header value is what the archive config is
    asked to authorise, so a broken header alias cannot go unnoticed.
    """
    app, mock_manager = sandbox_app
    is_allowed = mock_manager.rock_config.lifecycle.archive.is_allowed
    is_allowed.return_value = False
    mock_manager.archive_sandbox = AsyncMock()
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        resp = await client.post(f"{BASE}/sandboxes/sb-1/archive", headers={"X-Key": "bad-key"})
    assert resp.status_code == 200
    assert resp.json()["status"] == "Failed"
    # assert_called_with (last call) rather than assert_called_once_with, so the
    # check holds even if the app fixture is shared with other tests.
    is_allowed.assert_called_with("bad-key")
    mock_manager.archive_sandbox.assert_not_called()
class TestRestartPhases:
    """Phase bookkeeping of ``DockerDeployment.restart()`` with all I/O patched out."""

    @pytest.mark.asyncio
    async def test_image_pull_is_success_after_restart(self, deployment):
        """Restart reuses existing container — image_pull should be marked SUCCESS."""
        # Status payload in which both phases are already successful.
        # NOTE(review): presumably mirrors the status file restart() reads back — confirm
        # against DockerDeployment.restart.
        status_data = {
            "phases": {
                "image_pull": {"status": "success", "message": "image pull success"},
                "docker_run": {"status": "success", "message": "docker run success"},
            },
            "port_mapping": {"22555": 22555},
        }

        mock_popen = MagicMock()
        # Replace everything that would touch docker, the filesystem or the network:
        # the container start, the existence check, file opening, the liveness wait
        # and the remote runtime class.
        with (
            patch.object(deployment, "_docker_start", return_value=mock_popen),
            patch("os.path.exists", return_value=True),
            patch("builtins.open", create=True) as mock_open,
            patch.object(deployment, "_wait_until_alive", new_callable=AsyncMock),
            patch("rock.deployments.docker.RemoteSandboxRuntime") as mock_runtime_cls,
        ):
            # Make the mocked file object usable as a context manager whose read()
            # returns the JSON-encoded status payload.
            mock_open.return_value.__enter__ = lambda s: s
            mock_open.return_value.__exit__ = MagicMock(return_value=False)
            mock_open.return_value.read = MagicMock(return_value=json.dumps(status_data))
            mock_runtime = MagicMock()
            mock_runtime_cls.from_config.return_value = mock_runtime

            # Patch json.load to return status_data regardless of the file object passed in
            with patch("json.load", return_value=status_data):
                await deployment.restart()

        phase = deployment._service_status.get_phase("image_pull")
        assert phase.status == Status.SUCCESS, f"Expected image_pull to be SUCCESS after restart, got {phase.status}"
class TestArchiveOperatorNotSupported:
    """Archive storage IS configured, but the operator reports no archive support."""

    @staticmethod
    def _configure_storage(manager):
        """Attach mock storages so the 'archive not configured' path cannot mask the operator check."""
        manager._dir_storage = AsyncMock()
        manager._image_storage = AsyncMock()

    async def test_archive_sandbox_raises_error(self, manager_no_archive):
        self._configure_storage(manager_no_archive)
        manager_no_archive._operator.supports_archive.return_value = False
        with pytest.raises(BadRequestRockError, match="archive not supported"):
            await manager_no_archive.archive_sandbox("sbx-1")

    async def test_reconcile_archiving_skips(self, manager_no_archive):
        # Without storage the scanner already skips (see TestArchiveNotConfigured),
        # so this test would pass without ever consulting the operator. Configure
        # storage to make the operator check the only reason for skipping.
        self._configure_storage(manager_no_archive)
        manager_no_archive._operator.supports_archive.return_value = False
        await manager_no_archive._reconcile_archiving()
        manager_no_archive._meta_store.list_by.assert_not_called()
class TestArchiveSandbox:
    """``SandboxManager.archive_sandbox``: guards and the event sent to the state machine."""

    async def test_happy_path(self, manager, sm_stopped):
        """A STOPPED sandbox gets exactly one ``archive`` event carrying operator, storages and params."""
        manager._get_current_statemachine = AsyncMock(return_value=sm_stopped)
        await manager.archive_sandbox("sbx-1")

        sm_stopped.send.assert_called_once()
        assert sm_stopped.send.call_args[0][0] == "archive"
        kwargs = sm_stopped.send.call_args[1]
        assert kwargs["operator"] is manager._operator
        assert kwargs["dir_storage"] is manager._dir_storage
        assert kwargs["image_storage"] is manager._image_storage
        assert "archive_params" in kwargs

    async def test_unsupported_operator_raises(self, manager, sm_stopped):
        """An operator without archive support is rejected with the dedicated error."""
        manager._operator.supports_archive.return_value = False
        # Pin the message: a bare BadRequestRockError would also be satisfied by
        # the unrelated "not configured" / "not found" failures.
        with pytest.raises(BadRequestRockError, match="archive not supported"):
            await manager.archive_sandbox("sbx-1")

    async def test_not_found_raises(self, manager):
        """No state machine for the sandbox id results in ``BadRequestRockError``."""
        manager._get_current_statemachine = AsyncMock(return_value=None)
        with pytest.raises(BadRequestRockError):
            await manager.archive_sandbox("sbx-1")

    async def test_passes_storage(self, manager, sm_stopped):
        """Verify storage is passed through to on_archive for operator use."""
        manager._get_current_statemachine = AsyncMock(return_value=sm_stopped)

        await manager.archive_sandbox("sbx-1")

        sm_stopped.send.assert_called_once()
        kwargs = sm_stopped.send.call_args[1]
        assert kwargs["dir_storage"] is manager._dir_storage
        assert kwargs["image_storage"] is manager._image_storage
class TestCheckArchiveProgress:
    """Outcomes of one ``_reconcile_archiving`` pass over sandboxes listed by the meta store."""

    @staticmethod
    def _archiving_info(archive_time, transition_timestamp):
        """Return a record for ``sbx-1`` whose history shows stopped -> archiving at the given timestamp."""
        return {
            "sandbox_id": "sbx-1",
            "archive_time": archive_time,
            "state_history": [
                {
                    "from_state": "stopped",
                    "to_state": "archiving",
                    "event": "archive",
                    "timestamp": transition_timestamp,
                }
            ],
            "state": State.ARCHIVING,
        }

    @staticmethod
    def _archiving_statemachine():
        """Return a mocked state machine currently in ARCHIVING."""
        machine = AsyncMock()
        machine.current_state.value = State.ARCHIVING
        return machine

    async def test_empty_list_does_nothing(self, manager):
        manager._meta_store.list_by = AsyncMock(return_value=[])

        await manager._reconcile_archiving()

        manager._image_storage.exists.assert_not_called()

    async def test_image_exists_triggers_archive_done(self, manager):
        record = self._archiving_info("2026-01-01T000000Z", "2026-01-01T000000Z")
        machine = self._archiving_statemachine()
        manager._meta_store.list_by = AsyncMock(return_value=[record])
        manager._image_storage.exists = AsyncMock(return_value=True)
        manager._get_current_statemachine = AsyncMock(return_value=machine)

        await manager._reconcile_archiving()

        machine.send.assert_called_once()
        sent_event = machine.send.call_args.args[0]
        assert sent_event == "archive_done"

    async def test_image_not_exist_within_timeout_skips(self, manager):
        just_now = datetime.now(timezone.utc).isoformat()
        record = self._archiving_info("2026-01-01T000000Z", just_now)
        manager._meta_store.list_by = AsyncMock(return_value=[record])
        manager._image_storage.exists = AsyncMock(return_value=False)

        await manager._reconcile_archiving()

        manager._get_current_statemachine.assert_not_called()

    async def test_timeout_triggers_archive_failed(self, manager):
        record = self._archiving_info("t1", "2020-01-01T00:00:00+00:00")
        machine = self._archiving_statemachine()
        manager._meta_store.list_by = AsyncMock(return_value=[record])
        manager._image_storage.exists = AsyncMock(return_value=False)
        manager._get_current_statemachine = AsyncMock(return_value=machine)

        await manager._reconcile_archiving()

        machine.send.assert_called_once()
        sent_event = machine.send.call_args.args[0]
        assert sent_event == "archive_failed"

    async def test_operator_not_supporting_archive_returns_early(self, manager):
        list_by = AsyncMock()
        manager._meta_store.list_by = list_by
        manager._operator.supports_archive.return_value = False

        await manager._reconcile_archiving()

        list_by.assert_not_called()
dir_storage/image_storage to on_delete for archive cleanup.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from rock.actions.sandbox.response import State +from rock.sdk.common.exceptions import BadRequestRockError + + +@pytest.fixture +def manager(): + from rock.sandbox.sandbox_manager import SandboxManager + + m = MagicMock(spec=SandboxManager) + m._meta_store = AsyncMock() + m._operator = MagicMock() + m._operator.supports_archive.return_value = True + m._dir_storage = AsyncMock() + m._dir_storage.client_config = {"endpoint": "e", "bucket": "b", "access_key": "a", "secret_key": "s", "region": "r"} + m._dir_storage.delete = AsyncMock(return_value=True) + m._image_storage = AsyncMock() + m._image_storage.registry_url = "localhost:5000" + m._image_storage.client_config = {"registry_url": "localhost:5000"} + m._image_storage.delete = AsyncMock(return_value=True) + + m.delete = SandboxManager.delete.__get__(m, SandboxManager) + return m + + +@pytest.fixture +def sm_archived(): + sm = AsyncMock() + sm.current_state.value = State.ARCHIVED + sm.sandbox_info = { + "sandbox_id": "sbx-1", + "state": State.ARCHIVED, + "archive_time": "2026-01-01T000000Z", + "host_ip": "10.0.0.1", + "spec": {"container_name": "sbx-1", "image": "img:latest"}, + } + return sm + + +@pytest.fixture +def sm_stopped(): + sm = AsyncMock() + sm.current_state.value = State.STOPPED + sm.sandbox_info = { + "sandbox_id": "sbx-1", + "state": State.STOPPED, + "host_ip": "10.0.0.1", + "spec": {"container_name": "sbx-1", "image": "img:latest"}, + } + return sm + + +class TestDeletePassesStorageToCallback: + async def test_delete_archived_passes_storage(self, manager, sm_archived): + manager._get_current_statemachine = AsyncMock(return_value=sm_archived) + await manager.delete("sbx-1") + + sm_archived.send.assert_called_once() + assert sm_archived.send.call_args[0][0] == "delete" + kwargs = sm_archived.send.call_args[1] + assert kwargs["dir_storage"] is manager._dir_storage + assert 
kwargs["image_storage"] is manager._image_storage + + async def test_delete_stopped_passes_storage(self, manager, sm_stopped): + manager._get_current_statemachine = AsyncMock(return_value=sm_stopped) + await manager.delete("sbx-1") + + sm_stopped.send.assert_called_once() + kwargs = sm_stopped.send.call_args[1] + assert kwargs["dir_storage"] is manager._dir_storage + assert kwargs["image_storage"] is manager._image_storage + + async def test_delete_without_storage_passes_none(self, manager, sm_stopped): + manager._dir_storage = None + manager._image_storage = None + manager._get_current_statemachine = AsyncMock(return_value=sm_stopped) + await manager.delete("sbx-1") + + sm_stopped.send.assert_called_once() + kwargs = sm_stopped.send.call_args[1] + assert kwargs["dir_storage"] is None + assert kwargs["image_storage"] is None + + async def test_delete_running_still_rejected(self, manager): + sm = AsyncMock() + sm.current_state.value = State.RUNNING + sm.sandbox_info = {"sandbox_id": "sbx-1"} + manager._get_current_statemachine = AsyncMock(return_value=sm) + + with pytest.raises(BadRequestRockError, match="stopped or archived"): + await manager.delete("sbx-1") + + async def test_delete_already_deleted_noop(self, manager): + sm = AsyncMock() + sm.current_state.value = State.DELETED + manager._get_current_statemachine = AsyncMock(return_value=sm) + + await manager.delete("sbx-1") + + sm.send.assert_not_called() diff --git a/tests/unit/sandbox/archive/test_image_storage_interface.py b/tests/unit/sandbox/archive/test_image_storage_interface.py new file mode 100644 index 0000000000..4786678ace --- /dev/null +++ b/tests/unit/sandbox/archive/test_image_storage_interface.py @@ -0,0 +1,115 @@ +import pytest + +from rock.sandbox.archive.abstract import AbstractImageStorage + + +class TestAbstractImageStorageCannotInstantiate: + def test_cannot_instantiate(self): + with pytest.raises(TypeError): + AbstractImageStorage() + + +class InMemoryFakeImageStorage(AbstractImageStorage): 
+ """In-memory fake for testing callers of AbstractImageStorage.""" + + def __init__(self, registry_url: str = "localhost:5000"): + self._registry_url = registry_url + self._store: dict[str, bytes] = {} + + @property + def registry_url(self) -> str: + return self._registry_url + + async def push_from_local(self, local_image_tag: str, remote_image_ref: str) -> None: + self._store[remote_image_ref] = b"fake-manifest" + + async def pull_to_local(self, remote_image_ref: str) -> None: + if remote_image_ref not in self._store: + raise RuntimeError(f"image not found: {remote_image_ref}") + + async def delete(self, image_ref: str) -> bool: + if image_ref in self._store: + del self._store[image_ref] + return True + return False + + async def exists(self, image_ref: str) -> bool: + return image_ref in self._store + + +class TestInMemoryFakeImageStorage: + @pytest.fixture + def storage(self): + return InMemoryFakeImageStorage() + + async def test_push_then_exists(self, storage): + await storage.push_from_local("local:tag", "localhost:5000/foo:bar") + assert await storage.exists("localhost:5000/foo:bar") is True + + async def test_pull_nonexistent_raises(self, storage): + with pytest.raises(RuntimeError): + await storage.pull_to_local("localhost:5000/no:thing") + + async def test_delete(self, storage): + await storage.push_from_local("local:tag", "localhost:5000/foo:bar") + assert await storage.delete("localhost:5000/foo:bar") is True + assert await storage.exists("localhost:5000/foo:bar") is False + + async def test_delete_nonexistent(self, storage): + assert await storage.delete("nope") is False + + async def test_registry_url_property(self, storage): + assert storage.registry_url == "localhost:5000" + + +class TestDockerRegistryV2ImageStorageWithAuth: + """Test DockerRegistryV2ImageStorage with username/password (authenticated mode).""" + + def test_instantiates_with_auth(self): + from rock.sandbox.archive.registry_v2 import DockerRegistryV2ImageStorage + + s = 
DockerRegistryV2ImageStorage( + registry_url="rock-registry.cn-shanghai.cr.aliyuncs.com", + username="user", + password="pass", + ) + assert isinstance(s, AbstractImageStorage) + + def test_client_config_with_auth(self): + from rock.sandbox.archive.registry_v2 import DockerRegistryV2ImageStorage + + s = DockerRegistryV2ImageStorage( + registry_url="rock-registry.cn-shanghai.cr.aliyuncs.com", + username="user", + password="pass", + ) + cfg = s.client_config + assert cfg["registry_url"] == "rock-registry.cn-shanghai.cr.aliyuncs.com" + assert cfg["username"] == "user" + assert cfg["password"] == "pass" + + def test_registry_url_property(self): + from rock.sandbox.archive.registry_v2 import DockerRegistryV2ImageStorage + + s = DockerRegistryV2ImageStorage( + registry_url="rock-registry.cn-shanghai.cr.aliyuncs.com", + username="user", + password="pass", + ) + assert s.registry_url == "rock-registry.cn-shanghai.cr.aliyuncs.com" + + async def test_delete_returns_false_when_challenge_fails(self): + from unittest.mock import AsyncMock, MagicMock, patch + + from rock.sandbox.archive.registry_v2 import DockerRegistryV2ImageStorage + + s = DockerRegistryV2ImageStorage(registry_url="r.example.com", username="u", password="p") + mock_response = MagicMock() + mock_response.headers = {} + mock_response.status_code = 401 + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + with patch("httpx.AsyncClient", return_value=mock_client): + assert await s.delete("r.example.com/ns/repo:tag") is False diff --git a/tests/unit/sandbox/archive/test_sandbox_actor_archive.py b/tests/unit/sandbox/archive/test_sandbox_actor_archive.py new file mode 100644 index 0000000000..59e5b67e77 --- /dev/null +++ b/tests/unit/sandbox/archive/test_sandbox_actor_archive.py @@ -0,0 +1,249 @@ +"""Unit tests for SandboxActor.archive() method (mocked, no Ray).""" + 
+import os +import tempfile +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +@pytest.fixture +def actor(): + """Create a SandboxActor instance without Ray, using the underlying class.""" + from rock.sandbox.sandbox_actor import SandboxActor + + ActorClass = SandboxActor.__ray_actor_class__ + + config = MagicMock() + config.container_name = "sbx-test-123" + deployment = MagicMock() + + instance = ActorClass.__new__(ActorClass) + instance._config = config + instance._deployment = deployment + instance._run_shell_command = AsyncMock() + return instance + + +@pytest.fixture +def dir_storage_config(): + return { + "endpoint": "https://oss-cn-hangzhou.aliyuncs.com", + "bucket": "test-bucket", + "access_key_id": "test-ak", + "access_key_secret": "test-sk", + "region": "cn-hangzhou", + } + + +@pytest.fixture +def image_storage_config(): + return { + "registry_url": "rock-registry.cn-shanghai.cr.aliyuncs.com", + "username": "test-user", + "password": "test-pass", + } + + +class TestSandboxActorArchive: + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.upload_dir", new_callable=AsyncMock) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", new_callable=AsyncMock) + async def test_happy_path( + self, mock_push, mock_upload, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + with tempfile.TemporaryDirectory() as tmpdir: + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", tmpdir) + log_dir = os.path.join(tmpdir, "sbx-test-123") + os.makedirs(log_dir) + + await actor.archive(dir_storage_config, image_storage_config) + + actor._run_shell_command.assert_any_call( + "docker", "commit", "sbx-test-123", "archive-staging-sbx-test-123:latest" + ) + mock_push.assert_called_once() + mock_upload.assert_called_once() + call_args = mock_upload.call_args + assert "sbx-test-123" in call_args[0][0] + assert "sbx-test-123" in call_args[0][1] + + 
@patch("rock.sandbox.archive.oss_storage.OssDirStorage.upload_dir", new_callable=AsyncMock) + @patch( + "rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", + new_callable=AsyncMock, + side_effect=RuntimeError("push failed"), + ) + async def test_push_failure_cleans_local_tag( + self, mock_push, mock_upload, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", "/tmp/logs") + + with pytest.raises(RuntimeError, match="push failed"): + await actor.archive(dir_storage_config, image_storage_config) + + actor._run_shell_command.assert_any_call("docker", "rmi", "archive-staging-sbx-test-123:latest", check=False) + mock_upload.assert_not_called() + + @patch( + "rock.sandbox.archive.oss_storage.OssDirStorage.upload_dir", + new_callable=AsyncMock, + side_effect=RuntimeError("upload failed"), + ) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", new_callable=AsyncMock) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.delete", new_callable=AsyncMock) + async def test_upload_failure_rolls_back_image( + self, mock_delete_img, mock_push, mock_upload, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + with tempfile.TemporaryDirectory() as tmpdir: + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", tmpdir) + log_dir = os.path.join(tmpdir, "sbx-test-123") + os.makedirs(log_dir) + + with pytest.raises(RuntimeError, match="upload failed"): + await actor.archive(dir_storage_config, image_storage_config) + + mock_push.assert_called_once() + mock_delete_img.assert_called_once() + ref_arg = mock_delete_img.call_args[0][0] + assert "sbx-test-123" in ref_arg + + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", new_callable=AsyncMock) + async def test_no_logging_path_skips_log_archive( + self, mock_push, 
actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", None) + + await actor.archive(dir_storage_config, image_storage_config) + + mock_push.assert_called_once() + + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.upload_dir", new_callable=AsyncMock) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", new_callable=AsyncMock) + async def test_missing_log_dir_skips_upload( + self, mock_push, mock_upload, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", "/nonexistent") + + await actor.archive(dir_storage_config, image_storage_config) + + mock_push.assert_called_once() + mock_upload.assert_not_called() + + +class TestArchiveSizeLimits: + """Tests for archive size limit enforcement.""" + + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", new_callable=AsyncMock) + async def test_image_exceeds_limit_raises( + self, mock_push, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import subprocess + + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", None) + + inspect_result = subprocess.CompletedProcess(args=(), returncode=0, stdout=b"20000000000", stderr=b"") + actor._run_shell_command = AsyncMock(return_value=inspect_result) + + limits = {"max_image_push_size": "16g", "max_dir_upload_size": "16g"} + + with pytest.raises(RuntimeError, match="exceeds limit"): + await actor.archive(dir_storage_config, image_storage_config, limits) + + mock_push.assert_not_called() + + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.upload_dir", new_callable=AsyncMock) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", new_callable=AsyncMock) + async def test_image_within_limit_proceeds( + self, 
mock_push, mock_upload, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import subprocess + + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", None) + + inspect_result = subprocess.CompletedProcess(args=(), returncode=0, stdout=b"1000000000", stderr=b"") + actor._run_shell_command = AsyncMock(return_value=inspect_result) + + limits = {"max_image_push_size": "16g", "max_dir_upload_size": "16g"} + await actor.archive(dir_storage_config, image_storage_config, limits) + + mock_push.assert_called_once() + + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.upload_dir", new_callable=AsyncMock) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", new_callable=AsyncMock) + async def test_dir_exceeds_limit_raises( + self, mock_push, mock_upload, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import subprocess + + import rock.env_vars as _env_vars + + with tempfile.TemporaryDirectory() as tmpdir: + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", tmpdir) + log_dir = os.path.join(tmpdir, "sbx-test-123") + os.makedirs(log_dir) + + image_result = subprocess.CompletedProcess(args=(), returncode=0, stdout=b"1000000000", stderr=b"") + du_result = subprocess.CompletedProcess(args=(), returncode=0, stdout=b"20000000000\t/tmp/logs", stderr=b"") + + async def side_effect(*args, **kwargs): + if args[0] == "du": + return du_result + return image_result + + actor._run_shell_command = AsyncMock(side_effect=side_effect) + + limits = {"max_image_push_size": "16g", "max_dir_upload_size": "16g"} + with pytest.raises(RuntimeError, match="exceeds limit"): + await actor.archive(dir_storage_config, image_storage_config, limits) + + mock_push.assert_called_once() + mock_upload.assert_not_called() + + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.upload_dir", new_callable=AsyncMock) + 
@patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", new_callable=AsyncMock) + async def test_no_limits_skips_checks( + self, mock_push, mock_upload, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + with tempfile.TemporaryDirectory() as tmpdir: + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", tmpdir) + log_dir = os.path.join(tmpdir, "sbx-test-123") + os.makedirs(log_dir) + + await actor.archive(dir_storage_config, image_storage_config) + + mock_push.assert_called_once() + mock_upload.assert_called_once() + + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.upload_dir", new_callable=AsyncMock) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.push_from_local", new_callable=AsyncMock) + async def test_empty_limit_string_skips_check( + self, mock_push, mock_upload, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + with tempfile.TemporaryDirectory() as tmpdir: + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", tmpdir) + log_dir = os.path.join(tmpdir, "sbx-test-123") + os.makedirs(log_dir) + + limits = {"max_image_push_size": "", "max_dir_upload_size": ""} + await actor.archive(dir_storage_config, image_storage_config, limits) + + mock_push.assert_called_once() + mock_upload.assert_called_once() diff --git a/tests/unit/sandbox/archive/test_sandbox_actor_restore.py b/tests/unit/sandbox/archive/test_sandbox_actor_restore.py new file mode 100644 index 0000000000..d3ab32be24 --- /dev/null +++ b/tests/unit/sandbox/archive/test_sandbox_actor_restore.py @@ -0,0 +1,160 @@ +"""Unit tests for SandboxActor.restore() method (mocked, no Ray).""" + +from unittest.mock import AsyncMock, MagicMock, create_autospec, patch + +import pytest + +from rock.deployments.docker import DockerDeployment + + +@pytest.fixture +def actor(): + from rock.sandbox.sandbox_actor import SandboxActor + + ActorClass = 
SandboxActor.__ray_actor_class__ + + config = MagicMock() + config.container_name = "sbx-test-123" + deployment = create_autospec(DockerDeployment, instance=True) + deployment.restart = AsyncMock() + deployment.restart_from_image = AsyncMock() + + instance = ActorClass.__new__(ActorClass) + instance._config = config + instance._deployment = deployment + instance._run_shell_command = AsyncMock() + instance._clean_container_background = MagicMock() + instance._setup_monitor = AsyncMock() + return instance + + +@pytest.fixture +def dir_storage_config(): + return { + "endpoint": "https://oss-cn-hangzhou.aliyuncs.com", + "bucket": "test-bucket", + "access_key_id": "test-ak", + "access_key_secret": "test-sk", + "region": "cn-hangzhou", + } + + +@pytest.fixture +def image_storage_config(): + return { + "registry_url": "rock-registry.cn-shanghai.cr.aliyuncs.com", + "username": "test-user", + "password": "test-pass", + } + + +class TestSandboxActorRestore: + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.exists", new_callable=AsyncMock, return_value=True) + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.download_to_dir", new_callable=AsyncMock) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.pull_to_local", new_callable=AsyncMock) + async def test_happy_path( + self, mock_pull, mock_download, mock_exists, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", "/tmp/logs") + + await actor.restore_and_start(dir_storage_config, image_storage_config) + + mock_pull.assert_called_once() + ref_arg = mock_pull.call_args[0][0] + assert "sbx-test-123" in ref_arg + + mock_download.assert_called_once() + key_arg = mock_download.call_args[0][0] + dir_arg = mock_download.call_args[0][1] + assert "sbx-test-123" in key_arg + assert dir_arg == "/tmp/logs/sbx-test-123" + + actor._deployment.restart_from_image.assert_called_once_with(ref_arg) + 
actor._deployment.restart.assert_not_called() + + @patch( + "rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.pull_to_local", + new_callable=AsyncMock, + side_effect=RuntimeError("pull failed"), + ) + async def test_pull_failure_no_download( + self, mock_pull, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", "/tmp/logs") + + with pytest.raises(RuntimeError, match="pull failed"): + await actor.restore_and_start(dir_storage_config, image_storage_config) + + actor._run_shell_command.assert_not_called() + + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.exists", new_callable=AsyncMock, return_value=True) + @patch( + "rock.sandbox.archive.oss_storage.OssDirStorage.download_to_dir", + new_callable=AsyncMock, + side_effect=RuntimeError("download failed"), + ) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.pull_to_local", new_callable=AsyncMock) + async def test_download_failure_warns_and_continues( + self, mock_pull, mock_download, mock_exists, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", "/tmp/logs") + + await actor.restore_and_start(dir_storage_config, image_storage_config) + + mock_pull.assert_called_once() + actor._deployment.restart_from_image.assert_called_once() + + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.exists", new_callable=AsyncMock, return_value=True) + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.download_to_dir", new_callable=AsyncMock) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.pull_to_local", new_callable=AsyncMock) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.delete", new_callable=AsyncMock) + async def test_restore_does_not_delete_remote( + self, + mock_delete, + mock_pull, + mock_download, + mock_exists, + 
actor, + dir_storage_config, + image_storage_config, + monkeypatch, + ): + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", "/tmp/logs") + + await actor.restore_and_start(dir_storage_config, image_storage_config) + mock_delete.assert_not_called() + + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.pull_to_local", new_callable=AsyncMock) + async def test_no_logging_path_skips_log_restore( + self, mock_pull, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", None) + + await actor.restore_and_start(dir_storage_config, image_storage_config) + + mock_pull.assert_called_once() + actor._deployment.restart_from_image.assert_called_once() + + @patch("rock.sandbox.archive.oss_storage.OssDirStorage.exists", new_callable=AsyncMock, return_value=False) + @patch("rock.sandbox.archive.registry_v2.DockerRegistryV2ImageStorage.pull_to_local", new_callable=AsyncMock) + async def test_missing_key_warns_and_continues( + self, mock_pull, mock_exists, actor, dir_storage_config, image_storage_config, monkeypatch + ): + import rock.env_vars as _env_vars + + monkeypatch.setattr(_env_vars, "ROCK_LOGGING_PATH", "/tmp/logs") + + await actor.restore_and_start(dir_storage_config, image_storage_config) + + mock_pull.assert_called_once() + actor._deployment.restart_from_image.assert_called_once() diff --git a/tests/unit/sandbox/archive/test_state_machine_archive.py b/tests/unit/sandbox/archive/test_state_machine_archive.py new file mode 100644 index 0000000000..da2806abcd --- /dev/null +++ b/tests/unit/sandbox/archive/test_state_machine_archive.py @@ -0,0 +1,205 @@ +"""Tests for archive-related state machine transitions.""" + +from unittest.mock import AsyncMock + +import pytest +from statemachine.exceptions import TransitionNotAllowed + +from rock.actions.sandbox.response import State +from rock.sandbox.sandbox_statemachine 
import SandboxStateMachine + + +@pytest.fixture +def meta_store(): + m = AsyncMock() + m.update = AsyncMock() + return m + + +class TestArchiveTransitions: + async def test_stopped_to_archiving(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.STOPPED, {}) + await sm.send("archive", sandbox_id="sbx-1", meta_store=meta_store) + assert sm.current_state.value == State.ARCHIVING + + async def test_archiving_to_archived(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVING, {"archive_time": "t1"}) + await sm.send("archive_done", sandbox_id="sbx-1", meta_store=meta_store) + assert sm.current_state.value == State.ARCHIVED + + async def test_archiving_to_stopped_on_failure(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVING, {"archive_time": "t1"}) + await sm.send("archive_failed", sandbox_id="sbx-1", meta_store=meta_store, reason="timeout") + assert sm.current_state.value == State.STOPPED + + async def test_archived_to_pending_on_restore(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVED, {"archive_time": "t1"}) + await sm.send("restore", sandbox_id="sbx-1", meta_store=meta_store) + assert sm.current_state.value == State.PENDING + + async def test_pending_to_running_on_alive_after_restore(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.PENDING, {}) + await sm.send("alive", sandbox_id="sbx-1", meta_store=meta_store, sandbox_info={"host_ip": "10.0.0.1"}) + assert sm.current_state.value == State.RUNNING + + async def test_pending_to_archived_on_restore_failed(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.PENDING, {"archive_time": "t1"}) + await sm.send("restore_failed", sandbox_id="sbx-1", meta_store=meta_store, reason="timeout") + assert sm.current_state.value == State.ARCHIVED + + async def test_archived_to_deleted(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVED, 
{"archive_time": "t1"}) + await sm.send( + "delete", + sandbox_id="sbx-1", + operator=AsyncMock(), + meta_store=meta_store, + reason=AsyncMock(), + ) + assert sm.current_state.value == State.DELETED + + +class TestArchiveTransitionsRejected: + async def test_running_cannot_archive(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.RUNNING, {}) + with pytest.raises(TransitionNotAllowed): + await sm.send("archive", sandbox_id="sbx-1", meta_store=meta_store) + + async def test_archiving_cannot_archive(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVING, {}) + with pytest.raises(TransitionNotAllowed): + await sm.send("archive", sandbox_id="sbx-1", meta_store=meta_store) + + async def test_archived_cannot_restart_directly(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVED, {}) + with pytest.raises(TransitionNotAllowed): + await sm.send("restart", sandbox_id="sbx-1", operator=AsyncMock(), meta_store=meta_store) + + async def test_stopped_cannot_restore(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.STOPPED, {}) + with pytest.raises(TransitionNotAllowed): + await sm.send("restore", sandbox_id="sbx-1", meta_store=meta_store) + + async def test_archiving_cannot_stop(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVING, {}) + with pytest.raises(TransitionNotAllowed): + await sm.send("stop", sandbox_id="sbx-1", operator=AsyncMock(), meta_store=meta_store) + + async def test_archiving_cannot_delete(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVING, {}) + with pytest.raises(TransitionNotAllowed): + await sm.send( + "delete", + sandbox_id="sbx-1", + operator=AsyncMock(), + meta_store=meta_store, + reason=AsyncMock(), + ) + + +class TestArchiveCleanupInCallbacks: + async def test_on_delete_cleans_archive_artifacts(self, meta_store): + dir_storage = AsyncMock() + image_storage = AsyncMock() + 
image_storage.registry_url = "localhost:5000" + operator = AsyncMock() + sm = await SandboxStateMachine.from_state_value( + State.ARCHIVED, + { + "sandbox_id": "sbx-1", + "archive_time": "2026-01-01T000000Z", + "spec": {"container_name": "sbx-1", "image": "img:latest"}, + }, + ) + await sm.send( + "delete", + sandbox_id="sbx-1", + operator=operator, + meta_store=meta_store, + dir_storage=dir_storage, + image_storage=image_storage, + ) + assert sm.current_state.value == State.DELETED + dir_storage.delete.assert_called_once() + assert "sbx-1" in dir_storage.delete.call_args[0][0] + image_storage.delete.assert_called_once() + assert "sbx-1" in image_storage.delete.call_args[0][0] + + async def test_on_delete_no_archive_time_skips_cleanup(self, meta_store): + dir_storage = AsyncMock() + image_storage = AsyncMock() + image_storage.registry_url = "localhost:5000" + operator = AsyncMock() + sm = await SandboxStateMachine.from_state_value( + State.STOPPED, + {"sandbox_id": "sbx-1", "spec": {"container_name": "sbx-1", "image": "img:latest"}}, + ) + await sm.send( + "delete", + sandbox_id="sbx-1", + operator=operator, + meta_store=meta_store, + dir_storage=dir_storage, + image_storage=image_storage, + ) + assert sm.current_state.value == State.DELETED + dir_storage.delete.assert_not_called() + image_storage.delete.assert_not_called() + + async def test_on_delete_without_storage_skips_cleanup(self, meta_store): + operator = AsyncMock() + sm = await SandboxStateMachine.from_state_value( + State.ARCHIVED, + { + "sandbox_id": "sbx-1", + "archive_time": "2026-01-01T000000Z", + "spec": {"container_name": "sbx-1", "image": "img:latest"}, + }, + ) + await sm.send( + "delete", + sandbox_id="sbx-1", + operator=operator, + meta_store=meta_store, + ) + assert sm.current_state.value == State.DELETED + + +class TestArchiveHookSideEffects: + async def test_on_archive_sets_fields(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.STOPPED, {}) + await 
sm.send("archive", sandbox_id="sbx-1", meta_store=meta_store) + info = sm.sandbox_info + assert "archive_time" not in info + assert info["state"] == State.ARCHIVING + # state_history records the transition with timestamp + assert any(r["to_state"] == "archiving" for r in info.get("state_history", [])) + + async def test_on_archive_done_sets_archive_time(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVING, {}) + await sm.send("archive_done", sandbox_id="sbx-1", meta_store=meta_store) + info = sm.sandbox_info + assert info["state"] == State.ARCHIVED + assert info.get("archive_time") is not None + + async def test_on_archive_failed_clears_archive_time(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVING, {"archive_time": "t1"}) + await sm.send("archive_failed", sandbox_id="sbx-1", meta_store=meta_store, reason="timeout") + info = sm.sandbox_info + assert info["state"] == State.STOPPED + assert "archive_time" not in info + + async def test_on_restore_sets_pending_and_keeps_archive_time(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.ARCHIVED, {"archive_time": "t1"}) + await sm.send("restore", sandbox_id="sbx-1", meta_store=meta_store) + info = sm.sandbox_info + assert info["state"] == State.PENDING + assert "archive_time" in info + # state_history records the transition + assert any(r["to_state"] == "pending" for r in info.get("state_history", [])) + + async def test_on_restore_failed_rolls_back_to_archived(self, meta_store): + sm = await SandboxStateMachine.from_state_value(State.PENDING, {"archive_time": "t1"}) + await sm.send("restore_failed", sandbox_id="sbx-1", meta_store=meta_store, reason="timeout") + info = sm.sandbox_info + assert info["state"] == State.ARCHIVED diff --git a/tests/unit/sandbox/archive/test_storage_interface.py b/tests/unit/sandbox/archive/test_storage_interface.py new file mode 100644 index 0000000000..c967bb2e84 --- /dev/null +++ 
b/tests/unit/sandbox/archive/test_storage_interface.py @@ -0,0 +1,84 @@ +import pytest + +from rock.sandbox.archive.abstract import AbstractDirStorage + + +class TestAbstractDirStorageCannotInstantiate: + def test_cannot_instantiate(self): + with pytest.raises(TypeError): + AbstractDirStorage() + + +class InMemoryFakeDirStorage(AbstractDirStorage): + """In-memory fake for testing callers of AbstractDirStorage.""" + + def __init__(self): + self._store: dict[str, bytes] = {} + + async def upload_dir(self, local_dir: str, key: str) -> None: + self._store[key] = b"fake-tar-content" + + async def download_to_dir(self, key: str, local_dir: str) -> None: + if key not in self._store: + raise FileNotFoundError(key) + + async def delete(self, key: str) -> bool: + if key in self._store: + del self._store[key] + return True + return False + + async def exists(self, key: str) -> bool: + return key in self._store + + +class TestInMemoryFakeDirStorage: + @pytest.fixture + def storage(self): + return InMemoryFakeDirStorage() + + async def test_upload_then_exists(self, storage): + await storage.upload_dir("/tmp/fake", "mykey") + assert await storage.exists("mykey") is True + + async def test_delete(self, storage): + await storage.upload_dir("/tmp/fake", "mykey") + assert await storage.delete("mykey") is True + assert await storage.exists("mykey") is False + + async def test_delete_nonexistent(self, storage): + assert await storage.delete("nope") is False + + async def test_download_nonexistent_raises(self, storage): + with pytest.raises(FileNotFoundError): + await storage.download_to_dir("nope", "/tmp/out") + + +class TestOssDirStorageInterface: + def test_instantiates(self): + from rock.sandbox.archive.oss_storage import OssDirStorage + + s = OssDirStorage( + endpoint="https://oss-cn-hangzhou.aliyuncs.com", + bucket="test-bucket", + access_key_id="ak", + access_key_secret="sk", + region="cn-hangzhou", + ) + assert isinstance(s, AbstractDirStorage) + + def test_client_config(self): 
+ from rock.sandbox.archive.oss_storage import OssDirStorage + + s = OssDirStorage( + endpoint="https://oss-cn-hangzhou.aliyuncs.com", + bucket="test-bucket", + access_key_id="ak", + access_key_secret="sk", + region="cn-hangzhou", + ) + cfg = s.client_config + assert cfg["endpoint"] == "https://oss-cn-hangzhou.aliyuncs.com" + assert cfg["bucket"] == "test-bucket" + assert cfg["access_key_id"] == "ak" + assert cfg["region"] == "cn-hangzhou" diff --git a/tests/unit/sandbox/operator/test_k8s_operator.py b/tests/unit/sandbox/operator/test_k8s_operator.py index d0dcb784fa..ec1dd1ba7e 100644 --- a/tests/unit/sandbox/operator/test_k8s_operator.py +++ b/tests/unit/sandbox/operator/test_k8s_operator.py @@ -244,6 +244,7 @@ async def get_current_statemachine(sandbox_id: str): return await SandboxStateMachine.from_state_value(info.get("state"), sandbox_info=info) m._get_current_statemachine = AsyncMock(side_effect=get_current_statemachine) + m._try_advance_pending = SandboxManager._try_advance_pending.__get__(m, SandboxManager) m.get_status = SandboxManager.get_status.__get__(m, SandboxManager) m._refresh_timeout = AsyncMock() return m diff --git a/tests/unit/sandbox/test_collect_sandbox_meta.py b/tests/unit/sandbox/test_collect_sandbox_meta.py index c2baff5db2..0dc1561bbf 100644 --- a/tests/unit/sandbox/test_collect_sandbox_meta.py +++ b/tests/unit/sandbox/test_collect_sandbox_meta.py @@ -36,8 +36,15 @@ def base_manager(meta_store): """Minimal BaseManager with real meta_store, everything else mocked.""" from rock.sandbox.base_manager import BaseManager + class _ConcreteManager(BaseManager): + async def _check_job_background(self): + ... + + async def _reconcile(self): + ... 
+ with patch.object(BaseManager, "__init__", lambda self, *a, **kw: None): - mgr = BaseManager.__new__(BaseManager) + mgr = _ConcreteManager.__new__(_ConcreteManager) mgr._meta_store = meta_store mgr._sandbox_meta = {} # old code needs this; proves empty dict → "default" return mgr diff --git a/tests/unit/sandbox/test_sandbox_statemachine.py b/tests/unit/sandbox/test_sandbox_statemachine.py index 54f93b4952..31e765320a 100644 --- a/tests/unit/sandbox/test_sandbox_statemachine.py +++ b/tests/unit/sandbox/test_sandbox_statemachine.py @@ -298,6 +298,16 @@ async def test_updates_state_to_pending(self, mock_meta_store): updated_info = mock_meta_store.update.call_args[0][1] assert updated_info["state"] == State.PENDING + @pytest.mark.asyncio + async def test_clears_stale_phases_on_restart(self, mock_meta_store): + info = dict(_VALID_RESTART_INFO) + info["phases"] = {"image_pull": {"status": "done", "message": "ok"}} + info["stop_time"] = "2026-01-01T01:00:00+08:00" + await self._send_restart(mock_meta_store, sandbox_info=info) + updated_info = mock_meta_store.update.call_args[0][1] + assert "phases" not in updated_info + assert "stop_time" not in updated_info + @pytest.mark.asyncio async def test_writes_timeout_built_from_spec(self, mock_meta_store): # auto_clear_time_minutes=30 in spec → make_timeout_info uses 30 @@ -411,3 +421,98 @@ async def test_missing_spec_skips_operator_but_still_archives(self, mock_meta_st async def test_restores_deleted_from_state_value(self): sm = await SandboxStateMachine.from_state_value(State.DELETED, sandbox_info={}) assert sm.deleted.is_active + + +# --------------------------------------------------------------------------- +# state_history recording +# --------------------------------------------------------------------------- + + +class TestStateHistory: + @pytest.mark.asyncio + async def test_alive_records_history(self): + sm = SandboxStateMachine(sandbox_info={"sandbox_id": "sb-1"}) + await sm.activate_initial_state() + await 
sm.send("alive", sandbox_id="sb-1", meta_store=AsyncMock(), sandbox_info={}) + history = sm.sandbox_info["state_history"] + assert len(history) == 1 + assert history[0]["from_state"] == "pending" + assert history[0]["to_state"] == "running" + assert history[0]["event"] == "alive" + assert "timestamp" in history[0] + + @pytest.mark.asyncio + async def test_stop_records_history(self): + sm = await SandboxStateMachine.from_state_value(State.RUNNING, sandbox_info={"sandbox_id": "sb-1"}) + await sm.send("stop", sandbox_id="sb-1", operator=AsyncMock(), meta_store=AsyncMock()) + history = sm.sandbox_info["state_history"] + assert len(history) == 1 + assert history[0]["from_state"] == "running" + assert history[0]["to_state"] == "stopped" + assert history[0]["event"] == "stop" + + @pytest.mark.asyncio + async def test_full_lifecycle_accumulates_history(self): + sm = SandboxStateMachine(sandbox_info={"sandbox_id": "sb-1"}) + await sm.activate_initial_state() + await sm.send("alive", sandbox_id="sb-1", meta_store=AsyncMock(), sandbox_info={}) + await sm.send("stop", sandbox_id="sb-1", operator=AsyncMock(), meta_store=AsyncMock()) + history = sm.sandbox_info["state_history"] + assert len(history) == 2 + assert history[0]["from_state"] == "pending" + assert history[0]["to_state"] == "running" + assert history[1]["from_state"] == "running" + assert history[1]["to_state"] == "stopped" + + @pytest.mark.asyncio + async def test_stop_noop_skips_history(self): + sm = await SandboxStateMachine.from_state_value(State.STOPPED, sandbox_info={"sandbox_id": "sb-1"}) + await sm.send("stop_noop", sandbox_id="sb-1") + history = sm.sandbox_info.get("state_history", []) + assert len(history) == 0 + + @pytest.mark.asyncio + async def test_restart_records_history(self): + sm = await SandboxStateMachine.from_state_value(State.STOPPED, sandbox_info=dict(_VALID_RESTART_INFO)) + await sm.send("restart", sandbox_id="sb-1", operator=AsyncMock(), meta_store=AsyncMock()) + history = 
sm.sandbox_info["state_history"] + assert len(history) == 1 + assert history[0]["from_state"] == "stopped" + assert history[0]["to_state"] == "pending" + assert history[0]["event"] == "restart" + + @pytest.mark.asyncio + async def test_delete_records_history(self): + sm = await SandboxStateMachine.from_state_value(State.STOPPED, sandbox_info=dict(_VALID_DELETE_INFO)) + await sm.send("delete", sandbox_id="sb-1", operator=AsyncMock(), meta_store=AsyncMock()) + history = sm.sandbox_info["state_history"] + assert len(history) == 1 + assert history[0]["from_state"] == "stopped" + assert history[0]["to_state"] == "deleted" + assert history[0]["event"] == "delete" + + @pytest.mark.asyncio + async def test_history_preserved_from_state_value(self): + existing_history = [ + {"from_state": "pending", "to_state": "running", "event": "alive", "timestamp": "2026-01-01T00:00:00+08:00"} + ] + sm = await SandboxStateMachine.from_state_value( + State.RUNNING, sandbox_info={"sandbox_id": "sb-1", "state_history": existing_history} + ) + await sm.send("stop", sandbox_id="sb-1", operator=AsyncMock(), meta_store=AsyncMock()) + history = sm.sandbox_info["state_history"] + assert len(history) == 2 + assert history[0]["event"] == "alive" + assert history[1]["event"] == "stop" + + @pytest.mark.asyncio + async def test_alive_copies_history_to_operator_sandbox_info(self): + sm = SandboxStateMachine(sandbox_info={"sandbox_id": "sb-1"}) + await sm.activate_initial_state() + operator_info = {"sandbox_id": "sb-1"} + mock_meta_store = AsyncMock() + await sm.send("alive", sandbox_id="sb-1", meta_store=mock_meta_store, sandbox_info=operator_info) + updated_info = mock_meta_store.update.call_args[0][1] + assert "state_history" in updated_info + assert len(updated_info["state_history"]) == 1 + assert updated_info["state_history"][0]["event"] == "alive" diff --git a/tests/unit/sandbox/test_sandbox_transitions.py b/tests/unit/sandbox/test_sandbox_transitions.py index ce2756d412..83d4a81da8 100644 --- 
a/tests/unit/sandbox/test_sandbox_transitions.py +++ b/tests/unit/sandbox/test_sandbox_transitions.py @@ -61,6 +61,7 @@ async def get_current_statemachine(sandbox_id: str) -> SandboxStateMachine | Non m._get_current_statemachine = AsyncMock(side_effect=get_current_statemachine) m.stop = SandboxManager.stop.__get__(m, SandboxManager) + m._try_advance_pending = SandboxManager._try_advance_pending.__get__(m, SandboxManager) m.get_status = SandboxManager.get_status.__get__(m, SandboxManager) m._refresh_timeout = AsyncMock() return m diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index eab2f16441..7e50df9077 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -603,7 +603,7 @@ async def test_nacos_lifecycle_partial_update_preserves_yaml_values(): await rock_config.update() assert rock_config.lifecycle.default_startup_timeout_seconds == 1200 - assert rock_config.lifecycle.min_startup_timeout_seconds == 300 # preserved, not reset to 600 + assert rock_config.lifecycle.min_startup_timeout_seconds == 300 # preserved, not reset to 600 assert rock_config.lifecycle.max_startup_timeout_seconds == 3600 # preserved, not reset to 1800 @@ -652,7 +652,9 @@ async def test_nacos_lifecycle_nested_dict_coerced_via_post_init(): rock_config.nacos_provider.get_config = AsyncMock( return_value={ "lifecycle": { - "archive": {"scan_interval_sec": 99, "timeout_sec": 500}, + "reconcile_interval_seconds": 99, + "archive_timeout_seconds": 500, + "archive": {"max_retries": 5}, } } ) @@ -660,8 +662,9 @@ async def test_nacos_lifecycle_nested_dict_coerced_via_post_init(): await rock_config.update() assert isinstance(rock_config.lifecycle.archive, ArchiveConfig) - assert rock_config.lifecycle.archive.scan_interval_sec == 99 - assert rock_config.lifecycle.archive.timeout_sec == 500 + assert rock_config.lifecycle.reconcile_interval_seconds == 99 + assert rock_config.lifecycle.archive_timeout_seconds == 500 + assert rock_config.lifecycle.archive.max_retries == 5 
@pytest.mark.asyncio @@ -697,7 +700,8 @@ def test_base_inheritance_deep_merges(self, tmp_path: Path): """Child config inherits and overrides base via _deep_merge.""" base_file = tmp_path / "base.yml" base_file.write_text( - textwrap.dedent("""\ + textwrap.dedent( + """\ ray: namespace: "base-ns" runtime_env: @@ -705,16 +709,19 @@ def test_base_inheritance_deep_merges(self, tmp_path: Path): warmup: images: - "python:3.11" - """) + """ + ) ) child_file = tmp_path / "child.yml" child_file.write_text( - textwrap.dedent("""\ + textwrap.dedent( + """\ _base: base.yml ray: namespace: "child-ns" - """) + """ + ) ) config = RockConfig.from_env(config_path=str(child_file)) @@ -728,7 +735,8 @@ def test_base_inheritance_scheduler_tasks_merge(self, tmp_path: Path): """Scheduler tasks list is merged by task_class key.""" base_file = tmp_path / "base.yml" base_file.write_text( - textwrap.dedent("""\ + textwrap.dedent( + """\ scheduler: tasks: - task_class: "rock.admin.scheduler.tasks.cleanup.CleanupTask" @@ -737,12 +745,14 @@ def test_base_inheritance_scheduler_tasks_merge(self, tmp_path: Path): - task_class: "rock.admin.scheduler.tasks.report.ReportTask" enabled: true interval_seconds: 300 - """) + """ + ) ) child_file = tmp_path / "child.yml" child_file.write_text( - textwrap.dedent("""\ + textwrap.dedent( + """\ _base: base.yml scheduler: tasks: @@ -751,7 +761,8 @@ def test_base_inheritance_scheduler_tasks_merge(self, tmp_path: Path): - task_class: "rock.admin.scheduler.tasks.audit.AuditTask" enabled: true interval_seconds: 600 - """) + """ + ) ) config = RockConfig.from_env(config_path=str(child_file)) @@ -769,11 +780,13 @@ def test_base_inheritance_scheduler_tasks_merge(self, tmp_path: Path): def test_base_not_found_raises(self, tmp_path: Path): child_file = tmp_path / "child.yml" child_file.write_text( - textwrap.dedent("""\ + textwrap.dedent( + """\ _base: nonexistent.yml ray: namespace: "ns" - """) + """ + ) ) with pytest.raises(Exception, match="base config file.*not 
found"): RockConfig.from_env(config_path=str(child_file))