Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rock/actions/sandbox/sandbox_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class SandboxInfo(TypedDict, total=False):
stop_time: str
delete_time: str
extended_params: dict[str, str]
operator_name: str


_SANDBOX_INFO_KEYS = frozenset(SandboxInfo.__annotations__.keys())
Expand Down
45 changes: 36 additions & 9 deletions rock/admin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
from rock.logger import init_logger
from rock.sandbox.gem_manager import GemManager
from rock.sandbox.operator.factory import OperatorContext, OperatorFactory
from rock.sandbox.operator.registry import OperatorRegistry
from rock.sandbox.operator.routing import Router
from rock.sandbox.sandbox_meta_store import SandboxMetaStore
from rock.sandbox.service.sandbox_proxy_service import SandboxProxyService
from rock.sandbox.service.warmup_service import WarmupService
Expand Down Expand Up @@ -145,19 +147,44 @@ async def lifespan(app: FastAPI):

# init sandbox service
if args.role == "admin":
# init ray service
ray_service = RayService(rock_config.ray)
ray_service.init()

# create operator using factory with context pattern
# Configuration-driven loading: only initialize what the YAML
# actually declared via top-level operator keys.
present = rock_config.present_operator_keys or {rock_config.runtime.operator_type}
ray_service: RayService | None = None
if "ray" in present:
ray_service = RayService(rock_config.ray)
ray_service.init()

# build OperatorContext once with all available dependencies; the
# Factory.build call only consumes the ones each operator needs.
operator_context = OperatorContext(
runtime_config=rock_config.runtime,
ray_service=ray_service,
redis_provider=redis_provider,
nacos_provider=rock_config.nacos_provider,
k8s_config=rock_config.k8s,
k8s_config=rock_config.k8s if "k8s" in present else None,
remote_config=rock_config.remote if "remote" in present else None,
)
operator = OperatorFactory.create_operator(operator_context)

default_operator_name = rock_config.runtime.operator_type
if default_operator_name not in present:
raise ValueError(
f"runtime.operator_type={default_operator_name!r} but its config block is"
f" not present in YAML. Loaded operator keys: {sorted(present)}"
)
operator_registry = OperatorRegistry(default_name=default_operator_name)
for name in sorted(present):
operator_registry.register(name, OperatorFactory.build(name, operator_context))
if rock_config.runtime.operator_routing:
router = Router.from_config(
rock_config.runtime.operator_routing,
fallback_default=default_operator_name,
loaded_operators=operator_registry.loaded_names,
)
operator_registry.set_router(router)
logger.info("operator router enabled with %d rule(s)", len(router.rules))
else:
logger.info("no operator_routing configured; all submits go to default=%s", default_operator_name)

# init service
if rock_config.runtime.enable_auto_clear:
Expand All @@ -166,7 +193,7 @@ async def lifespan(app: FastAPI):
ray_namespace=rock_config.ray.namespace,
ray_service=ray_service,
enable_runtime_auto_clear=True,
operator=operator,
registry=operator_registry,
meta_store=meta_store,
)
else:
Expand All @@ -175,7 +202,7 @@ async def lifespan(app: FastAPI):
ray_namespace=rock_config.ray.namespace,
ray_service=ray_service,
enable_runtime_auto_clear=False,
operator=operator,
registry=operator_registry,
meta_store=meta_store,
)
set_sandbox_manager(sandbox_manager)
Expand Down
52 changes: 51 additions & 1 deletion rock/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,30 @@ def __post_init__(self):
self.ports[key] = value


@dataclass
class RemoteConfig:
    """Configuration for the Remote operator.

    ``api_key`` is a credential, so it is excluded from the generated
    ``repr`` to keep it out of logs and tracebacks that print the config
    object. Use :meth:`resolved_api_key` to obtain the effective key.
    """

    # Endpoint of the external sandbox API.
    api_endpoint: str = ""

    # Credential from the config file; hidden from repr() to avoid leaking it.
    api_key: str = field(default="", repr=False)

    rocklet_port: int = 8000

    # Header name carrying the sandbox id.
    header_sandbox_id: str = "X-Sandbox-Id"

    # Header name carrying the sandbox port.
    header_sandbox_port: str = "X-Sandbox-Port"

    def resolved_api_key(self) -> str:
        """Return the effective api_key, env var ROCK_REMOTE_API_KEY takes precedence.

        An unset or empty ``ROCK_REMOTE_API_KEY`` falls back to ``self.api_key``.
        """
        import os

        value = os.environ.get("ROCK_REMOTE_API_KEY", "")
        if value:
            return value
        return self.api_key


@dataclass
class K8sConfig:
"""Kubernetes configuration for K8s operator."""
Expand Down Expand Up @@ -257,6 +281,16 @@ class RuntimeConfig:
python_env_path: str = field(default_factory=lambda: env_vars.ROCK_PYTHON_ENV_PATH)
envhub_db_url: str = field(default_factory=lambda: env_vars.ROCK_ENVHUB_DB_URL)
operator_type: str = "ray"
operator_routing: dict | None = None
"""Routing rules consumed by Router.from_config. Kept as a raw dict so the
routing layer owns its schema. Example::

operator_routing:
default: ray # optional; falls back to operator_type
rules:
- match: {image_prefix: "reg.example.com/remote/"}
target: remote
"""
standard_spec: StandardSpec = field(default_factory=StandardSpec)
max_allowed_spec: StandardSpec = field(default_factory=lambda: StandardSpec(cpus=16, memory="64g"))
use_standard_spec_only: bool = False
Expand Down Expand Up @@ -324,10 +358,16 @@ def _resolve_k8s_template_includes(k8s_dict: dict, base_dir: Path) -> None:
k8s_dict["templates"] = merged


# Top-level YAML keys whose presence triggers loading of the corresponding
# operator. Single source of truth for the configuration-driven loader.
OPERATOR_CONFIG_KEYS: tuple[str, ...] = ("ray", "k8s", "remote")


@dataclass
class RockConfig:
ray: RayConfig = field(default_factory=RayConfig)
k8s: K8sConfig = field(default_factory=K8sConfig)
remote: RemoteConfig | None = None
warmup: WarmupConfig = field(default_factory=WarmupConfig)
nacos: NacosConfig = field(default_factory=NacosConfig)
redis: RedisConfig = field(default_factory=RedisConfig)
Expand All @@ -338,6 +378,10 @@ class RockConfig:
scheduler: SchedulerConfig = field(default_factory=SchedulerConfig)
database: DatabaseConfig = field(default_factory=DatabaseConfig)
nacos_provider: NacosConfigProvider | None = None
# Operator config keys actually present in the loaded YAML. Populated by
# from_env after merging _base. Drives configuration-driven operator
# loading: an operator is registered iff its key is in this set.
present_operator_keys: set[str] = field(default_factory=set, init=False)

@classmethod
def from_env(cls, config_path: str | None = None):
Expand Down Expand Up @@ -374,6 +418,8 @@ def from_env(cls, config_path: str | None = None):
if "k8s" in config:
_resolve_k8s_template_includes(config["k8s"], config_file.parent)
kwargs["k8s"] = K8sConfig(**config["k8s"])
if "remote" in config:
kwargs["remote"] = RemoteConfig(**config["remote"])
if "warmup" in config:
kwargs["warmup"] = WarmupConfig(**config["warmup"])
if "nacos" in config:
Expand All @@ -393,7 +439,11 @@ def from_env(cls, config_path: str | None = None):
if "database" in config:
kwargs["database"] = DatabaseConfig(**config["database"])

return cls(**kwargs)
instance = cls(**kwargs)
# Record which operator config blocks were actually present so the
# Registry can load exactly what the YAML declares.
instance.present_operator_keys = {k for k in OPERATOR_CONFIG_KEYS if k in config}
return instance

# ============================================================================
# Merging Rules:
Expand Down
7 changes: 4 additions & 3 deletions rock/sandbox/gem_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from rock.admin.proto.response import SandboxStartResponse, SandboxStatusResponse
from rock.config import RockConfig
from rock.deployments.config import DockerDeploymentConfig
from rock.sandbox.operator.registry import OperatorRegistry
from rock.sandbox.sandbox_actor import SandboxActor
from rock.sandbox.sandbox_manager import SandboxManager
from rock.sandbox.sandbox_meta_store import SandboxMetaStore
Expand All @@ -25,19 +26,19 @@ class GemManager(SandboxManager):
def __init__(
self,
rock_config: RockConfig,
meta_store: SandboxMetaStore | None = None,
meta_store: SandboxMetaStore,
registry: OperatorRegistry,
ray_namespace: str = env_vars.ROCK_RAY_NAMESPACE,
ray_service: RayService | None = None,
enable_runtime_auto_clear: bool = False,
operator=None,
):
super().__init__(
rock_config,
meta_store=meta_store,
registry=registry,
ray_namespace=ray_namespace,
ray_service=ray_service,
enable_runtime_auto_clear=enable_runtime_auto_clear,
operator=operator,
)

async def env_make(self, env_id: str) -> EnvMakeResponse:
Expand Down
46 changes: 30 additions & 16 deletions rock/sandbox/operator/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any

from rock.admin.core.ray_service import RayService
from rock.config import K8sConfig, RuntimeConfig
from rock.config import K8sConfig, RemoteConfig, RuntimeConfig
from rock.logger import init_logger
from rock.sandbox.operator.abstract import AbstractOperator
from rock.sandbox.operator.k8s.operator import K8sOperator
Expand All @@ -30,6 +30,8 @@ class OperatorContext:
# K8s operator dependencies
k8s_config: K8sConfig | None = None
nacos_provider: NacosConfigProvider | None = None
# Remote operator dependencies
remote_config: RemoteConfig | None = None
# Future operator dependencies can be added here without breaking existing code
extra_params: dict[str, Any] = field(default_factory=dict)

Expand All @@ -38,25 +40,21 @@ class OperatorFactory:
"""Factory class for creating operator instances.

Uses the Context Object pattern to avoid parameter explosion as new
operator types are added.
operator types are added. ``build`` constructs a single operator by name,
while ``create_operator`` is kept for backward compatibility (loads the
single operator selected by ``runtime_config.operator_type``).
"""

@staticmethod
def create_operator(context: OperatorContext) -> AbstractOperator:
"""Create an operator instance based on the runtime configuration.

Args:
context: OperatorContext containing all necessary dependencies
def build(name: str, context: OperatorContext) -> AbstractOperator:
"""Construct one operator instance by config-key name.

Returns:
AbstractOperator: The created operator instance

Raises:
ValueError: If operator_type is not supported or required dependencies are missing
``name`` matches the top-level YAML key that triggered loading
(one of ``OPERATOR_CONFIG_KEYS``: ``ray``/``k8s``/``remote``).
"""
operator_type = context.runtime_config.operator_type.lower()
key = name.lower()

if operator_type == "ray":
if key == "ray":
if context.ray_service is None:
raise ValueError("RayService is required for RayOperator")
logger.info("Creating RayOperator")
Expand All @@ -66,7 +64,7 @@ def create_operator(context: OperatorContext) -> AbstractOperator:
if context.nacos_provider is not None:
ray_operator.set_nacos_provider(context.nacos_provider)
return ray_operator
elif operator_type == "k8s":
elif key == "k8s":
if context.k8s_config is None:
raise ValueError("K8sConfig is required for K8sOperator")
logger.info("Creating K8sOperator")
Expand All @@ -76,5 +74,21 @@ def create_operator(context: OperatorContext) -> AbstractOperator:
if context.nacos_provider is not None:
k8s_operator.set_nacos_provider(context.nacos_provider)
return k8s_operator
elif key == "remote":
if context.remote_config is None:
raise ValueError("RemoteConfig is required for RemoteOperator")
# Lazy import to avoid pulling httpx into ray/k8s-only deployments.
from rock.sandbox.operator.remote.operator import RemoteOperator

logger.info("Creating RemoteOperator endpoint=%s", context.remote_config.api_endpoint)
remote_operator = RemoteOperator(remote_config=context.remote_config)
if context.redis_provider is not None:
remote_operator.set_redis_provider(context.redis_provider)
return remote_operator
else:
raise ValueError(f"Unsupported operator type: {operator_type}. Supported types: ray, kubernetes")
raise ValueError(f"Unsupported operator name: {name!r}. Supported: ray, k8s, remote")

@staticmethod
def create_operator(context: OperatorContext) -> AbstractOperator:
"""Backward-compatible single-operator creation by ``operator_type``."""
return OperatorFactory.build(context.runtime_config.operator_type, context)
79 changes: 79 additions & 0 deletions rock/sandbox/operator/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Operator registry: name → AbstractOperator instance.

Two dispatch paths share the registry:
* Submit path: ``resolve(RouteContext)`` → router → operator
* Operate path: ``get(operator_name)`` → operator (name from sandbox meta)
"""

from __future__ import annotations

from rock.logger import init_logger
from rock.sandbox.operator.abstract import AbstractOperator
from rock.sandbox.operator.routing.context import RouteContext
from rock.sandbox.operator.routing.router import Router

logger = init_logger(__name__)


class OperatorRegistry:
    """Name-keyed table of loaded operators with an optional router.

    Operators are looked up either directly by name (``get``) or through
    the router using a route context (``resolve``). The registry is meant
    to be populated during initialization only; mutating it at runtime is
    not supported.
    """

    def __init__(self, default_name: str) -> None:
        # Name used whenever a lookup supplies no (or an empty) name.
        self._default_name: str = default_name
        self._operators: dict[str, AbstractOperator] = {}
        # Stays None until set_router() is called.
        self._router: Router | None = None

    # ------------------------------------------------------------------
    # Registration (startup-only)
    # ------------------------------------------------------------------

    def register(self, name: str, operator: AbstractOperator) -> None:
        """Add ``operator`` under ``name``.

        Raises:
            ValueError: If ``name`` has already been registered.
        """
        already_present = name in self._operators
        if already_present:
            raise ValueError(f"operator {name!r} already registered")
        self._operators[name] = operator
        logger.info("Operator registered: %s (%s)", name, type(operator).__name__)

    def set_router(self, router: Router) -> None:
        """Install the router consulted by ``resolve``."""
        self._router = router

    # ------------------------------------------------------------------
    # Lookup
    # ------------------------------------------------------------------

    @property
    def default_name(self) -> str:
        """Name of the operator used when no name is supplied."""
        return self._default_name

    @property
    def loaded_names(self) -> set[str]:
        """A fresh set containing the names of all registered operators."""
        return set(self._operators.keys())

    def get(self, name: str | None) -> AbstractOperator:
        """Look up an operator by name, treating empty/None as the default.

        Raises:
            KeyError: If the effective name is not registered. Callers
                should let this propagate so that ops notice a sandbox
                bound to an operator that is no longer in the config.
        """
        effective = name if name else self._default_name
        found = self._operators.get(effective)
        if found is not None:
            return found
        raise KeyError(
            f"operator {effective!r} is not loaded; "
            f"loaded={sorted(self._operators)}"
        )

    def resolve(self, ctx: RouteContext) -> tuple[str, AbstractOperator]:
        """Submit-time entry point: pick an operator for ``ctx``.

        Returns:
            A ``(name, operator)`` pair.
        """
        router = self._router
        if router is None:
            # Without routing everything is sent to the default operator,
            # i.e. the legacy single-operator path.
            fallback = self._default_name
            return fallback, self.get(fallback)
        name, routed_by = router.route(ctx)
        logger.info("routing: operator_name=%s routed_by=%s", name, routed_by)
        return name, self.get(name)
5 changes: 5 additions & 0 deletions rock/sandbox/operator/remote/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""RemoteOperator: delegates sandbox lifecycle to an external sandbox API."""

from rock.sandbox.operator.remote.operator import RemoteOperator

__all__ = ["RemoteOperator"]
Loading
Loading