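diff --git a/docs/_specs/proxy-multicore/01_requirement.md b/docs/_specs/proxy-multicore/01_requirement.md
new file mode 100644
index 0000000000..1c14d75478
--- /dev/null
+++ b/docs/_specs/proxy-multicore/01_requirement.md
@@ -0,0 +1,67 @@
+# sandbox_proxy_router multi-core: requirements and decisions
+
+> Date: 2026-06-22 · Status: design confirmed, implementation plan pending
+
+## 1. Background
+
+`sandbox_proxy_router` (`admin --role proxy`) currently runs as a **single process with a single event loop** via `uvicorn.run(app, ...)` at `rock/admin/main.py:310`, so on a multi-core machine it can saturate only one core. The proxy role is the forwarding layer for the data/control plane. Its responsibilities are:
+
+- forward HTTP / WebSocket / SSE requests to the sandbox (rocklet);
+- read sandbox metadata from Redis/DB (`meta_store`);
+- generate OSS STS tokens.
+
+The proxy process holds **no cross-request session state** (bash sessions live in rocklet; a WebSocket connection is naturally bound to a single connection), so it is well suited to horizontal scaling across processes.
+
+## 2. Goals
+
+1. Let `sandbox_proxy_router` make full use of multiple cores within a single Pod (throughput scales roughly linearly with core count).
+2. Reuse the httpx connection pool to eliminate per-request connection setup on the forwarding hot path.
+3. Go multi-process safely, without overwhelming the backend (PostgreSQL) or letting memory grow out of control.
+
+## 3. Key decisions (confirmed with the requester)
+
+| Decision | Choice | Rationale |
+|--------|------|------|
+| Scaling approach | **Multi-process within a single Pod** (scale inside the Pod) | Saturates the local cores directly, without relying on k8s replica scaling |
+| Process management | **uvicorn native `--workers`** | Smallest change; the built-in master manages child processes; native ASGI/WebSocket support |
+| Multi-process scope | **proxy role only**; admin role always `workers=1` | admin owns the scheduler thread (`is_primary_pod()` is Pod-level) and Ray/singletons; multiple processes would schedule duplicates |
+| Optimization scope | **httpx connection pool reuse** | Fixes `http_proxy`/`host_proxy` creating a new `AsyncClient` per request |
+| Connection pool/Metrics governance | Included as a **required correctness item** of going multi-process | Multiple workers inevitably multiply connection counts and cause Metrics tag collisions |
+
+**Explicitly out of scope (this phase)**: slimming the access-log middleware and caching sandbox status queries (not selected by the requester). An observation point is kept, though: with multiple workers, the CPU cost of the log middleware's `await request.json()+json.dumps(indent=2)` is paid once on every core, so if load testing shows a single core is still saturated by logging, this must be revisited.
+
+## 4. Constraints and hard prerequisites
+
+### 4.1 argv does not carry over to workers (why parameters must be passed via env)
+
+`uvicorn.run("module:app", workers=N)` forks/spawns N worker child processes, and **each child re-imports the module to build the app**. Currently the top-level `args = parser.parse_args()` in `main.py` runs at import time, while the `sys.argv` of a worker child process (especially in spawn mode) is **not** the argv of the launch command, so it falls back to the argparse defaults (`role=admin`!) or even raises `SystemExit`.
+
+→ Required change: **the main process `main()` parses argv and writes it to env; `create_app()`/`lifespan` (run in every worker) read env only**. The top-level `parse_args()` must be removed. env is an inherited process attribute, so both fork and spawn inherit it correctly.
+
+### 4.2 Connection pools cannot be shared across processes
+
+A Redis/DB connection pool = TCP socket + protocol state machine + bound event loop, and all three are tied to a single process and a single loop: concurrent reads and writes on the same connection from multiple processes corrupt protocol framing; an asyncio connection object holds a reference to the current loop, which a loop in another process cannot drive; under spawn not even the fd is inherited.
+
+→ **Each worker must create its own Redis/DB/httpx pools**. What is shared is the backend service instance itself (the same Redis / the same PG), not the connection objects. What can be done is **shrink each pool + document the total budget**.
+
+### 4.3 The current DB pool is a dangerous default for multiple workers
+
+`rock/admin/core/db_provider.py:43-45` (applies to PostgreSQL only; hard-coded, not configurable):
+
+```python
+pool_size = 100  # up to 100 persistent connections per process
+max_overflow = 0  # 100 is the hard cap
+pool_timeout = 120
+```
+
+Multiplication risk: `8 worker × 100 = 800`, `3 pod × 8 worker × 100 = 2400`, far above the PG default `max_connections=100`. **The first thing to fall over is not the CPU but PostgreSQL**.
+
+→ **Hard precondition** for shipping multi-worker: make `pool_size` configurable and derive it from `pool_size × workers × pods ≤ PG_max_connections`. The proxy mostly just reads metadata from the DB and its hot path goes through Redis, so the value can be very small.
+
+## 5. Success criteria
+
+- The proxy role starts with `workers=N`, the N processes share the load evenly (SO_REUSEPORT/uvicorn master distribution), and load-test throughput scales roughly linearly with worker count up to the core count.
+- Worker child processes build the **correct set of routes for the role** (no fallback to admin because argv was lost).
+- The admin role stays single-process and the scheduler is not started more than once.
+- `http_proxy`/`host_proxy` reuse a shared httpx client and no longer connect per request; the shared client's lifetime equals the process lifetime, and streaming responses close only the response, not the client.
+- Total PG connections per Pod ≤ the current level (`pool_size = max(2, base // workers)` guarantees no regression).
diff --git a/docs/_specs/proxy-multicore/02_design.md b/docs/_specs/proxy-multicore/02_design.md
new file mode 100644
index 0000000000..023e82ee61
--- /dev/null
+++ b/docs/_specs/proxy-multicore/02_design.md
@@ -0,0 +1,163 @@
+# sandbox_proxy_router multi-core: design
+
+> Companion to `01_requirement.md`. This document gives a four-part design + capacity budget + test strategy.
+
+## Part 1: multi-worker startup (app-factory + env parameter passing)
+
+Rework `rock/admin/main.py`:
+
+```python
+# No top-level args = parse_args() anymore (removes the import-time dependency on argv)
+
+def create_app() -> FastAPI:
+    """Factory: called by every worker child process; reads env only, never touches argv."""
+    role = env_vars.ROCK_ADMIN_ROLE  # written to env by the main process
+    env = env_vars.ROCK_ADMIN_ENV
+    app = FastAPI(lifespan=lifespan)
+    # CORS / exception handlers / access-log middleware / include_router (by role) all live here
+    if role == "admin":
+        app.include_router(sandbox_router, prefix="/apis/envs/sandbox/v1", tags=["sandbox"])
+        app.include_router(admin_ops_router, prefix="/apis/envs/sandbox/v1/ops", tags=["admin-ops"])
+    else:
+        app.include_router(sandbox_proxy_router, prefix="/apis/envs/sandbox/v1", tags=["sandbox"])
+    app.include_router(warmup_router, ...)
+    app.include_router(gem_router, ...)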
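+    return app
+
+def main():
+    args = _parse_args()  # main process only
+    os.environ["ROCK_ADMIN_ROLE"] = args.role
+    os.environ["ROCK_ADMIN_ENV"] = args.env
+    workers = resolve_workers(args.role, args.workers, int(os.getenv("ROCK_PROXY_WORKERS", "0")))
+    uvicorn.run(
+        "rock.admin.main:create_app", factory=True,
+        host="0.0.0.0", port=args.port, workers=workers,
+        ws_ping_interval=None, ws_ping_timeout=None, timeout_keep_alive=30,
+    )
+
+# rock/utils/worker.py (pure-function util, no I/O)
+SINGLE_WORKER_ENVS = frozenset({"local", "test", "dev"})
+
+def resolve_workers(role, override, env_workers, env=None) -> int:
+    if role != "proxy":
+        return 1  # admin is always single-process (scheduler/Ray singletons)
+    if env in SINGLE_WORKER_ENVS:
+        return 1  # local/test/dev force single-process (fakeredis/in-mem state is process-private and would not be shared across workers); takes precedence over override
+    if override and override > 0:
+        return override
+    if env_workers and env_workers > 0:
+        return env_workers
+    return 1  # must be set explicitly; no cpu_count auto-detection
+```
+
+Key points:
+- `resolve_workers` / `compute_pool_size` live in `rock/utils/worker.py` (pure-function utils, unit-testable).
+- Inside `lifespan`, every `args.env/args.role` is replaced by reading `env_vars.ROCK_ADMIN_ENV/ROCK_ADMIN_ROLE`.
+- New CLI flag `--workers` (optional, overrides env).
+- `env_vars.py` gains a lazily loaded default: `ROCK_PROXY_WORKERS` (default `0`). **The worker count must be set explicitly** (`--workers` or `ROCK_PROXY_WORKERS`); if neither is set, a single worker (=1) is used, with no cpu_count auto-detection.
+- On the ops side, the recommended worker count is `≤ min(physical cores, available memory / per-process RSS)` (see the capacity budget), decided explicitly by ops rather than chosen by the process.
+
+## Part 2: connection pool / Metrics governance (required correctness)
+
+Each worker runs `lifespan` once → each has its own independent Redis pool, DB pool, httpx pool, and MetricsMonitor. The tightening measures are:
+
+### 2.1 Configurable DB pool + shrink per worker
+
+- `DatabaseConfig` gains `pool_size` (overridable via env); `db_provider.init()` no longer hard-codes 100.
+- Effective value for the proxy role: `pool_size = max(2, base // workers)`, with `base` defaulting to 100.
+  - Integer division with a floor of 2, so a very large worker count does not yield 0/1.
+  - Per Pod, `workers × (base//workers) ≈ base`, so **the pressure on PG matches today's single process, with no regression**.
+  - The proxy almost exclusively reads metadata, so production can lower this further.
+- The admin role stays at a single worker → `pool_size = base` (=100, unchanged).
+
+### 2.2 Redis pool
+
+Likewise configurable and reduced as needed; Redis connections are cheap (a few KB each), so this is lower priority than the DB.
+
+### 2.3 Per-process Metrics tagging
+
+When multiple workers report with the same `user_defined_tags`, they overwrite or contaminate each other. When `create_app()`/`lifespan` builds the `MetricsMonitor`, inject a `worker_pid` (`os.getpid()`) tag so each worker's metrics can be told apart (or let the backend sum and aggregate by tag).
+
+### 2.4 Log file concurrency (truncate at deploy + every worker appends)
+
+Today `init_file_handler` uses `mode="w+"`: with multiple workers every process **truncates the same file** at startup and each holds an independent offset, overwriting one another → corrupted logs. The fix is to move the truncate out of the FileHandler (where each process truncates and they collide) to **a single truncate at deploy time**, after which all workers append:
+
+- The `init_file_handler` mode is determined by a new env `ROCK_LOGGING_APPEND`: set → `"a"` (append); **the default `"w+"` is unchanged**, so single-process services such as rocklet / cli are unaffected.
+- admin/proxy `main()` (master, before spawning workers): first call `reset_log_file()` to truncate once, then set `os.environ["ROCK_LOGGING_APPEND"]="true"`; workers inherit env → all append.
+- Safety: the master writes no logs at import time and the truncate happens before any write, so it never truncates while an existing writer has offset>0 and leaves a hole.
+- Residual edge cases (negligible on local disk with normal-size lines): a single log record larger than the BufferedWriter buffer (~8KB), such as an access log dumping a large body, may interleave under multi-process append; NFS does not guarantee O_APPEND atomicity. Full immunity would require per-pid file names (not in this phase; the requester only asked for "truncate once + every pid appends").
+
+## Part 3: httpx connection pool reuse (the selected optimization)
+
+### Current state
+
+- `_send_request` (control-plane JSON RPC) already uses the pooled `self._httpx_client` ✅.
+- `http_proxy` (`sandbox_proxy_service.py:951`) and `host_proxy` (`:873`) **create a new `AsyncClient` per request** ❌ → repeated TCP+TLS handshakes, no keepalive reuse.
+
+### Change: split into two shared clients
+
+This keeps long data-plane streams (SSE/large responses) from blocking short control-plane RPCs:
+
+| client | Purpose | Timeout | Pool |
+|--------|------|------|-----|
+| `_rpc_client` | `_send_request` control plane | short (`proxy_config.timeout`) | small |
+| `_proxy_client` | `http_proxy` / `host_proxy` data plane (including SSE streaming) | relaxed read timeout / no total timeout | large |
+
+Both are created in `__init__`, live as long as the process, and are closed together with `aclose()` at service shutdown.
+
+### Reuse correctness notes
+
+- Streaming: `resp = await self._proxy_client.send(req, stream=True)`; the generator's `finally` **only does `await resp.aclose()` and never closes the shared client**.
+- Non-streaming: `resp.aclose()` after `aread()`; likewise the client is not closed.
+- **Per-request timeouts** are overridden via `build_request(timeout=...)`, preserving the existing semantics: SSE no total timeout / regular 120s / host_proxy 90s.
+- Pool limits `max_connections`/`max_keepalive` are configurable via `ProxyServiceConfig`; long-lived connections occupying pool slots create the intended backpressure, so the values must be set according to concurrency.
+- WebSocket goes through `websockets.connect` and is unaffected.
+
+## Part 4: capacity budget
+
+Fill in the numbers in this section before launch to settle the worker count and each pool size.
+
+### 4.1 Connection count (precisely controllable)
+
+```
+Total PG connections = pool_size × workers × number of pods
+Constraint: ≤ PG_max_connections (leave headroom for admin/others)
+Default: pool_size = max(2, 100 // workers) → ≈ 100 per Pod, same as today
+```
+
+Example: PG `max_connections=500`, half reserved for others, proxy budget 250; `250 / (8 worker × 3 pod) ≈ 10`, so set proxy `pool_size=10`.
+
+### 4.2 Memory (needs calibration by measurement)
+
+- **Worker process memory**: each worker is a full Python process (almost nothing is shared under spawn), with a rough per-process RSS of 100–300MB. `N worker ≈ N × per-process RSS` is usually the **main memory cost** of multi-worker and determines the worker cap: ops explicitly set `workers ≤ min(physical cores, available memory / per-process RSS)` (the code does not auto-select by cpu_count).
+- **PG per-connection memory**: often quoted as 5–10MB per connection, but **this is an empirical estimate, not measured on this system**; the RSS shown by `top`/`ps` overestimates because it includes shared memory (shared_buffers). Calibrate with measured private memory before launch:
+
+```bash
+# Private memory of each PG backend (PSS, more accurate than RSS)
+for pid in $(pgrep -f "postgres:.*"); do
+  awk '/^Pss:/{s+=$2} END{printf "%.1f MB\n", s/1024}' /proc/$pid/smaps_rollup 2>/dev/null
+done
+# Current connections
+# SELECT count(*) FROM pg_stat_activity; SHOW max_connections;
+```
+
+> Principle: control connection counts precisely with the hard formula; label memory figures as empirical estimates subject to measurement, and do not use estimates as the basis for choosing the worker count.
+
+## Part 5: test strategy (TDD)
+
+Process spawning itself is hard to unit test, so the testable logic is extracted:
+
+1. **`create_app()` returns the correct route set per role**: proxy includes `sandbox_proxy_router` and excludes `sandbox_router`; admin is the reverse.
+2. **`resolve_workers(role, override, env_workers, env)`** (`rock/utils/worker.py`): admin always 1; **local/test/dev always 1 (takes precedence over override, to avoid fakeredis/in-mem state not being shared across workers)**; otherwise proxy follows override>env>1; no cpu_count auto-detection.
+3. **DB pool formula**: boundaries of `max(2, base // workers)` (very large workers → 2; admin → base).
+4. **httpx reuse**: after calling `http_proxy`/`host_proxy`, assert that **the shared client is not closed and still usable**; the data plane and control plane each use their own shared instance (a mock client can be injected).
+5. **SSE streaming**: assert that when the generator finishes it only calls `resp.aclose()` and does not close the client.
+
+CI markers: mostly fast tests; those involving real forwarding go under `integration`.
+
+---
+## Implementation status (2026-06-22)
+
+Implemented and passing 627 admin+sandbox unit tests: env `ROCK_PROXY_WORKERS`; configurable `DatabaseConfig.pool_size`; pure functions `resolve_workers`/`compute_pool_size`; `main.py` app-factory + multi-worker (proxy role only); lifespan sizes pools per worker + proxy calls `aclose` on exit; `SandboxProxyService` split into `_rpc_client`/`_proxy_client` + `worker_pid` Metrics tag + `http_proxy`/`host_proxy` reuse the shared client (never closing it).
+
+Fix made alongside the implementation: `tests/unit/sandbox/test_proxy_enhancements.py` previously patched the `httpx.AsyncClient` constructor; because `http_proxy` no longer connects per request, it now injects `service._proxy_client`.
diff --git a/docs/_specs/proxy-multicore/03_plan.md b/docs/_specs/proxy-multicore/03_plan.md
new file mode 100644
index 0000000000..abc051e0fb
--- /dev/null
+++ b/docs/_specs/proxy-multicore/03_plan.md
@@ -0,0 +1,957 @@
+# sandbox_proxy_router Multi-core Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.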
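As a worked check of the capacity budget in `02_design.md` (section 4.1), the sketch below derives a per-worker `pool_size` from a PostgreSQL connection budget. It is a minimal illustration and not code from this change: `budget_pool_size` and its parameter names are hypothetical, and the default floor of 2 is assumed to mirror the lower bound used in the design's pool formula.

```python
def budget_pool_size(pg_budget: int, workers: int, pods: int, floor: int = 2) -> int:
    """Hypothetical helper: largest per-worker pool that fits the PG budget.

    Keeps pool_size * workers * pods <= pg_budget whenever the budget allows
    at least `floor` connections per worker. If it does not, `floor` is
    returned and the caller has to reduce workers or pods instead.
    """
    if workers < 1 or pods < 1:
        raise ValueError("workers and pods must be >= 1")
    return max(floor, pg_budget // (workers * pods))


# Design example: max_connections=500 with half reserved -> proxy budget 250.
pool = budget_pool_size(pg_budget=250, workers=8, pods=3)
total = pool * 8 * 3
print(pool, total)  # 10 240
```

When the floor wins (for example a budget of 10 spread over 24 workers), the product exceeds the budget, which is a signal to lower the worker or Pod count rather than the pool size.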
+
+**Goal:** Have `admin --role proxy` (sandbox_proxy_router) use uvicorn multi-worker to saturate multiple cores within a single Pod, while reusing the httpx connection pool, capping DB connections per worker, and tagging Metrics with the pid.
+
+**Architecture:** `main.py` becomes an app-factory: the main process `main()` parses argv, writes it to env, and resolves the worker count, then calls `uvicorn.run("rock.admin.main:create_app", factory=True, workers=N)`; each worker child process calls `create_app()`, which reads env only to build the routes. The worker count and the DB pool size are computed by pure functions (unit-testable). Only the proxy role is multi-process; the admin role is always 1. `SandboxProxyService` is split into `_rpc_client` (control plane) / `_proxy_client` (data plane); `http_proxy`/`host_proxy` reuse the latter and never close the shared client.
+
+**Tech Stack:** Python 3.10–3.12, FastAPI, uvicorn, httpx, SQLAlchemy (asyncpg), pytest (asyncio_mode=auto).
+
+Reference spec: `docs/_specs/proxy-multicore/01_requirement.md`, `02_design.md`.
+
+---
+
+## File Structure
+
+| File | Responsibility | Action |
+|------|------|------|
+| `rock/env_vars.py` | Add the lazily loaded default `ROCK_PROXY_WORKERS` | Modify |
+| `rock/config.py` | Add a `pool_size` field to `DatabaseConfig` | Modify |
+| `rock/admin/core/db_provider.py` | Read the pool size from `DatabaseConfig.pool_size` instead of hard-coding it | Modify |
+| `rock/admin/worker_config.py` | Pure functions `resolve_workers` / `compute_pool_size` (unit-testable) | Create |
+| `rock/admin/main.py` | app-factory (`create_app`/`_include_routers`) + `main()` writes env and starts multiple workers; lifespan reads env, sizes pools per worker, and proxy calls aclose on exit | Modify |
+| `rock/sandbox/service/sandbox_proxy_service.py` | Split `_rpc_client`/`_proxy_client`, `aclose()`, Metrics pid tag, `http_proxy`/`host_proxy` reuse the client | Modify |
+| `tests/unit/admin/test_worker_config.py` | Tests for the worker/pool pure functions | Create |
+| `tests/unit/admin/test_create_app_routes.py` | Tests for route assembly by role | Create |
+| `tests/unit/admin/test_db_provider_pool.py` | Tests for the configurable DB pool | Create |
+| `tests/unit/sandbox/test_proxy_httpx_reuse.py` | Tests for the two-client split + reuse without closing | Create |
+
+---
+
+## Task 1: add the `ROCK_PROXY_WORKERS` environment variable
+
+**Files:**
+- Modify: `rock/env_vars.py` (`environment_variables` dict, after `ROCK_ADMIN_ROLE`)
+- Test: `tests/unit/admin/test_worker_config.py` (this task starts with an env-reading test)
+
+- [ ] **Step 1: write the failing test**
+
+Create `tests/unit/admin/test_worker_config.py`:
+
+```python
+import importlib
+import os
+
+
+def test_rock_proxy_workers_defaults_to_zero(monkeypatch):
+    monkeypatch.delenv("ROCK_PROXY_WORKERS", raising=False)
+    import rock.env_vars as env_vars
+
+    importlib.reload(env_vars)
+    assert env_vars.ROCK_PROXY_WORKERS == 0
+
+
+def test_rock_proxy_workers_reads_env(monkeypatch):
+    monkeypatch.setenv("ROCK_PROXY_WORKERS", "6")
+    import rock.env_vars as env_vars
+
+    importlib.reload(env_vars)
+    assert env_vars.ROCK_PROXY_WORKERS == 6
+```
+
+- [ ] **Step 2: run the test and confirm it fails**
+
+Run: `uv run pytest tests/unit/admin/test_worker_config.py -v`
+Expected: FAIL (`AttributeError: module 'rock.env_vars' has no attribute 'ROCK_PROXY_WORKERS'`)
+
+- [ ] **Step 3: implement**
+
+In the `environment_variables` dict of `rock/env_vars.py`, add the following after the `"ROCK_ADMIN_ROLE": ...` line:
+
+```python
+    "ROCK_PROXY_WORKERS": lambda: int(os.getenv("ROCK_PROXY_WORKERS", "0")),
+```
+
+- [ ] **Step 4: run the test and confirm it passes**
+
+Run: `uv run pytest tests/unit/admin/test_worker_config.py -v`
+Expected: PASS
+
+- [ ] **Step 5: commit**
+
+```bash
+git add rock/env_vars.py tests/unit/admin/test_worker_config.py
+git commit -m "feat(env): add ROCK_PROXY_WORKERS env var (default 0=auto)"
+```
+
+---
+
+## Task 2: configurable `DatabaseConfig.pool_size` + `DatabaseProvider` reads it
+
+**Files:**
+- Modify: `rock/config.py` (`DatabaseConfig`, around `:207-212`)
+- Modify: `rock/admin/core/db_provider.py` (`init()`, `:40-47`)
+- Test: `tests/unit/admin/test_db_provider_pool.py`
+
+- [ ] **Step 1: write the failing test**
+
+Create `tests/unit/admin/test_db_provider_pool.py`:
+
+```python
+import pytest
+
+from rock.admin.core.db_provider import DatabaseProvider
+from rock.config import DatabaseConfig
+
+
+def test_database_config_has_pool_size_default():
+    cfg = DatabaseConfig(url="")
+    assert cfg.pool_size == 100
+
+
+def test_database_config_pool_size_override():
+    cfg = DatabaseConfig(url="", pool_size=10)
+    assert cfg.pool_size == 10
+
+
+@pytest.mark.asyncio
+async def test_engine_uses_configured_pool_size_for_postgres(monkeypatch):
+    captured = {}
+
+    def fake_create_async_engine(url, **kwargs):
+        captured.update(kwargs)
+        captured["url"] = url
+        return object()
+
+    monkeypatch.setattr(
+        "rock.admin.core.db_provider.create_async_engine", fake_create_async_engine
+    )
+    provider = DatabaseProvider(
+        db_config=DatabaseConfig(url="postgresql://u:p@h:5432/db", pool_size=7)
+    )
+    await provider.init()
+    assert captured["pool_size"] == 7
+    assert captured["max_overflow"] == 0
+```
+
+- [ ] **Step 2: run the test and confirm it fails**
+
+Run: `uv run pytest tests/unit/admin/test_db_provider_pool.py -v`
+Expected: FAIL (`TypeError: __init__() got an unexpected keyword argument 'pool_size'`)
+
+- [ ] **Step 3: implement the `DatabaseConfig` field**
+
+In `rock/config.py`, change `DatabaseConfig` to:
+
+```python
+@dataclass
+class DatabaseConfig:
+    # Supported URL formats:
+    #   SQLite: sqlite:///relative/path.db or sqlite:////absolute/path.db
+    #   PostgreSQL: postgresql://user:password@host:port/dbname
+    url: str = ""
+    pool_size: int = 100  # per-process PG pool; multi-worker shrinks this (see worker_config)
+```
+
+- [ ] **Step 4: implement the read in `DatabaseProvider`**
+
+In `rock/admin/core/db_provider.py`, `__init__` stores the config value and `init()` uses `self._pool_size`:
+
+Change `__init__` to (keeping the original `_convert_url`):
+
+```python
+    def __init__(self, db_config: DatabaseConfig) -> None:
+        self._url = self._convert_url(db_config.url)
+        self._pool_size = db_config.pool_size
+        self._engine: AsyncEngine | None = None
+```
+
+Change the asyncpg branch inside `init()` to:
+
+```python
+        if "asyncpg" in self._url:
+            engine_kwargs["connect_args"] = {"statement_cache_size": 0}
+            engine_kwargs["pool_size"] = self._pool_size
+            engine_kwargs["max_overflow"] = 0
+            engine_kwargs["pool_timeout"] = 120
+```
+
+- [ ] **Step 5: run the test and confirm it passes**
+
+Run: `uv run pytest tests/unit/admin/test_db_provider_pool.py -v`
+Expected: PASS
+
+- [ ] **Step 6: commit**
+
+```bash
+git add rock/config.py rock/admin/core/db_provider.py tests/unit/admin/test_db_provider_pool.py
+git commit -m "feat(db): make PG pool_size configurable via DatabaseConfig"
+```
+
+---
+
+## Task 3: pure functions `resolve_workers` / `compute_pool_size`
+
+**Files:**
+- Create: `rock/admin/worker_config.py`
+- Test: `tests/unit/admin/test_worker_config.py` (append)
+
+- [ ] **Step 1: write the failing test**
+
+Append to the end of `tests/unit/admin/test_worker_config.py`:
+
+```python
+from rock.admin.worker_config import compute_pool_size, resolve_workers
+
+
+def test_resolve_workers_admin_always_one():
+    assert resolve_workers("admin", override=None, env_workers=8, cpu_count=16) == 1
+    assert resolve_workers("admin", override=10, env_workers=8, cpu_count=16) == 1
+
+
+def test_resolve_workers_proxy_override_wins():
+    assert resolve_workers("proxy", override=4, env_workers=8, cpu_count=16) == 4
+
+
+def test_resolve_workers_proxy_env_when_no_override():
+    assert resolve_workers("proxy", override=None, env_workers=8, cpu_count=16) == 8
+
+
+def test_resolve_workers_proxy_auto_uses_cpu_count():
+    assert resolve_workers("proxy", override=None, env_workers=0, cpu_count=16) == 16
+
+
+def test_resolve_workers_proxy_auto_floor_one():
+    assert resolve_workers("proxy", override=None, env_workers=0, cpu_count=None) == 1
+
+
+def test_compute_pool_size_divides_by_workers():
+    assert compute_pool_size(base=100, workers=8) == 12
+
+
+def test_compute_pool_size_floor_two():
+    assert compute_pool_size(base=100, workers=200) == 2
+
+
+def test_compute_pool_size_single_worker_keeps_base():
+    assert compute_pool_size(base=100, workers=1) == 100
+```
+
+- [ ] **Step 2: run the test and confirm it fails**
+
+Run: `uv run pytest tests/unit/admin/test_worker_config.py -v`
+Expected: FAIL (`ModuleNotFoundError: No module named 'rock.admin.worker_config'`)
+
+- [ ] **Step 3: implement**
+
+Create `rock/admin/worker_config.py`:
+
+```python
+"""Pure helpers for multi-worker proxy sizing. No I/O, fully unit-testable."""
+
+from __future__ import annotations
+
+MIN_POOL_SIZE = 2
+
+
+def resolve_workers(
+    role: str,
+    override: int | None,
+    env_workers: int,
+    cpu_count: int | None,
+) -> int:
+    """Resolve uvicorn worker count.
+
+    admin role is always single-process (owns scheduler/Ray singletons).
+    proxy role: explicit override > env > auto(cpu_count), floored at 1.
+    """
+    if role != "proxy":
+        return 1
+    if override and override > 0:
+        return override
+    if env_workers and env_workers > 0:
+        return env_workers
+    return cpu_count or 1
+
+
+def compute_pool_size(base: int, workers: int) -> int:
+    """Per-worker DB pool so that workers * pool ~= base (no per-pod regression)."""
+    if workers <= 1:
+        return base
+    return max(MIN_POOL_SIZE, base // workers)
+```
+
+- [ ] **Step 4: run the test and confirm it passes**
+
+Run: `uv run pytest tests/unit/admin/test_worker_config.py -v`
+Expected: PASS (all)
+
+- [ ] **Step 5: commit**
+
+```bash
+git add rock/admin/worker_config.py tests/unit/admin/test_worker_config.py
+git commit -m "feat(admin): add pure resolve_workers/compute_pool_size helpers"
+```
+
+---
+
+## Task 4: convert `main.py` to an app-factory + multi-worker startup
+
+**Files:**
+- Modify: `rock/admin/main.py` (top-level argparse, the `app = FastAPI(...)` block, `main()`)
+- Test: `tests/unit/admin/test_create_app_routes.py`
+
+This task refactors "module-level app + include_router inside main()" into the factory function `create_app()`, and extracts route assembly into a testable `_include_routers(app, role)`.
+
+- [ ] **Step 1: write the failing test**
+
+Create `tests/unit/admin/test_create_app_routes.py`:
+
+```python
+from fastapi import FastAPI
+
+from rock.admin.main import _include_routers
+
+
+def _paths(app: FastAPI) -> set[str]:
+    return {getattr(r, "path", "") for r in app.routes}
+
+
+def test_proxy_role_mounts_proxy_router():
+    app = FastAPI()
+    _include_routers(app, role="proxy")
+    paths = _paths(app)
+    # proxy-only endpoint from sandbox_proxy_router
+    assert any(p.endswith("/get_token") for p in paths)
+
+
+def test_proxy_role_excludes_admin_router():
+    app = FastAPI()
+    _include_routers(app, role="proxy")
+    paths = _paths(app)
+    # admin-ops router must NOT be present on proxy role
+    assert not any("/ops" in p for p in paths)
+
+
+def test_admin_role_mounts_admin_routers():
+    app = FastAPI()
+    _include_routers(app, role="admin")
+    paths = _paths(app)
+    assert any("/ops" in p for p in paths)
+```
+
+- [ ] **Step 2: run the test and confirm it fails**
+
+Run: `uv run pytest tests/unit/admin/test_create_app_routes.py -v`
+Expected: FAIL (`ImportError: cannot import name '_include_routers'`)
+
+- [ ] **Step 3: implement: remove the top-level argparse, add the factory**
+
+At the top of `rock/admin/main.py`: delete the module-level `parser = argparse.ArgumentParser()` / the three `add_argument` lines / `args = parser.parse_args()` (`:52-57`) and replace them with a `_parse_args()` function:
+
+```python
+def _parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--env", type=str, default="local")
+    parser.add_argument("--role", type=str, default="admin", choices=["admin", "proxy"])
+    parser.add_argument("--port", type=int, default=8080)
+    parser.add_argument("--workers", type=int, default=None)
+    return parser.parse_args()
+```
+
+- [ ] **Step 4: implement: lifespan reads env (this task only changes where env/role come from; the pool logic is left to Task 5)**
+
+Inside `lifespan`, change `args.env` to `env_vars.ROCK_ADMIN_ENV` and `args.role` to `env_vars.ROCK_ADMIN_ROLE`. Specifically:
+
+- On the `config_file_path` line, `f"rock-{args.env}.yml"` → `f"rock-{env_vars.ROCK_ADMIN_ENV}.yml"`
+- Delete the two lines `env_vars.ROCK_ADMIN_ENV = args.env` and `env_vars.ROCK_ADMIN_ROLE = args.role` (env is written by `main()`; assigning module attributes has no effect on a `__getattr__` module)
+- Every `if args.role == "admin":` / `else:` → `if env_vars.ROCK_ADMIN_ROLE == "admin":`
+- `if args.env in ["local", "test", "dev"]` → `if env_vars.ROCK_ADMIN_ENV in ["local", "test", "dev"]`
+
+- [ ] **Step 5: implement: extract `_include_routers` + `create_app`, move app-level registration**
+
+Move the module-level `app = FastAPI(lifespan=lifespan)`, CORS, `add_exception_handler`, `@app.exception_handler`, `@app.get("/")`, and `@app.middleware("http")` into the factory. Approach:
+
+1. Turn the existing `base_exception_handler` under `@app.exception_handler(Exception)`, `root` under `@app.get("/")`, and `log_requests_and_responses` under `@app.middleware("http")` into **plain async functions without decorators** (function bodies unchanged).
+2. Add:
+
+```python
+def _include_routers(app: FastAPI, role: str) -> None:
+    if role == "admin":
+        app.include_router(sandbox_router, prefix="/apis/envs/sandbox/v1", tags=["sandbox"])
+        app.include_router(admin_ops_router, prefix="/apis/envs/sandbox/v1/ops", tags=["admin-ops"])
+    else:
+        app.include_router(sandbox_proxy_router, prefix="/apis/envs/sandbox/v1", tags=["sandbox"])
+    app.include_router(warmup_router, prefix="/apis/envs/sandbox/v1", tags=["warmup"])
+    app.include_router(gem_router, prefix="/apis/v1/envs/gem", tags=["gem"])
+
+
+def create_app() -> FastAPI:
+    app = FastAPI(lifespan=lifespan)
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=["*"],
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+    app.add_exception_handler(RequestValidationError, request_validation_exception_handler)
+    app.add_exception_handler(Exception, base_exception_handler)
+    app.middleware("http")(log_requests_and_responses)
+    app.add_api_route("/", root, methods=["GET"])
+    _include_routers(app, role=env_vars.ROCK_ADMIN_ROLE)
+    return app
+```
+
+> Note: `base_exception_handler(request, exc)`, `root()`, and `log_requests_and_responses(request, call_next)` are now plain functions; their signatures stay the same.
+
+- [ ] **Step 6: implement: `main()` writes env, resolves workers, starts multiple workers**
+
+Change `main()` to:
+
+```python
+def main():
+    import os
+
+    from rock.admin.worker_config import resolve_workers
+
+    args = _parse_args()
+    os.environ["ROCK_ADMIN_ENV"] = args.env
+    os.environ["ROCK_ADMIN_ROLE"] = args.role
+
+    workers = resolve_workers(
+        role=args.role,
+        override=args.workers,
+        env_workers=int(os.getenv("ROCK_PROXY_WORKERS", "0")),
+        cpu_count=os.cpu_count(),
+    )
+    # write the *resolved* count back so each worker's lifespan sizes pools deterministically
+    os.environ["ROCK_PROXY_WORKERS"] = str(workers)
+    logger.info(f"starting role={args.role} port={args.port} workers={workers}")
+
+    uvicorn.run(
+        "rock.admin.main:create_app",
+        factory=True,
+        host="0.0.0.0",
+        port=args.port,
+        workers=workers,
+        ws_ping_interval=None,
+        ws_ping_timeout=None,
+        timeout_keep_alive=30,
+    )
+```
+
+Delete every `app.include_router(...)` line in the old `main()` (moved into `_include_routers`) and the old `uvicorn.run(app, ...)`. Delete the module-level `app = FastAPI(...)` and the CORS/handler decorator block below it (moved into `create_app`).
+
+- [ ] **Step 7: run the test and confirm it passes**
+
+Run: `uv run pytest tests/unit/admin/test_create_app_routes.py -v`
+Expected: PASS
+
+- [ ] **Step 8: smoke test: the factory is importable and single-worker resolution is correct**
+
+Run: `uv run python -c "import os; os.environ['ROCK_ADMIN_ROLE']='proxy'; from rock.admin.main import create_app; a=create_app(); print('routes', len(a.routes))"`
+Expected: prints the route count, with no exception.
+
+- [ ] **Step 9: commit**
+
+```bash
+git add rock/admin/main.py tests/unit/admin/test_create_app_routes.py
+git commit -m "refactor(admin): app-factory + uvicorn multi-worker for proxy role"
+```
+
+---
+
+## Task 5: lifespan sizes the DB pool per worker + proxy calls aclose on exit
+
+**Files:**
+- Modify: `rock/admin/main.py` (db_provider construction and teardown inside `lifespan`)
+
+- [ ] **Step 1: implement: compute pool_size per worker**
+
+Inside `lifespan`, change:
+
+```python
+    db_provider = DatabaseProvider(db_config=DatabaseConfig(url=db_url))
+```
+
+to:
+
+```python
+    from rock.admin.worker_config import compute_pool_size
+
+    workers = int(os.getenv("ROCK_PROXY_WORKERS", "1")) or 1
+    effective_pool = compute_pool_size(base=rock_config.database.pool_size, workers=workers)
+    db_provider = DatabaseProvider(db_config=DatabaseConfig(url=db_url, pool_size=effective_pool))
+    logger.info(f"db pool_size={effective_pool} (base={rock_config.database.pool_size}, workers={workers})")
+```
+
+> Make sure `import os` exists at the top of the file (`import asyncio` and others are already there; if `os` is missing, add `import os` to the import section).
+> Note: for the admin role, `main()` writes `ROCK_PROXY_WORKERS=1` → `compute_pool_size` returns base, so behavior is unchanged.
+
+- [ ] **Step 2: implement: aclose the service's httpx clients when the proxy exits**
+
+In the `else:` branch (proxy) of `lifespan`, keep a reference:
+
+```python + else: + sandbox_manager = SandboxProxyService(rock_config=rock_config, meta_store=meta_store) + set_sandbox_proxy_service(sandbox_manager) + proxy_service_ref = sandbox_manager +``` + +在 `yield` 之前先 `proxy_service_ref = None`(admin 分支默认);在 `yield` 之后的 teardown 段(`if db_provider:` 之前)加: + +```python + if proxy_service_ref is not None: + await proxy_service_ref.aclose() + logger.info("proxy httpx clients closed") +``` + +> `aclose()` 在 Task 6 实现;本步骤先接好调用点。两任务在同会话顺序执行,Task 6 紧随其后即可。 + +- [ ] **Step 3: 运行相关测试(确保未破坏既有 admin 测试)** + +Run: `uv run pytest tests/unit/admin -v -m "not need_ray and not need_admin and not need_admin_and_network" --reruns 1` +Expected: PASS(已存在的 admin 单测 + 本计划新增的 admin 单测) + +- [ ] **Step 4: 提交** + +```bash +git add rock/admin/main.py +git commit -m "feat(admin): size DB pool per-worker and aclose proxy clients on shutdown" +``` + +--- + +## Task 6: `SandboxProxyService` 拆两个 httpx client + aclose + Metrics pid 标 + +**Files:** +- Modify: `rock/sandbox/service/sandbox_proxy_service.py`(`__init__` `:54-95`、`_send_request` `:659`) +- Test: `tests/unit/sandbox/test_proxy_httpx_reuse.py` + +- [ ] **Step 1: 写失败测试** + +创建 `tests/unit/sandbox/test_proxy_httpx_reuse.py`: + +```python +import os + +import httpx +import pytest + + +@pytest.mark.asyncio +async def test_service_has_two_distinct_httpx_clients(sandbox_proxy_service): + svc = sandbox_proxy_service + assert isinstance(svc._rpc_client, httpx.AsyncClient) + assert isinstance(svc._proxy_client, httpx.AsyncClient) + assert svc._rpc_client is not svc._proxy_client + + +@pytest.mark.asyncio +async def test_metrics_monitor_has_worker_pid_tag(sandbox_proxy_service): + tags = sandbox_proxy_service.metrics_monitor.user_defined_tags + assert tags.get("worker_pid") == str(os.getpid()) + + +@pytest.mark.asyncio +async def test_aclose_closes_both_clients(sandbox_proxy_service): + svc = sandbox_proxy_service + await svc.aclose() + assert svc._rpc_client.is_closed + assert 
svc._proxy_client.is_closed +``` + +- [ ] **Step 2: 运行测试确认失败** + +Run: `uv run pytest tests/unit/sandbox/test_proxy_httpx_reuse.py -v` +Expected: FAIL(`AttributeError: ... '_rpc_client'`) + +- [ ] **Step 3: 实现 — `__init__` 拆 client + pid 标** + +`rock/sandbox/service/sandbox_proxy_service.py`: + +文件顶部导入区加 `import os`(若无)。 + +把类属性 `_httpx_client = None`(`:52`)删除。 + +`__init__` 中,把 MetricsMonitor 构建改为带 pid 标: + +```python + self.metrics_monitor = MetricsMonitor.create( + export_interval_millis=20_000, + metrics_endpoint=rock_config.runtime.metrics_endpoint, + user_defined_tags={ + **(rock_config.runtime.user_defined_tags or {}), + "worker_pid": str(os.getpid()), + }, + ) +``` + +把原来单个 `self._httpx_client = httpx.AsyncClient(...)` 块(`:66-72`)替换为两个 client: + +```python + # Control-plane RPC client: short JSON calls to rocklet. + self._rpc_client = httpx.AsyncClient( + timeout=self.proxy_config.timeout, + limits=httpx.Limits( + max_connections=self.proxy_config.max_connections, + max_keepalive_connections=self.proxy_config.max_keepalive_connections, + ), + ) + # Data-plane proxy client: streaming/SSE/large bodies. No total timeout; + # per-request timeout is set via build_request(timeout=...). NEVER closed + # per-request — lives for the process lifetime, closed in aclose(). + self._proxy_client = httpx.AsyncClient( + timeout=httpx.Timeout(None), + limits=httpx.Limits( + max_connections=self.proxy_config.max_connections, + max_keepalive_connections=self.proxy_config.max_keepalive_connections, + ), + ) +``` + +- [ ] **Step 4: 实现 — `_send_request` 用 `_rpc_client`** + +`_send_request` 内 `await self._httpx_client.request(...)`(`:659`)改为 `await self._rpc_client.request(...)`。 + +- [ ] **Step 5: 实现 — 新增 `aclose`** + +在类内(`__init__` 之后任意位置)加: + +```python + async def aclose(self) -> None: + """Close both shared httpx clients. 
Called on proxy shutdown.""" + await self._rpc_client.aclose() + await self._proxy_client.aclose() +``` + +- [ ] **Step 6: Run the tests and confirm they pass** + +Run: `uv run pytest tests/unit/sandbox/test_proxy_httpx_reuse.py -v` +Expected: PASS + +- [ ] **Step 7: Commit** + +```bash +git add rock/sandbox/service/sandbox_proxy_service.py tests/unit/sandbox/test_proxy_httpx_reuse.py +git commit -m "feat(proxy): split rpc/proxy httpx clients, add aclose, tag metrics with pid" +``` + +--- + +## Task 7: `http_proxy` reuses `_proxy_client` (never closes it) + +**Files:** +- Modify: `rock/sandbox/service/sandbox_proxy_service.py` (`http_proxy` `:901-1019`) +- Test: `tests/unit/sandbox/test_proxy_httpx_reuse.py` (append) + +- [ ] **Step 1: Write the failing test** + +Append to `tests/unit/sandbox/test_proxy_httpx_reuse.py`: + +```python +from unittest.mock import AsyncMock + +import httpx +import pytest + +from rock.deployments.constants import Port +from starlette.datastructures import Headers + + +def _fake_status(): + return {"host_ip": "1.2.3.4", "port_mapping": {Port.SERVER.value: 18080}} + + +@pytest.mark.asyncio +async def test_http_proxy_reuses_proxy_client_without_closing(sandbox_proxy_service, monkeypatch): + svc = sandbox_proxy_service + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"ok": True}) + + svc._proxy_client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) + monkeypatch.setattr(svc, "_update_expire_time", AsyncMock()) + monkeypatch.setattr(svc, "get_service_status", AsyncMock(return_value=[_fake_status()])) + + resp = await svc.http_proxy( + sandbox_id="sb1", + target_path="hello", + body=None, + headers=Headers({}), + method="GET", + ) + + assert resp.status_code == 200 + # shared client must remain open for reuse + assert not svc._proxy_client.is_closed +``` + +- [ ] **Step 2: Run the test and confirm it fails** + +Run: `uv run pytest tests/unit/sandbox/test_proxy_httpx_reuse.py::test_http_proxy_reuses_proxy_client_without_closing -v` 
+Expected: FAIL (the current implementation creates a local client and calls `await client.aclose()` on it in `finally`, so it never uses `_proxy_client`. Because the local client is the one being closed, the assertion that `_proxy_client` stays open could pass spuriously, so this test only truly verifies reuse once the next step is implemented. If the failure message reflects the local-client behavior, proceed with the implementation.) + +> Note: the point of this test is to assert, after the implementation, that `_proxy_client` is actually used and is not closed. In the failing phase it mainly confirms that the test runs (no import or construction errors). + +- [ ] **Step 3: Reuse `_proxy_client` and remove local client creation and closing** + +In `http_proxy`: + +Delete: + +```python + client = httpx.AsyncClient(timeout=httpx.Timeout(None)) + + try: + resp = await client.send( + client.build_request( + method=method, + url=target_url, + headers=request_headers, + timeout=120, + **request_kwargs, + ), + stream=True, + ) + except Exception: + await client.aclose() + raise +``` + +Replace with (no `try`/`except` is needed, because there is no local client left to clean up): + +```python + resp = await self._proxy_client.send( + self._proxy_client.build_request( + method=method, + url=target_url, + headers=request_headers, + timeout=120, + **request_kwargs, + ), + stream=True, + ) +``` + +In the `finally` of `event_stream()` in the SSE branch, change + +```python + finally: + await resp.aclose() + await client.aclose() +``` + +to (close only the response, not the shared client): + +```python + finally: + await resp.aclose() +``` + +In the `finally` at the end of the non-streaming branch, change + +```python + finally: + await resp.aclose() + await client.aclose() +``` + +to: + +```python + finally: + await resp.aclose() +``` + +- [ ] **Step 4: Run the tests and confirm they pass** + +Run: `uv run pytest tests/unit/sandbox/test_proxy_httpx_reuse.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add rock/sandbox/service/sandbox_proxy_service.py tests/unit/sandbox/test_proxy_httpx_reuse.py +git commit -m "perf(proxy): reuse shared _proxy_client in http_proxy (no per-request client)" +``` + +--- + +## Task 8: `host_proxy` reuses `_proxy_client` (never closes it) + +**Files:** +- Modify: `rock/sandbox/service/sandbox_proxy_service.py` (`host_proxy` `:843-899`) +- Test: `tests/unit/sandbox/test_proxy_httpx_reuse.py` (append) + +- [ ] **Step 1: Write the failing test** + +Append: + +```python +@pytest.mark.asyncio +async def test_host_proxy_reuses_proxy_client_without_closing(sandbox_proxy_service): + svc = sandbox_proxy_service + + def handler(request: 
httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"pong": True}) + + svc._proxy_client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) + + resp = await svc.host_proxy( + host_ip="10.0.0.1", + target_path="ping", + body={"a": 1}, + headers=Headers({}), + ) + + assert resp.status_code == 200 + assert not svc._proxy_client.is_closed +``` + +- [ ] **Step 2: Run the test and confirm it fails** + +Run: `uv run pytest tests/unit/sandbox/test_proxy_httpx_reuse.py::test_host_proxy_reuses_proxy_client_without_closing -v` +Expected: FAIL (the current `host_proxy` uses a local `async with httpx.AsyncClient(...)` client and never touches `_proxy_client`; after the implementation this test verifies reuse) + +- [ ] **Step 3: Reuse `_proxy_client`** + +In `host_proxy`, replace: + +```python + async with httpx.AsyncClient(timeout=httpx.Timeout(90)) as http_client: + try: + resp = await http_client.post( + url=target_url, + json=payload, + headers=request_headers, + ) + except httpx.RequestError as exc: + logger.error(f"Error forwarding request to {target_url}: {exc}", exc_info=True) + raise Exception(f"Service unavailable: Rocklet at {host_ip}:{Port.PROXY.value} is not reachable.") + + content_type = resp.headers.get("content-type", "") + response_headers = filter_headers(resp.headers) + + if "application/json" in content_type: + return JSONResponse( + status_code=resp.status_code, + content=resp.json(), + headers=response_headers, + ) + + return Response( + status_code=resp.status_code, + content=resp.content, + media_type=content_type or "application/octet-stream", + headers=response_headers, + ) +``` + +with (drop `async with`, use the shared client with a per-request timeout=90, and do not close it): + +```python + try: + resp = await self._proxy_client.post( + url=target_url, + json=payload, + headers=request_headers, + timeout=90, + ) + except httpx.RequestError as exc: + logger.error(f"Error forwarding request to {target_url}: {exc}", exc_info=True) + raise Exception(f"Service unavailable: Rocklet at {host_ip}:{Port.PROXY.value} is not reachable.") + + content_type = 
resp.headers.get("content-type", "") + response_headers = filter_headers(resp.headers) + + if "application/json" in content_type: + return JSONResponse( + status_code=resp.status_code, + content=resp.json(), + headers=response_headers, + ) + + return Response( + status_code=resp.status_code, + content=resp.content, + media_type=content_type or "application/octet-stream", + headers=response_headers, + ) +``` + +- [ ] **Step 4: Run the tests and confirm they pass** + +Run: `uv run pytest tests/unit/sandbox/test_proxy_httpx_reuse.py -v` +Expected: PASS (all) + +- [ ] **Step 5: Commit** + +```bash +git add rock/sandbox/service/sandbox_proxy_service.py tests/unit/sandbox/test_proxy_httpx_reuse.py +git commit -m "perf(proxy): reuse shared _proxy_client in host_proxy" +``` + +--- + +## Task 9: Regression + lint + capacity-budget documentation + +**Files:** +- Modify: `docs/_specs/proxy-multicore/02_design.md` (add an "Implementation status" section) + +- [ ] **Step 1: Run the full set of related unit tests** + +Run: +```bash +uv run pytest tests/unit/admin tests/unit/sandbox -m "not need_ray and not need_admin and not need_admin_and_network" --reruns 1 +``` +Expected: PASS (no regressions) + +- [ ] **Step 2: lint + format** + +Run: +```bash +uv run ruff check --fix rock/ tests/unit/admin tests/unit/sandbox +uv run ruff format rock/admin/main.py rock/utils/worker.py rock/sandbox/service/sandbox_proxy_service.py rock/config.py rock/env_vars.py rock/admin/core/db_provider.py +``` +Expected: no errors + +- [ ] **Step 3: Add the implementation status to the docs** + +Append to the end of `docs/_specs/proxy-multicore/02_design.md`: + +```markdown +--- +## Implementation status (2026-06-22) + +Implemented: env `ROCK_PROXY_WORKERS`; configurable `DatabaseConfig.pool_size`; pure functions `resolve_workers`/`compute_pool_size`; `main.py` app factory + multi-worker (proxy only); lifespan sizes the DB pool per worker and calls aclose on proxy shutdown; `SandboxProxyService` split into `_rpc_client`/`_proxy_client`, pid tag, and client reuse in `http_proxy`/`host_proxy`. +``` + +- [ ] **Step 4: Commit** + +```bash +git add docs/_specs/proxy-multicore/02_design.md +git commit -m "docs(spec): mark proxy-multicore implementation status" +``` + +--- + +## Deployment follow-ups (outside this plan's scope; handover notes) + +- 
The production start command must carry the worker count: `admin --role proxy --port 8080 --env --workers ` or set `ROCK_PROXY_WORKERS`. If neither is set, the worker count falls back to 1: `resolve_workers` does no `os.cpu_count()` auto-detection, and the `local`/`test`/`dev` environments always run a single worker. +- Before going live, fill in the numbers from section 4 "Capacity budget" of `02_design.md`: `pool_size × workers × pods ≤ PG_max_connections`; the worker ceiling is `min(cpu_count, available memory / per-process RSS)`, with memory calibrated against measured `smaps_rollup` values. +- k8s resources: Pod memory must be ≥ `workers × per-process RSS`, otherwise the Pod will OOM. + +--- + +## Self-Review checklist + +- **Spec coverage**: multi-worker (T1/T3/T4), env-based argument passing (T4), multi-process for proxy only (T3 `resolve_workers`), configurable DB pool shrunk per worker (T2/T3/T5), Metrics pid tag (T6), httpx client split and reuse (T6/T7/T8), aclose lifecycle (T5/T6), capacity-budget documentation (T9); each item has a corresponding task. +- **Placeholder scan**: no TBD/TODO; every code step contains complete code. +- **Type/naming consistency**: `resolve_workers`/`compute_pool_size` signatures are consistent across T3/T4/T5; `_rpc_client`/`_proxy_client`/`aclose` are consistent across T6/T7/T8; `_include_routers`/`create_app` are consistent between the T4 tests and the implementation. diff --git a/rock/admin/core/db_provider.py b/rock/admin/core/db_provider.py index 539652aeea..5853d9d27c 100644 --- a/rock/admin/core/db_provider.py +++ b/rock/admin/core/db_provider.py @@ -23,6 +23,7 @@ class DatabaseProvider: def __init__(self, db_config: DatabaseConfig) -> None: self._url = self._convert_url(db_config.url) + self._pool_size = db_config.pool_size self._engine: AsyncEngine | None = None @property @@ -40,7 +41,7 @@ async def init(self) -> None: engine_kwargs: dict[str, object] = {"echo": False} if "asyncpg" in self._url: engine_kwargs["connect_args"] = {"statement_cache_size": 0} - engine_kwargs["pool_size"] = 100 + engine_kwargs["pool_size"] = self._pool_size engine_kwargs["max_overflow"] = 0 engine_kwargs["pool_timeout"] = 120 diff --git a/rock/admin/main.py b/rock/admin/main.py index a3c7537f40..df28f13e84 100644 --- a/rock/admin/main.py +++ b/rock/admin/main.py @@ -2,6 +2,7 @@ import asyncio import json import logging +import os import time import traceback import uuid @@ -39,7 +40,7 @@ from rock.admin.service.ops_service import OpsService from rock.common.exception import request_validation_exception_handler from rock.config import DatabaseConfig, RockConfig, SchedulerConfig -from rock.logger import 
init_logger +from rock.logger import init_logger, reset_log_file from rock.sandbox.gem_manager import GemManager from rock.sandbox.operator.factory import OperatorContext, OperatorFactory from rock.sandbox.sandbox_meta_store import SandboxMetaStore @@ -48,13 +49,17 @@ from rock.utils import EAGLE_EYE_TRACE_ID, sandbox_id_ctx_var, trace_id_ctx_var from rock.utils.providers import RedisProvider from rock.utils.system import is_primary_pod +from rock.utils.worker import compute_pool_size, resolve_workers -parser = argparse.ArgumentParser() -parser.add_argument("--env", type=str, default="local") -parser.add_argument("--role", type=str, default="admin", choices=["admin", "proxy"]) -parser.add_argument("--port", type=int, default=8080) -args = parser.parse_args() +def _parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--env", type=str, default="local") + parser.add_argument("--role", type=str, default="admin", choices=["admin", "proxy"]) + parser.add_argument("--port", type=int, default=8080) + parser.add_argument("--workers", type=int, default=None) + return parser.parse_args() + logger = init_logger("admin") logging.getLogger("urllib3").setLevel(logging.WARNING) @@ -87,7 +92,7 @@ def _init_ops_service( @asynccontextmanager async def lifespan(app: FastAPI): config_file_path = ( - Path(__file__).resolve().parents[2] / env_vars.ROCK_CONFIG_DIR_NAME / f"rock-{args.env}.yml" + Path(__file__).resolve().parents[2] / env_vars.ROCK_CONFIG_DIR_NAME / f"rock-{env_vars.ROCK_ADMIN_ENV}.yml" if not env_vars.ROCK_CONFIG else env_vars.ROCK_CONFIG ) @@ -100,11 +105,8 @@ async def lifespan(app: FastAPI): rock_config.scheduler = SchedulerConfig(**nacos_config["scheduler"]) logger.info(f"Overrode scheduler config from Nacos with {len(rock_config.scheduler.tasks)} tasks") - env_vars.ROCK_ADMIN_ENV = args.env - env_vars.ROCK_ADMIN_ROLE = args.role - # init redis provider (fallback to fakeredis if no host configured) - if args.env in ["local", "test", "dev"] or not 
rock_config.redis.host: + if env_vars.ROCK_ADMIN_ENV in ["local", "test", "dev"] or not rock_config.redis.host: from fakeredis import aioredis if not rock_config.redis.host: @@ -123,7 +125,10 @@ async def lifespan(app: FastAPI): db_url = rock_config.database.url or "sqlite+aiosqlite:///:memory:" if not rock_config.database.url: logger.info("database.url is not configured, falling back to SQLite in-memory") - db_provider = DatabaseProvider(db_config=DatabaseConfig(url=db_url)) + workers = int(os.getenv("ROCK_PROXY_WORKERS", "1")) or 1 + effective_pool = compute_pool_size(base=rock_config.database.pool_size, workers=workers) + db_provider = DatabaseProvider(db_config=DatabaseConfig(url=db_url, pool_size=effective_pool)) + logger.info(f"db pool_size={effective_pool} (base={rock_config.database.pool_size}, workers={workers})") await db_provider.init() if not rock_config.database.url: await db_provider.create_tables() @@ -144,7 +149,8 @@ async def lifespan(app: FastAPI): scheduler_thread = None # init sandbox service - if args.role == "admin": + proxy_service_ref = None + if env_vars.ROCK_ADMIN_ROLE == "admin": # init ray service ray_service = RayService(rock_config.ray) ray_service.init() @@ -199,6 +205,7 @@ async def lifespan(app: FastAPI): else: sandbox_manager = SandboxProxyService(rock_config=rock_config, meta_store=meta_store) set_sandbox_proxy_service(sandbox_manager) + proxy_service_ref = sandbox_manager logger.info("rock-admin start") @@ -209,6 +216,10 @@ async def lifespan(app: FastAPI): scheduler_thread.stop() logger.info("Scheduler thread stopped") + if proxy_service_ref is not None: + await proxy_service_ref.aclose() + logger.info("proxy httpx clients closed") + if db_provider: await db_provider.close() @@ -218,43 +229,16 @@ async def lifespan(app: FastAPI): logger.info("rock-admin exit") -app = FastAPI(lifespan=lifespan) - -# --- CORS configuration start --- -# Allowed origins list -origins = [ - "*", # Your frontend origin -] - -app.add_middleware( - 
CORSMiddleware, - allow_origins=origins, # Set allowed origins - allow_credentials=True, # Whether to support cookie cross-origin - allow_methods=["*"], # Allow all methods - allow_headers=["*"], # Allow all headers -) -# --- CORS configuration end --- - - -# Pydantic validation errors are matched by FastAPI before the catch-all Exception -# handler below — register an explicit override so they come out as RockResponse -# envelopes instead of the default 422 ``{"detail": [...]}``. -app.add_exception_handler(RequestValidationError, request_validation_exception_handler) - - -@app.exception_handler(Exception) async def base_exception_handler(request: Request, exc: Exception): exc_content = {"detail": str(exc), "traceback": traceback.format_exc().split("\n")} logger.error(f"[app error] request:[{request}], exc:[{exc_content}]") return JSONResponse(status_code=500, content=exc_content) -@app.get("/") async def root(): return {"message": "hello, ROCK!"} -@app.middleware("http") async def log_requests_and_responses(request: Request, call_next): req_logger = init_logger("accessLog") @@ -297,9 +281,8 @@ async def log_requests_and_responses(request: Request, call_next): return response -def main(): - # config router - if args.role == "admin": +def _include_routers(app: FastAPI, role: str) -> None: + if role == "admin": app.include_router(sandbox_router, prefix="/apis/envs/sandbox/v1", tags=["sandbox"]) app.include_router(admin_ops_router, prefix="/apis/envs/sandbox/v1/ops", tags=["admin-ops"]) else: @@ -307,7 +290,54 @@ def main(): app.include_router(warmup_router, prefix="/apis/envs/sandbox/v1", tags=["warmup"]) app.include_router(gem_router, prefix="/apis/v1/envs/gem", tags=["gem"]) - uvicorn.run(app, host="0.0.0.0", port=args.port, ws_ping_interval=None, ws_ping_timeout=None, timeout_keep_alive=30) + +def create_app() -> FastAPI: + app = FastAPI(lifespan=lifespan) + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + 
allow_methods=["*"], + allow_headers=["*"], + ) + app.add_exception_handler(RequestValidationError, request_validation_exception_handler) + app.add_exception_handler(Exception, base_exception_handler) + app.middleware("http")(log_requests_and_responses) + app.add_api_route("/", root, methods=["GET"]) + _include_routers(app, role=env_vars.ROCK_ADMIN_ROLE) + return app + + +def main(): + args = _parse_args() + os.environ["ROCK_ADMIN_ENV"] = args.env + os.environ["ROCK_ADMIN_ROLE"] = args.role + + # Clear the log file once at deploy (master), then make every worker append + # to the shared file. Must run before workers spawn so only the master clears. + reset_log_file() + os.environ["ROCK_LOGGING_APPEND"] = "true" + + workers = resolve_workers( + role=args.role, + override=args.workers, + env_workers=int(os.getenv("ROCK_PROXY_WORKERS", "0")), + env=args.env, + ) + # write resolved count back so each worker's lifespan sizes pools deterministically + os.environ["ROCK_PROXY_WORKERS"] = str(workers) + logger.info(f"starting role={args.role} port={args.port} workers={workers}") + + uvicorn.run( + "rock.admin.main:create_app", + factory=True, + host="0.0.0.0", + port=args.port, + workers=workers, + ws_ping_interval=None, + ws_ping_timeout=None, + timeout_keep_alive=30, + ) if __name__ == "__main__": diff --git a/rock/config.py b/rock/config.py index f2bd988734..2789d391b1 100644 --- a/rock/config.py +++ b/rock/config.py @@ -210,6 +210,7 @@ class DatabaseConfig: # SQLite: sqlite:///relative/path.db or sqlite:////absolute/path.db # PostgreSQL: postgresql://user:password@host:port/dbname url: str = "" + pool_size: int = 100 # per-process PG pool; multi-worker shrinks this (see utils.worker) @dataclass diff --git a/rock/env_vars.py b/rock/env_vars.py index 11a9830613..1cb66ed0cd 100644 --- a/rock/env_vars.py +++ b/rock/env_vars.py @@ -9,6 +9,7 @@ ROCK_LOGGING_PATH: str | None = None ROCK_LOGGING_FILE_NAME: str | None = None ROCK_LOGGING_LEVEL: str | None = None + 
ROCK_LOGGING_APPEND: bool = False ROCK_SERVICE_STATUS_DIR: str | None = None ROCK_SCHEDULER_STATUS_DIR: str | None = None ROCK_CONFIG: str | None = None @@ -78,6 +79,7 @@ "ROCK_LOGGING_PATH": lambda: os.getenv("ROCK_LOGGING_PATH"), "ROCK_LOGGING_FILE_NAME": lambda: os.getenv("ROCK_LOGGING_FILE_NAME", "rocklet.log"), "ROCK_LOGGING_LEVEL": lambda: os.getenv("ROCK_LOGGING_LEVEL", "INFO"), + "ROCK_LOGGING_APPEND": lambda: os.getenv("ROCK_LOGGING_APPEND", "false").lower() == "true", "ROCK_SERVICE_STATUS_DIR": lambda: os.getenv("ROCK_SERVICE_STATUS_DIR", "/tmp"), "ROCK_SCHEDULER_STATUS_DIR": lambda: os.getenv("ROCK_SCHEDULER_STATUS_DIR", "/data/scheduler_status"), "ROCK_CONFIG": lambda: os.getenv("ROCK_CONFIG"), @@ -108,6 +110,7 @@ "ROCK_PYTHON_ENV_PATH": lambda: os.getenv("ROCK_PYTHON_ENV_PATH", sys.base_prefix), "ROCK_ADMIN_ENV": lambda: os.getenv("ROCK_ADMIN_ENV", "dev"), "ROCK_ADMIN_ROLE": lambda: os.getenv("ROCK_ADMIN_ROLE", "write"), + "ROCK_PROXY_WORKERS": lambda: int(os.getenv("ROCK_PROXY_WORKERS", "0")), "ROCK_FORCE_PRIMARY_POD": lambda: os.getenv("ROCK_FORCE_PRIMARY_POD", "false").lower() == "true", "ROCK_CLI_LOAD_PATHS": lambda: os.getenv("ROCK_CLI_LOAD_PATHS", str(Path(__file__).parent / "cli" / "command")), "ROCK_CLI_DEFAULT_CONFIG_PATH": lambda: os.getenv( diff --git a/rock/logger.py b/rock/logger.py index 30e55844eb..10efeeae10 100644 --- a/rock/logger.py +++ b/rock/logger.py @@ -73,12 +73,36 @@ def init_file_handler(log_name: str): # Ensure directory exists os.makedirs(os.path.dirname(log_file_path), exist_ok=True) - handler = logging.FileHandler(log_file_path, mode="w+", encoding="utf-8") + # Multi-worker safe: when ROCK_LOGGING_APPEND is set, every worker opens + # the shared file in append mode (no per-process truncate race). The + # one-time clear-on-deploy is done by the entrypoint via reset_log_file(). + # Default stays "w+" so single-process services (rocklet, cli) keep their + # truncate-on-start behavior. 
+ mode = "a" if env_vars.ROCK_LOGGING_APPEND else "w+" + handler = logging.FileHandler(log_file_path, mode=mode, encoding="utf-8") handler.setFormatter(TimezoneFormatter(log_color_enable=False, tz_string=env_vars.ROCK_TIME_ZONE)) return handler return None +def reset_log_file(file_name: str | None = None) -> None: + """Truncate the configured log file once (e.g. at deploy / master startup). + + Under multi-worker, all workers open the same file in append mode, so the + one-time clear must happen here in the master BEFORE workers spawn — doing + it in the FileHandler would make each worker truncate and race. No-op when + file logging is disabled (stdout mode). + """ + if not env_vars.ROCK_LOGGING_PATH: + return + file_name = file_name or env_vars.ROCK_LOGGING_FILE_NAME + if not file_name: + return + log_file_path = os.path.join(env_vars.ROCK_LOGGING_PATH, file_name) + os.makedirs(os.path.dirname(log_file_path), exist_ok=True) + open(log_file_path, "w").close() + + def init_logger(name: str | None = None, file_name: str | None = None): """Initialize and return a logger instance with custom handler and formatter diff --git a/rock/sandbox/service/sandbox_proxy_service.py b/rock/sandbox/service/sandbox_proxy_service.py index e42564a7bd..8fb80309a7 100644 --- a/rock/sandbox/service/sandbox_proxy_service.py +++ b/rock/sandbox/service/sandbox_proxy_service.py @@ -1,5 +1,6 @@ import asyncio # noqa: I001 import json +import os from fastapi.responses import JSONResponse, StreamingResponse from starlette.datastructures import Headers @@ -49,27 +50,38 @@ class SandboxProxyService: - _httpx_client = None - def __init__(self, rock_config: RockConfig, meta_store: SandboxMetaStore): self._rock_config = rock_config self._meta_store = meta_store self.metrics_monitor = MetricsMonitor.create( export_interval_millis=20_000, metrics_endpoint=rock_config.runtime.metrics_endpoint, - user_defined_tags=rock_config.runtime.user_defined_tags, + user_defined_tags={ + 
**(rock_config.runtime.user_defined_tags or {}), + "worker_pid": str(os.getpid()), + }, ) self.oss_config: OssConfig = rock_config.oss self.proxy_config: ProxyServiceConfig = rock_config.proxy_service logger.info(f"proxy config: {self.proxy_config}") - # Initialize httpx client with configuration - self._httpx_client = httpx.AsyncClient( + # Control-plane RPC client: short JSON calls to rocklet. + self._rpc_client = httpx.AsyncClient( timeout=self.proxy_config.timeout, limits=httpx.Limits( max_connections=self.proxy_config.max_connections, max_keepalive_connections=self.proxy_config.max_keepalive_connections, ), ) + # Data-plane proxy client: streaming/SSE/large bodies. No total timeout; + # per-request timeout is set via build_request(timeout=...). NEVER closed + # per-request — lives for the process lifetime, closed in aclose(). + self._proxy_client = httpx.AsyncClient( + timeout=httpx.Timeout(None), + limits=httpx.Limits( + max_connections=self.proxy_config.max_connections, + max_keepalive_connections=self.proxy_config.max_keepalive_connections, + ), + ) # Replace single self.sts_client with a dict keyed by account name, # so /get_token?account=legacy|primary maps to the right credentials. @@ -94,6 +106,11 @@ def __init__(self, rock_config: RockConfig, meta_store: SandboxMetaStore): self._batch_get_status_max_count = rock_config.proxy_service.batch_get_status_max_count self._validate_oss_config_or_warn() + async def aclose(self) -> None: + """Close both shared httpx clients. 
Called on proxy shutdown.""" + await self._rpc_client.aclose() + await self._proxy_client.aclose() + def _validate_oss_config_or_warn(self) -> None: # Same resolution order as gen_oss_sts_token: env > YAML endpoint = env_vars.ROCK_OSS_BUCKET_ENDPOINT or self.oss_config.endpoint @@ -656,7 +673,7 @@ async def _send_request( # Make request try: - response = await self._httpx_client.request( + response = await self._rpc_client.request( method=method, url=full_request_url, headers=headers, @@ -870,34 +887,34 @@ def filter_headers(raw_headers: Headers) -> dict: request_headers = filter_headers(headers) payload = body or {} - async with httpx.AsyncClient(timeout=httpx.Timeout(90)) as http_client: - try: - resp = await http_client.post( - url=target_url, - json=payload, - headers=request_headers, - ) - except httpx.RequestError as exc: - logger.error(f"Error forwarding request to {target_url}: {exc}", exc_info=True) - raise Exception(f"Service unavailable: Rocklet at {host_ip}:{Port.PROXY.value} is not reachable.") - - content_type = resp.headers.get("content-type", "") - response_headers = filter_headers(resp.headers) + try: + resp = await self._proxy_client.post( + url=target_url, + json=payload, + headers=request_headers, + timeout=90, + ) + except httpx.RequestError as exc: + logger.error(f"Error forwarding request to {target_url}: {exc}", exc_info=True) + raise Exception(f"Service unavailable: Rocklet at {host_ip}:{Port.PROXY.value} is not reachable.") - if "application/json" in content_type: - return JSONResponse( - status_code=resp.status_code, - content=resp.json(), - headers=response_headers, - ) + content_type = resp.headers.get("content-type", "") + response_headers = filter_headers(resp.headers) - return Response( + if "application/json" in content_type: + return JSONResponse( status_code=resp.status_code, - content=resp.content, - media_type=content_type or "application/octet-stream", + content=resp.json(), headers=response_headers, ) + return Response( + 
status_code=resp.status_code, + content=resp.content, + media_type=content_type or "application/octet-stream", + headers=response_headers, + ) + async def http_proxy( self, sandbox_id: str, @@ -948,22 +965,16 @@ def rewrite_location(location: str) -> str: request_headers = filter_headers(headers) request_kwargs: dict = {"content": body} if body else {} - client = httpx.AsyncClient(timeout=httpx.Timeout(None)) - - try: - resp = await client.send( - client.build_request( - method=method, - url=target_url, - headers=request_headers, - timeout=120, - **request_kwargs, - ), - stream=True, - ) - except Exception: - await client.aclose() - raise + resp = await self._proxy_client.send( + self._proxy_client.build_request( + method=method, + url=target_url, + headers=request_headers, + timeout=120, + **request_kwargs, + ), + stream=True, + ) content_type = resp.headers.get("content-type", "") is_sse = "text/event-stream" in content_type @@ -989,7 +1000,6 @@ async def event_stream(): yield chunk finally: await resp.aclose() - await client.aclose() return StreamingResponse( event_stream(), @@ -1016,4 +1026,3 @@ async def event_stream(): ) finally: await resp.aclose() - await client.aclose() diff --git a/rock/utils/worker.py b/rock/utils/worker.py new file mode 100644 index 0000000000..0f013508e2 --- /dev/null +++ b/rock/utils/worker.py @@ -0,0 +1,36 @@ +"""Pure helpers for multi-worker proxy sizing. No I/O, fully unit-testable.""" + +from __future__ import annotations + +MIN_POOL_SIZE = 2 + +# Envs that fall back to fakeredis / in-memory sqlite (per-process state). +# Multi-worker there would unshare sandbox state across workers, so force 1. +SINGLE_WORKER_ENVS = frozenset({"local", "test", "dev"}) + + +def resolve_workers(role: str, override: int | None, env_workers: int, env: str | None = None) -> int: + """Resolve uvicorn worker count. + + admin role is always single-process (owns scheduler/Ray singletons). 
+ local/test/dev are always single-process (fakeredis/in-memory state is + per-process; multi-worker would unshare it) — this overrides override/env. + proxy role otherwise: explicit override > env > 1. Worker count must be set + explicitly (via --workers or ROCK_PROXY_WORKERS); no cpu_count auto-detect. + """ + if role != "proxy": + return 1 + if env in SINGLE_WORKER_ENVS: + return 1 + if override and override > 0: + return override + if env_workers and env_workers > 0: + return env_workers + return 1 + + +def compute_pool_size(base: int, workers: int) -> int: + """Per-worker DB pool so that workers * pool ~= base (no per-pod regression).""" + if workers <= 1: + return base + return max(MIN_POOL_SIZE, base // workers) diff --git a/tests/unit/admin/test_create_app_routes.py b/tests/unit/admin/test_create_app_routes.py new file mode 100644 index 0000000000..cbbbb75ddf --- /dev/null +++ b/tests/unit/admin/test_create_app_routes.py @@ -0,0 +1,28 @@ +from fastapi import FastAPI + +from rock.admin.main import _include_routers + + +def _paths(app: FastAPI) -> set[str]: + return {getattr(r, "path", "") for r in app.routes} + + +def test_proxy_role_mounts_proxy_router(): + app = FastAPI() + _include_routers(app, role="proxy") + paths = _paths(app) + assert any(p.endswith("/get_token") for p in paths) + + +def test_proxy_role_excludes_admin_router(): + app = FastAPI() + _include_routers(app, role="proxy") + paths = _paths(app) + assert not any("/ops" in p for p in paths) + + +def test_admin_role_mounts_admin_routers(): + app = FastAPI() + _include_routers(app, role="admin") + paths = _paths(app) + assert any("/ops" in p for p in paths) diff --git a/tests/unit/admin/test_db_provider_pool.py b/tests/unit/admin/test_db_provider_pool.py new file mode 100644 index 0000000000..e233c6ff4f --- /dev/null +++ b/tests/unit/admin/test_db_provider_pool.py @@ -0,0 +1,30 @@ +import pytest + +from rock.admin.core.db_provider import DatabaseProvider +from rock.config import DatabaseConfig + 
+ +def test_database_config_has_pool_size_default(): + cfg = DatabaseConfig(url="") + assert cfg.pool_size == 100 + + +def test_database_config_pool_size_override(): + cfg = DatabaseConfig(url="", pool_size=10) + assert cfg.pool_size == 10 + + +@pytest.mark.asyncio +async def test_engine_uses_configured_pool_size_for_postgres(monkeypatch): + captured = {} + + def fake_create_async_engine(url, **kwargs): + captured.update(kwargs) + captured["url"] = url + return object() + + monkeypatch.setattr("rock.admin.core.db_provider.create_async_engine", fake_create_async_engine) + provider = DatabaseProvider(db_config=DatabaseConfig(url="postgresql://u:p@h:5432/db", pool_size=7)) + await provider.init() + assert captured["pool_size"] == 7 + assert captured["max_overflow"] == 0 diff --git a/tests/unit/sandbox/test_proxy_enhancements.py b/tests/unit/sandbox/test_proxy_enhancements.py index b1e92bae5d..f43021d860 100644 --- a/tests/unit/sandbox/test_proxy_enhancements.py +++ b/tests/unit/sandbox/test_proxy_enhancements.py @@ -4,6 +4,7 @@ 3. 
 batch_get_sandbox_status legacy-states filtering
 """
 
+import contextlib
 from unittest.mock import AsyncMock, MagicMock, patch
 
 import pytest
@@ -334,7 +335,8 @@ async def __aexit__(self, *args):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 await SandboxProxyService.http_proxy(
                     service,
                     sandbox_id="sb1",
@@ -386,7 +388,8 @@ async def __aexit__(self, *args):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 await SandboxProxyService.http_proxy(
                     service,
                     sandbox_id="sb1",
@@ -437,7 +440,8 @@ async def __aexit__(self, *args):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 await SandboxProxyService.http_proxy(
                     service,
                     sandbox_id="sb1",
@@ -492,7 +496,8 @@ async def __aexit__(self, *args):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 await SandboxProxyService.http_proxy(
                     service,
                     sandbox_id="sb1",
@@ -550,7 +555,8 @@ async def test_relative_location_is_rewritten(self):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 resp = await SandboxProxyService.http_proxy(
                     service,
                     sandbox_id="sb1",
@@ -573,7 +579,8 @@ async def test_absolute_upstream_location_is_stripped_to_path(self):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 resp = await SandboxProxyService.http_proxy(
                     service,
                     sandbox_id="sb1",
@@ -619,7 +626,8 @@ async def aclose(self):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 resp = await SandboxProxyService.http_proxy(
                     service,
                     sandbox_id="sb1",
@@ -642,7 +650,8 @@ async def test_proxy_prefix_none_location_unchanged(self):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 resp = await SandboxProxyService.http_proxy(
                     service,
                     sandbox_id="sb1",
@@ -704,7 +713,8 @@ async def aclose(self):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 resp = await SandboxProxyService.http_proxy(
                     service,
                     sandbox_id="sb1",
@@ -759,7 +769,8 @@ async def aclose(self):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 from starlette.datastructures import Headers
 
                 await SandboxProxyService.http_proxy(
@@ -813,7 +824,8 @@ async def aclose(self):
 
         with patch("rock.sandbox.service.sandbox_proxy_service.ServiceStatus") as MockSS:
             MockSS.from_dict.return_value = mock_status
-            with patch("rock.sandbox.service.sandbox_proxy_service.httpx.AsyncClient", return_value=FakeClient()):
+            service._proxy_client = FakeClient()
+            with contextlib.nullcontext():
                 from starlette.datastructures import Headers
 
                 await SandboxProxyService.http_proxy(
diff --git a/tests/unit/sandbox/test_proxy_httpx_reuse.py b/tests/unit/sandbox/test_proxy_httpx_reuse.py
new file mode 100644
index 0000000000..d3fabba9d4
--- /dev/null
+++ b/tests/unit/sandbox/test_proxy_httpx_reuse.py
@@ -0,0 +1,78 @@
+import os
+from unittest.mock import AsyncMock
+
+import httpx
+import pytest
+from starlette.datastructures import Headers
+
+from rock.deployments.constants import Port
+
+
+@pytest.mark.asyncio
+async def test_service_has_two_distinct_httpx_clients(sandbox_proxy_service):
+    svc = sandbox_proxy_service
+    assert isinstance(svc._rpc_client, httpx.AsyncClient)
+    assert isinstance(svc._proxy_client, httpx.AsyncClient)
+    assert svc._rpc_client is not svc._proxy_client
+
+
+@pytest.mark.asyncio
+async def test_metrics_monitor_has_worker_pid_tag(sandbox_proxy_service):
+    tags = sandbox_proxy_service.metrics_monitor.user_defined_tags
+    assert tags.get("worker_pid") == str(os.getpid())
+
+
+@pytest.mark.asyncio
+async def test_aclose_closes_both_clients(sandbox_proxy_service):
+    svc = sandbox_proxy_service
+    await svc.aclose()
+    assert svc._rpc_client.is_closed
+    assert svc._proxy_client.is_closed
+
+
+def _fake_status():
+    return {"host_ip": "1.2.3.4", "port_mapping": {Port.SERVER.value: 18080}}
+
+
+@pytest.mark.asyncio
+async def test_http_proxy_reuses_proxy_client_without_closing(sandbox_proxy_service, monkeypatch):
+    svc = sandbox_proxy_service
+
+    def handler(request: httpx.Request) -> httpx.Response:
+        return httpx.Response(200, json={"ok": True})
+
+    svc._proxy_client = httpx.AsyncClient(transport=httpx.MockTransport(handler))
+    monkeypatch.setattr(svc, "_update_expire_time", AsyncMock())
+    monkeypatch.setattr(svc, "get_service_status", AsyncMock(return_value=[_fake_status()]))
+
+    resp = await svc.http_proxy(
+        sandbox_id="sb1",
+        target_path="hello",
+        body=None,
+        headers=Headers({}),
+        method="GET",
+    )
+
+    assert resp.status_code == 200
+    # shared client must remain open for reuse
+    assert not svc._proxy_client.is_closed
+
+
+@pytest.mark.asyncio
+async def test_host_proxy_reuses_proxy_client_without_closing(sandbox_proxy_service):
+    svc = sandbox_proxy_service
+
+    def handler(request: httpx.Request) -> httpx.Response:
+        return httpx.Response(200, json={"pong": True})
+
+    svc._proxy_client = httpx.AsyncClient(transport=httpx.MockTransport(handler))
+
+    resp = await svc.host_proxy(
+        host_ip="10.0.0.1",
+        target_path="ping",
+        body={"a": 1},
+        headers=Headers({}),
+    )
+
+    assert resp.status_code == 200
+    assert not svc._proxy_client.is_closed
diff --git a/tests/unit/test_logger_multiprocess.py b/tests/unit/test_logger_multiprocess.py
new file mode 100644
index 0000000000..579f35bf1a
--- /dev/null
+++ b/tests/unit/test_logger_multiprocess.py
@@ -0,0 +1,54 @@
+"""Multi-worker logging: handler append mode + one-time deploy truncation."""
+
+import importlib
+
+import pytest
+
+import rock.logger as rock_logger
+from rock import env_vars
+
+
+@pytest.fixture(autouse=True)
+def _clear_handler_cache():
+    rock_logger.init_file_handler.cache_clear()
+    yield
+    rock_logger.init_file_handler.cache_clear()
+
+
+def test_rock_logging_append_defaults_false(monkeypatch):
+    monkeypatch.delenv("ROCK_LOGGING_APPEND", raising=False)
+    import rock.env_vars as ev
+
+    importlib.reload(ev)
+    assert ev.ROCK_LOGGING_APPEND is False
+
+
+def test_file_handler_defaults_to_truncate_mode(tmp_path, monkeypatch):
+    monkeypatch.setattr(env_vars, "ROCK_LOGGING_PATH", str(tmp_path), raising=False)
+    monkeypatch.setattr(env_vars, "ROCK_LOGGING_APPEND", False, raising=False)
+    handler = rock_logger.init_file_handler("trunc_default.log")
+    assert handler is not None
+    assert handler.mode == "w+"
+
+
+def test_file_handler_append_when_env_set(tmp_path, monkeypatch):
+    monkeypatch.setattr(env_vars, "ROCK_LOGGING_PATH", str(tmp_path), raising=False)
+    monkeypatch.setattr(env_vars, "ROCK_LOGGING_APPEND", True, raising=False)
+    handler = rock_logger.init_file_handler("append_mode.log")
+    assert handler is not None
+    assert handler.mode == "a"
+
+
+def test_reset_log_file_truncates_existing(tmp_path, monkeypatch):
+    monkeypatch.setattr(env_vars, "ROCK_LOGGING_PATH", str(tmp_path), raising=False)
+    monkeypatch.setattr(env_vars, "ROCK_LOGGING_FILE_NAME", "deploy.log", raising=False)
+    p = tmp_path / "deploy.log"
+    p.write_text("stale lines from previous deploy\n")
+    rock_logger.reset_log_file()
+    assert p.read_text() == ""
+
+
+def test_reset_log_file_noop_without_path(monkeypatch):
+    monkeypatch.setattr(env_vars, "ROCK_LOGGING_PATH", None, raising=False)
+    # must not raise when file logging is disabled (stdout mode)
+    rock_logger.reset_log_file()
diff --git a/tests/unit/utils/test_worker.py b/tests/unit/utils/test_worker.py
new file mode 100644
index 0000000000..8af5bb0cda
--- /dev/null
+++ b/tests/unit/utils/test_worker.py
@@ -0,0 +1,58 @@
+import importlib
+
+from rock.utils.worker import compute_pool_size, resolve_workers
+
+
+def test_rock_proxy_workers_defaults_to_zero(monkeypatch):
+    monkeypatch.delenv("ROCK_PROXY_WORKERS", raising=False)
+    import rock.env_vars as env_vars
+
+    importlib.reload(env_vars)
+    assert env_vars.ROCK_PROXY_WORKERS == 0
+
+
+def test_rock_proxy_workers_reads_env(monkeypatch):
+    monkeypatch.setenv("ROCK_PROXY_WORKERS", "6")
+    import rock.env_vars as env_vars
+
+    importlib.reload(env_vars)
+    assert env_vars.ROCK_PROXY_WORKERS == 6
+
+
+def test_resolve_workers_admin_always_one():
+    assert resolve_workers("admin", override=None, env_workers=8, env="prod") == 1
+    assert resolve_workers("admin", override=10, env_workers=8, env="prod") == 1
+
+
+def test_resolve_workers_proxy_override_wins():
+    assert resolve_workers("proxy", override=4, env_workers=8, env="prod") == 4
+
+
+def test_resolve_workers_proxy_env_when_no_override():
+    assert resolve_workers("proxy", override=None, env_workers=8, env="prod") == 8
+
+
+def test_resolve_workers_proxy_defaults_to_one_when_unset():
+    assert resolve_workers("proxy", override=None, env_workers=0, env="prod") == 1
+
+
+def test_resolve_workers_local_envs_force_one_over_override():
+    for env in ("local", "test", "dev"):
+        assert resolve_workers("proxy", override=8, env_workers=8, env=env) == 1
+
+
+def test_resolve_workers_unknown_env_defaults_param_none():
+    # env defaults to None (treated as non-local) — preserves explicit override
+    assert resolve_workers("proxy", override=4, env_workers=0) == 4
+
+
+def test_compute_pool_size_divides_by_workers():
+    assert compute_pool_size(base=100, workers=8) == 12
+
+
+def test_compute_pool_size_floor_two():
+    assert compute_pool_size(base=100, workers=200) == 2
+
+
+def test_compute_pool_size_single_worker_keeps_base():
+    assert compute_pool_size(base=100, workers=1) == 100
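For orientation, the behaviour that `tests/unit/utils/test_worker.py` pins down could be satisfied by something like the following. This is a minimal sketch inferred only from the assertions in that test file, not the actual `rock/utils/worker.py`: the `LOCAL_ENVS` constant, the `floor` parameter, and the order in which the checks run are assumptions made for illustration.

```python
from typing import Optional

# Assumption: these are the environments the tests treat as "local".
LOCAL_ENVS = frozenset({"local", "test", "dev"})


def resolve_workers(
    role: str,
    override: Optional[int],
    env_workers: int,
    env: Optional[str] = None,
) -> int:
    """Pick the uvicorn worker count for a given role (hypothetical sketch)."""
    # Only the proxy role may run multi-process; admin is always single-worker.
    if role != "proxy":
        return 1
    # Local-style environments force a single worker, even over an explicit override.
    if env in LOCAL_ENVS:
        return 1
    # An explicit override (e.g. from the CLI) wins over the env-var value.
    if override is not None and override > 0:
        return override
    # Fall back to the env-var value; 0 means "unset", which maps to one worker.
    return env_workers if env_workers > 0 else 1


def compute_pool_size(base: int, workers: int, floor: int = 2) -> int:
    """Split a per-process pool budget across workers, never below `floor`."""
    if workers <= 1:
        return base
    return max(floor, base // workers)
```

Under this sketch, `compute_pool_size(base=100, workers=8)` gives each worker 12 connections, so the per-Pod total stays at or below the single-process budget of 100 instead of multiplying by the worker count.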