Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rock/actions/sandbox/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class State(str, Enum):
PENDING = "pending"
RUNNING = "running"
STOPPED = "stopped"
ARCHIVING = "archiving"
ARCHIVED = "archived"
DELETED = "deleted"


Expand Down
2 changes: 2 additions & 0 deletions rock/actions/sandbox/sandbox_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class SandboxInfo(TypedDict, total=False):
start_time: str
stop_time: str
delete_time: str
archive_time: str
state_enter_time: str
extended_params: dict[str, str]


Expand Down
3 changes: 3 additions & 0 deletions rock/admin/core/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class SandboxRecord(Base):
cpus = Column(Float, nullable=True)
memory = Column(String(64), nullable=True)
create_user_gray_flag = Column(Boolean, nullable=True)
archive_time = Column(String(64), nullable=True)
state_enter_time = Column(String(64), nullable=True)
delete_time = Column(String(64), nullable=True)
phases = Column(_JSONB_VARIANT, nullable=True)
port_mapping = Column(_JSONB_VARIANT, nullable=True)
spec = Column(_JSONB_VARIANT, nullable=True)
Expand Down
26 changes: 24 additions & 2 deletions rock/admin/entrypoints/sandbox_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import math
from typing import Annotated, Any

from fastapi import APIRouter, Body, Depends, File, Form, UploadFile
from fastapi import APIRouter, Body, Depends, File, Form, Header, UploadFile

from rock.actions import (
BashObservation,
Expand Down Expand Up @@ -117,7 +117,7 @@ async def _apply_accelerator_type_validation(config: DockerDeploymentConfig) ->

if config.accelerator_type not in allowed:
raise BadRequestRockError(
f"Invalid accelerator_type {config.accelerator_type!r}. " f"Allowed values: {sorted(allowed)}"
f"Invalid accelerator_type {config.accelerator_type!r}. Allowed values: {sorted(allowed)}"
)


Expand Down Expand Up @@ -158,10 +158,18 @@ async def _apply_cpu_overcommit_default(config: DockerDeploymentConfig, rock_aut
config.limit_cpus = min(2 * config.cpus, config.cpus + headroom)


def _apply_auto_clear_default(config: DockerDeploymentConfig, request: SandboxStartRequest) -> None:
    """Fill in ``config.auto_clear_time_minutes`` when the caller did not send one.

    ``request.model_fields_set`` is consulted so that a value the SDK set
    explicitly is never overwritten. Otherwise the lifecycle setting
    ``auto_clear_default_sec`` is converted to whole minutes (floor division)
    and written onto ``config`` in place.

    Args:
        config: Deployment config built from ``request``; mutated in place.
        request: The original start request, used only to see which fields
            were explicitly provided.
    """
    if "auto_clear_time_minutes" in request.model_fields_set:
        # An explicit caller choice always wins over the configured default.
        return

    lifecycle_cfg = sandbox_manager.rock_config.lifecycle
    # NOTE(review): floor division turns any default below 60 seconds into
    # 0 minutes — confirm that 0 is an acceptable value downstream.
    config.auto_clear_time_minutes = lifecycle_cfg.auto_clear_default_sec // 60


@sandbox_router.post("/start")
@handle_exceptions(error_message="start sandbox failed")
async def start(request: SandboxStartRequest) -> RockResponse[SandboxStartResponse]:
config = DockerDeploymentConfig.from_request(request)
_apply_auto_clear_default(config, request)
await _apply_accelerator_type_validation(config)
await _apply_kata_runtime_switch(config)
await _apply_kata_disk_size(config)
Expand All @@ -177,6 +185,7 @@ async def start_async(
headers: Annotated[StartHeaders, Depends()],
) -> RockResponse[SandboxStartResponse]:
config = DockerDeploymentConfig.from_request(request)
_apply_auto_clear_default(config, request)
await _apply_accelerator_type_validation(config)
await _apply_kata_runtime_switch(config)
await _apply_kata_disk_size(config)
Expand Down Expand Up @@ -296,6 +305,19 @@ async def delete(sandbox_id: str = Body(..., embed=True)) -> RockResponse:
return RockResponse(result=f"{sandbox_id} deleted")


@sandbox_router.post("/archive")
@handle_exceptions(error_message="archive sandbox failed")
async def archive(
    sandbox_id: str = Body(..., embed=True),
    rock_authorization: str | None = Header(default=None, alias="X-Key"),
) -> RockResponse:
    """Ask the sandbox manager to archive ``sandbox_id``.

    The caller's ``X-Key`` header is checked against the archive allow-list
    from the lifecycle config before anything else happens.

    Args:
        sandbox_id: Identifier of the sandbox, read from the JSON body.
        rock_authorization: Value of the ``X-Key`` request header, or ``None``
            when the header is absent.

    Returns:
        A ``RockResponse`` whose result is ``"<sandbox_id> archiving"``.

    Raises:
        BadRequestRockError: The key is not permitted to use archive.
    """
    lifecycle_cfg = sandbox_manager.rock_config.lifecycle
    if lifecycle_cfg.archive.is_allowed(rock_authorization):
        await sandbox_manager.archive_sandbox(sandbox_id)
        return RockResponse(result=f"{sandbox_id} archiving")
    raise BadRequestRockError("archive not allowed for this authorization key")


@sandbox_router.post("/restart")
@handle_exceptions(error_message="restart sandbox failed")
async def restart(sandbox_id: str = Body(..., embed=True)) -> RockResponse[SandboxStartResponse]:
Expand Down
48 changes: 43 additions & 5 deletions rock/admin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
)
from rock.admin.service.ops_service import OpsService
from rock.common.exception import request_validation_exception_handler
from rock.config import DatabaseConfig, RockConfig, SchedulerConfig
from rock.config import DatabaseConfig, RockConfig, SandboxLifecycleConfig, SchedulerConfig
from rock.logger import init_logger
from rock.sandbox.archive.oss_storage import OssDirStorage
from rock.sandbox.archive.registry_v2 import DockerRegistryV2ImageStorage
from rock.sandbox.archive.s3_storage import S3DirStorage
from rock.sandbox.gem_manager import GemManager
from rock.sandbox.operator.factory import OperatorContext, OperatorFactory
from rock.sandbox.sandbox_meta_store import SandboxMetaStore
Expand Down Expand Up @@ -93,12 +96,16 @@ async def lifespan(app: FastAPI):
)
rock_config = RockConfig.from_env(config_file_path)

# Override scheduler config from Nacos if available
# Override configs from Nacos if available
if rock_config.nacos_provider:
nacos_config = await rock_config.nacos_provider.get_config()
if nacos_config and "scheduler" in nacos_config:
rock_config.scheduler = SchedulerConfig(**nacos_config["scheduler"])
logger.info(f"Overrode scheduler config from Nacos with {len(rock_config.scheduler.tasks)} tasks")
if nacos_config:
if "scheduler" in nacos_config:
rock_config.scheduler = SchedulerConfig(**nacos_config["scheduler"])
logger.info(f"Overrode scheduler config from Nacos with {len(rock_config.scheduler.tasks)} tasks")
if "lifecycle" in nacos_config:
rock_config.lifecycle = SandboxLifecycleConfig(**nacos_config["lifecycle"])
logger.info(f"Overrode lifecycle config from Nacos: {rock_config.lifecycle}")

env_vars.ROCK_ADMIN_ENV = args.env
env_vars.ROCK_ADMIN_ROLE = args.role
Expand Down Expand Up @@ -179,6 +186,37 @@ async def lifespan(app: FastAPI):
meta_store=meta_store,
)
set_sandbox_manager(sandbox_manager)

archive_cfg = rock_config.lifecycle.archive
if archive_cfg.enabled:
acr = archive_cfg.acr
ds = archive_cfg.dir_storage
acr_ready = acr.registry_url and acr.username and acr.password
ds_ready = ds.endpoint and ds.access_key_id and ds.access_key_secret
if not (acr_ready and ds_ready):
raise RuntimeError("archive.enabled=true but ACR or dir_storage credentials are missing")
if ds.type == "s3":
sandbox_manager._dir_storage = S3DirStorage(
endpoint=ds.endpoint,
bucket=ds.bucket,
access_key_id=ds.access_key_id,
access_key_secret=ds.access_key_secret,
region=ds.region,
)
else:
sandbox_manager._dir_storage = OssDirStorage(
endpoint=ds.endpoint,
bucket=ds.bucket,
access_key_id=ds.access_key_id,
access_key_secret=ds.access_key_secret,
region=ds.region,
)
sandbox_manager._image_storage = DockerRegistryV2ImageStorage(
registry_url=acr.registry_url,
username=acr.username,
password=acr.password,
)

warmup_service = WarmupService(rock_config.warmup)
await warmup_service.init()
set_warmup_service(warmup_service)
Expand Down
68 changes: 67 additions & 1 deletion rock/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,66 @@ def __post_init__(self):
self.primary = OssAccountConfig(**self.primary)


@dataclass
class ArchiveDirStorageConfig:
    """Connection settings for the directory-archive storage backend.

    Every field has a default, so an empty config can always be constructed;
    the admin startup code (rock/admin/main.py) is what rejects an enabled
    archive whose endpoint or access key fields are still empty.
    """

    # Backend selector: admin startup builds an S3DirStorage when this is
    # "s3" and an OssDirStorage for any other value.
    type: str = "oss"
    # Passed straight through to the storage client as ``endpoint``.
    endpoint: str = ""
    # Passed straight through to the storage client as ``bucket``.
    bucket: str = ""
    # Credential pair handed to the storage client.
    access_key_id: str = ""
    access_key_secret: str = ""
    # Passed straight through to the storage client as ``region``.
    region: str = ""


@dataclass
class ArchiveAcrConfig:
    """Container-registry settings used for archived sandbox images.

    ``registry_url``, ``username`` and ``password`` are handed to
    ``DockerRegistryV2ImageStorage`` by the admin startup code
    (rock/admin/main.py), which refuses to start with archive enabled while
    any of the three is empty.
    """

    # Registry address given to DockerRegistryV2ImageStorage.
    registry_url: str = ""
    # Registry login credentials.
    username: str = ""
    password: str = ""
    # Presumably the repository namespace placed between the registry and the
    # image name when building archive image references — TODO confirm.
    namespace: str = "sandbox_archive"


@dataclass
class ArchiveConfig:
    """Settings for the sandbox archive feature.

    Nested sections may be supplied either as ready-made config objects or as
    plain dicts (for example when parsed from a config file); dicts are
    promoted to their dataclass in ``__post_init__``.
    """

    # Master switch; admin startup only wires archive storage when this is true.
    enabled: bool = False
    # Authorization keys permitted to archive. ``None`` means no restriction.
    allowed_keys: list[str] | None = None
    dir_storage: ArchiveDirStorageConfig = field(default_factory=ArchiveDirStorageConfig)
    acr: ArchiveAcrConfig = field(default_factory=ArchiveAcrConfig)
    # The remaining knobs are consumed outside the code visible here.
    max_retries: int = 3
    prefix: str = "rock-archives/"
    max_image_push_size: str = "16g"
    max_dir_upload_size: str = "16g"

    def __post_init__(self):
        """Promote dict-valued ``dir_storage`` / ``acr`` sections to dataclasses."""
        nested_sections = (
            ("dir_storage", ArchiveDirStorageConfig),
            ("acr", ArchiveAcrConfig),
        )
        for attr_name, section_cls in nested_sections:
            raw_section = getattr(self, attr_name)
            if isinstance(raw_section, dict):
                setattr(self, attr_name, section_cls(**raw_section))

    def is_allowed(self, rock_authorization: str | None) -> bool:
        """Report whether ``rock_authorization`` may use the archive feature.

        Returns ``True`` for everyone when ``allowed_keys`` is ``None``. When
        ``allowed_keys`` is set but is not a list, access is denied for all
        callers; otherwise the key must be a member of the list.
        """
        permitted = self.allowed_keys
        if permitted is None:
            return True
        # A malformed (non-list) allow-list fails closed.
        return isinstance(permitted, list) and rock_authorization in permitted


@dataclass
class SandboxLifecycleConfig:
    """Timing and archive settings for the sandbox lifecycle.

    All durations are expressed in seconds, as the ``_sec`` suffixes indicate.
    ``auto_clear_default_sec`` is the value the start endpoints fall back to
    when a request does not set ``auto_clear_time_minutes``; the other timing
    fields are consumed outside the code visible here.
    """

    auto_transition_interval_sec: int = 180
    reconcile_interval_sec: int = 30
    archive_timeout_sec: int = 1800
    restore_timeout_sec: int = 1800
    auto_archive_after_sec: int = 0
    auto_delete_after_sec: int = 0
    auto_clear_default_sec: int = 21600
    archive: ArchiveConfig = field(default_factory=ArchiveConfig)

    def __post_init__(self):
        """Promote a dict-valued ``archive`` section to an ``ArchiveConfig``."""
        raw_archive = self.archive
        if not isinstance(raw_archive, dict):
            return
        self.archive = ArchiveConfig(**raw_archive)


@dataclass
class ProxyServiceConfig:
timeout: float = 180.0
Expand Down Expand Up @@ -333,6 +393,7 @@ class RockConfig:
redis: RedisConfig = field(default_factory=RedisConfig)
sandbox_config: SandboxConfig = field(default_factory=SandboxConfig)
oss: OssConfig = field(default_factory=OssConfig)
lifecycle: SandboxLifecycleConfig = field(default_factory=SandboxLifecycleConfig)
runtime: RuntimeConfig = field(default_factory=RuntimeConfig)
proxy_service: ProxyServiceConfig = field(default_factory=ProxyServiceConfig)
scheduler: SchedulerConfig = field(default_factory=SchedulerConfig)
Expand Down Expand Up @@ -384,6 +445,8 @@ def from_env(cls, config_path: str | None = None):
kwargs["sandbox_config"] = SandboxConfig(**config["sandbox_config"])
if "oss" in config:
kwargs["oss"] = OssConfig(**config["oss"])
if "lifecycle" in config:
kwargs["lifecycle"] = SandboxLifecycleConfig(**config["lifecycle"])
if "runtime" in config:
kwargs["runtime"] = RuntimeConfig(**config["runtime"])
if "proxy_service" in config:
Expand Down Expand Up @@ -482,6 +545,7 @@ async def update(self):
config_map = {
"sandbox_config": (SandboxConfig, "sandbox_config"),
"proxy_service": (ProxyServiceConfig, "proxy_service"),
"lifecycle": (SandboxLifecycleConfig, "lifecycle"),
}

# Update configs that are present in nacos_result
Expand All @@ -490,5 +554,7 @@ async def update(self):
setattr(self, attr_name, config_class(**nacos_result[key]))

logger.info(
f"Updated config from Nacos: sandbox_config={self.sandbox_config}, proxy_service={self.proxy_service}"
f"Updated config from Nacos: sandbox_config={self.sandbox_config},"
f" proxy_service={self.proxy_service},"
f" lifecycle={self.lifecycle}"
)
112 changes: 112 additions & 0 deletions rock/deployments/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,118 @@ async def restart(self):

logger.info(f"Container {self._container_name} restarted successfully")

def _container_exists(self) -> bool:
    """Return whether Docker has a container named ``self._container_name``.

    The lookup is restricted to containers with ``--type container``; a bare
    ``docker inspect NAME`` also matches images, volumes and networks that
    happen to share the name, which would wrongly report the container as
    present.

    Returns:
        ``True`` when ``docker inspect`` exits with status 0, else ``False``.

    Raises:
        subprocess.TimeoutExpired: Docker did not answer within 5 seconds.
            This is deliberately not mapped to ``False``, because an
            unresponsive daemon says nothing about whether the container
            exists.
    """
    result = subprocess.run(
        ["docker", "inspect", "--type", "container", self._container_name],
        capture_output=True,
        timeout=5,
    )
    return result.returncode == 0

async def restart_from_image(self, image_ref: str) -> None:
    """Restart a container, recreating it from a committed image if it no longer exists.

    Used by the archive-restore flow: the image was previously committed
    and pushed to a registry. If the original container still exists,
    behaves like restart(). If not, creates a new container from
    *image_ref* with the same config (ports, memory, cpus, volumes).

    Args:
        image_ref: Image reference passed to ``docker create`` when the
            container has to be recreated.

    Raises:
        Exception: The rocklet port is still unknown after the container
            has been started. Any exception raised while starting the
            container is re-raised after the container is force-removed.
    """
    # NOTE(review): _container_exists() performs a blocking subprocess call
    # and is invoked directly here, i.e. on the event-loop thread.
    if self._container_exists():
        await self.restart()
        return

    logger.info(f"Container {self._container_name} not found, recreating from {image_ref}")

    executor = get_executor()
    loop = asyncio.get_running_loop()

    # Recover port mappings from persisted file, or allocate new ones.
    status_path = PersistedServiceStatus.gen_service_status_path(self._container_name)
    self._service_status.set_sandbox_id(self._container_name)
    if os.path.exists(status_path):
        with open(status_path) as f:
            data = json.load(f)
        # Keys are converted with int() before being registered as port mappings.
        for port_value, mapping in data.get("port_mapping", {}).items():
            self._service_status.add_port_mapping(int(port_value), mapping)
        if self._config.port is None:
            self._config.port = self._service_status.port_mapping.get(Port.PROXY)

    # Nothing usable was persisted: fall back to a fresh port allocation.
    if self._config.port is None:
        await self.do_port_mapping()

    # Build docker create command from archived image.
    env_arg = ["-e", f"ROCK_TIME_ZONE={env_vars.ROCK_TIME_ZONE}"]
    volume_args = self._prepare_volume_mounts()
    if env_vars.ROCK_LOGGING_PATH:
        # Per-container host log directory, mounted at the logging path inside the container.
        log_file_path = f"{env_vars.ROCK_LOGGING_PATH}/{self._container_name}"
        os.makedirs(log_file_path, exist_ok=True)
        os.chmod(log_file_path, 0o777)
        volume_args.extend(["-v", f"{log_file_path}:{env_vars.ROCK_LOGGING_PATH}"])
        env_arg.extend(
            [
                "-e",
                f"ROCK_LOGGING_PATH={env_vars.ROCK_LOGGING_PATH}",
                "-e",
                f"ROCK_LOGGING_LEVEL={env_vars.ROCK_LOGGING_LEVEL}",
            ]
        )
    volume_args.extend(self._prepare_timezone_mount())
    runtime_args = self._build_runtime_args()

    # The image entrypoint is cleared; the rocklet start command is appended
    # after the image reference instead.
    cmds = [
        "docker",
        "create",
        "--entrypoint",
        "",
        *env_arg,
        *volume_args,
        *runtime_args,
        "-p",
        f"{self._config.port}:{Port.PROXY}",
        "-p",
        f"{self._service_status.get_mapped_port(Port.SERVER)}:8080",
        "-p",
        f"{self._service_status.get_mapped_port(Port.SSH)}:22",
        *self._memory(),
        *self._cpus(),
        *self._storage_opts(),
        *self._config.docker_args,
        "--name",
        self._container_name,
        image_ref,
        *self._get_rocklet_start_cmd(),
    ]

    cmd_str = shlex.join(cmds)
    logger.info(f"Recreating container {self._container_name} from archived image {image_ref}")
    logger.info(f"Command: {cmd_str!r}")

    # Create and start are dispatched to the executor rather than run inline.
    await loop.run_in_executor(executor, self._docker_create, cmds)
    try:
        self._container_process = await loop.run_in_executor(executor, self._docker_start)
    except Exception:
        # Do not leave a created-but-not-started container behind.
        DockerUtil.remove_container_force(self._container_name)
        raise

    # Last-chance lookup of the proxy port before giving up.
    if self._config.port is None:
        self._config.port = self._service_status.port_mapping.get(Port.PROXY)
    if self._config.port is None:
        raise Exception(f"Cannot determine rocklet port for container {self._container_name}")

    logger.info(f"Starting runtime at {self._config.port}")
    self._runtime = RemoteSandboxRuntime.from_config(
        RemoteSandboxRuntimeConfig(port=self._config.port, timeout=self._runtime_timeout)
    )
    self._runtime.set_executor(executor)

    with StageTimer("startup_timing", f"[{self._container_name}] Wait until alive", logger):
        await self._wait_until_alive(timeout=self._config.startup_timeout)

    # Background task is only scheduled when auto-clear is enabled in the config.
    if self._config.enable_auto_clear:
        self._check_stop_task = asyncio.create_task(self._check_stop())

    logger.info(f"Container {self._container_name} recreated from {image_ref} successfully")

async def delete(self) -> None:
"""Remove the container via ``docker rm -f``.

Expand Down
Empty file.
6 changes: 6 additions & 0 deletions rock/sandbox/archive/_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def dir_archive_key(sandbox_id: str, prefix: str) -> str:
    """Return the storage key for a sandbox's directory archive.

    The key is ``prefix`` immediately followed by ``sandbox_id`` and the
    ``.tar.gz`` suffix. No separator is inserted, so ``prefix`` must already
    end with one (e.g. ``"rock-archives/"``) if a separator is wanted.
    """
    return "{}{}.tar.gz".format(prefix, sandbox_id)


def image_ref(sandbox_id: str, registry_url: str, namespace: str) -> str:
    """Return the image reference under which a sandbox is archived.

    The result has the form
    ``<registry_url>/<namespace>/sandbox_archived:<sandbox_id>``.

    Slashes at the joins are normalized: a trailing ``/`` on ``registry_url``
    and leading/trailing ``/`` on ``namespace`` are dropped, because they
    would otherwise yield ``//`` inside the reference, which is never a valid
    image name. Inputs without such slashes produce exactly the same string
    as before.

    Args:
        sandbox_id: Sandbox identifier, used as the image tag.
        registry_url: Registry address placed at the front of the reference.
        namespace: Repository namespace placed between registry and image name.
    """
    registry = registry_url.rstrip("/")
    repo_namespace = namespace.strip("/")
    return f"{registry}/{repo_namespace}/sandbox_archived:{sandbox_id}"
Loading