From f6b2000feb896e458555a2f2be89598cba872024 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Sun, 28 Jun 2026 15:25:50 +0800 Subject: [PATCH 1/5] feat(sdk): parallel pipeline fan-out + typed JSON report with metrics Base::run_parallel runs steps as concurrent copy-on-write MicroVM forks (bounded, collect-all, input-ordered) and returns a dependency-free JSON Report. StepResult gains separated stdout/stderr, duration_ms, and metrics parsed from `::metric <key>=<value>` guest-stdout lines (a scoring channel for matrix/selection workloads). Steps now take &self (atomic fork counter), so fan-out no longer needs hand-rolled threads. RAII removes each fork on every path and the base snapshot on Drop (--force). Box/snapshot names carry per-process+instance entropy to prevent cross-pipeline name collisions; output is capped to bound report memory under large fan-out; infra failures use a distinct sentinel. 15 unit tests + doctest, clippy clean; validated end-to-end on a real /dev/kvm host (real boot, CoW fork-per-step, metrics, JSON report, 0 leaks). --- CHANGELOG.md | 20 ++ src/sdk/README.md | 36 ++- src/sdk/examples/pipeline.rs | 37 ++- src/sdk/src/pipeline.rs | 548 +++++++++++++++++++++++++++++++---- 4 files changed, 561 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e9aa4986..69df4235 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,26 @@ All notable changes to A3S Box will be documented in this file. ## [Unreleased] +### Added + +- **Programmable-CI pipeline: parallel fan-out + typed JSON report (`a3s-box-sdk`).** + `Base::run_parallel(steps, max_concurrency)` runs steps concurrently as isolated + copy-on-write MicroVM forks (bounded, collect-all, results in input order) and returns a + `Report` with a dependency-free `to_json()`. `StepResult` now carries separated + `stdout`/`stderr`, `duration_ms`, and `metrics` parsed from `::metric <key>=<value>` + guest-stdout lines (a machine-readable scoring channel for matrix/selection workloads). 
+ Steps run via `&self` (atomic fork counter), so fan-out no longer needs hand-rolled + threads. The base auto-removes its snapshot on `Drop` (`--force`), and each fork is + removed on every path (including panic). Box/snapshot names now carry per-process + + per-instance entropy, so concurrent pipelines from the same image+setup can no longer + collide and tear down each other's boxes. Validated end-to-end on a real `/dev/kvm` host. + +### Changed + +- **`StepResult.logs` is replaced by separated `stdout` / `stderr` fields** + (use `StepResult::combined()` for the old concatenated view). Breaking for + direct `.logs` field access on the `a3s-box-sdk` pipeline API. + ## [2.6.0] — 2026-06-26 ### Added diff --git a/src/sdk/README.md b/src/sdk/README.md index 1200906e..7a8c5480 100644 --- a/src/sdk/README.md +++ b/src/sdk/README.md @@ -11,28 +11,48 @@ state). Set `A3S_BOX` if `a3s-box` is not on `PATH`. ## Pipelines -Warm a base box **once** (clone + install deps), snapshot it, fork per step: +Warm a base box **once** (clone + install deps), snapshot it, fork per step. +Run steps **sequentially** (fail-fast) or **in parallel** (collect-all → a typed +report): ```rust use a3s_box_sdk::pipeline::{warm_base, WarmBase, FileCache, Step}; fn main() -> Result<(), a3s_box_sdk::pipeline::PipelineError> { let cache = FileCache::new(".ci-cache")?; // skip a step when its inputs are unchanged - let mut base = warm_base( + let base = warm_base( WarmBase::new("node:20", "git clone $REPO /w && cd /w && npm ci") // runs ONCE .env("REPO", "https://github.com/me/app") .cache(&cache), )?; + + // Sequential, fail-fast: a non-zero exit returns Err. base.step(Step::new("lint", "cd /w && npm run lint"))?; - base.step(Step::new("test", "cd /w && npm test"))?; // nonzero exit -> Err (fail-fast) - base.step(Step::new("build", "cd /w && npm run build"))?; - base.dispose(); // drops the snapshot - Ok(()) + + // Parallel, collect-all: each step is an isolated CoW fork; <=4 at a time. 
+ let report = base.run_parallel(vec![ + Step::new("test", "cd /w && npm test"), + Step::new("build", "cd /w && npm run build"), + ], 4); + + println!("{}", report.to_json()); // {"passed":..,"total_ms":..,"steps":[..]} + if !report.passed { /* inspect report.failures() */ } + Ok(()) // `base` drops here -> snapshot auto-removed (or call base.dispose()) } ``` -`Step::allow_failure()` keeps the pipeline going on a non-zero exit; `Step::input(..)` -adds extra cache-key parts. Parallel steps = spawn threads (each `step` is blocking). +- `run_parallel` is the way to use a3s-box's cheap (~ms) CoW fork at scale — a + matrix / evolution-style batch — without hand-rolling threads (every method + takes `&self`). +- A step reports a metric by printing `::metric <key>=<value>` to stdout; it + surfaces as `StepResult::metrics` (the scoring channel for a selection loop). +- `StepResult` carries separated `stdout`/`stderr`, `duration_ms`, and `cached`; + `Report::to_json()` is the machine-readable handoff to an agent/scorer. +- `Step::allow_failure()` keeps a non-zero step from failing the run; `Step::input(..)` + adds extra cache-key parts. + +The base **auto-disposes** its snapshot on drop, and each per-step box is removed +on every path (including a panic), so a long-running batch doesn't leak. ## Why forking is cheap diff --git a/src/sdk/examples/pipeline.rs b/src/sdk/examples/pipeline.rs index b26a5aa7..a79e4432 100644 --- a/src/sdk/examples/pipeline.rs +++ b/src/sdk/examples/pipeline.rs @@ -12,25 +12,36 @@ fn main() -> Result<(), a3s_box_sdk::pipeline::PipelineError> { let cache = FileCache::new("/tmp/.a3s-ci-demo")?; // Warm the base once (here: write a marker = "deps installed"), snapshot it. - let mut base = warm_base(WarmBase::new(image, "echo DEPS-INSTALLED > /warmed").cache(&cache))?; + let base = warm_base(WarmBase::new(image, "echo DEPS-INSTALLED > /warmed").cache(&cache))?; + // Sequential, fail-fast: a non-zero exit returns Err. 
let r = base.step(Step::new("read", "cat /warmed"))?; - println!( - "read -> code={} out={:?} cached={}", - r.exit_code, - r.logs.trim(), - r.cached - ); - + println!("read -> code={} out={:?}", r.exit_code, r.stdout.trim()); let r2 = base.step(Step::new("read", "cat /warmed"))?; // identical -> cache hit println!("read#2 -> cached={}", r2.cached); - match base.step(Step::new("fail", "exit 7")) { - Err(e) => println!("fail-fast ok: {e}"), - Ok(_) => println!("ERROR: fail step did not error"), - } + // Parallel matrix, collect-all: each step is an isolated CoW fork of the base. + // A step reports a metric by printing `::metric key=value`. + let report = base.run_parallel( + vec![ + Step::new("ok", "echo fine"), + Step::new("perf", "echo '::metric duration_ms=12.5'"), + Step::new("fail", "exit 7"), // collected, not fatal + ], + 4, + ); + println!("report -> {}", report.to_json()); + println!( + "passed={} failures={:?}", + report.passed, + report + .failures() + .iter() + .map(|s| &s.name) + .collect::<Vec<_>>() + ); - base.dispose(); + // base.dispose() is optional — the snapshot is removed when `base` drops. println!("demo ok"); Ok(()) } diff --git a/src/sdk/src/pipeline.rs b/src/sdk/src/pipeline.rs index 166c3d33..bc33cc2f 100644 --- a/src/sdk/src/pipeline.rs +++ b/src/sdk/src/pipeline.rs @@ -10,8 +10,14 @@ //! Model: warm a base box once (clone + install deps), `snapshot` it, then fork //! the snapshot per step. With a3s-box's copy-on-write restore (overlay lower) //! each fork shares the snapshot's pristine rootfs read-only and writes to its -//! own upper — near-instant, space-cheap, isolated. The DAG is your code: -//! sequence with plain calls, fan out with threads. No YAML, no engine. +//! own upper — near-instant, space-cheap, isolated. Steps are *independent* +//! forks of the same base, so they run sequentially with [`Base::step`] +//! (fail-fast) or concurrently with [`Base::run_parallel`] (collect-all), which +//! 
returns a typed, JSON-serializable [`Report`]. The base auto-disposes its +//! snapshot on drop; per-step boxes are removed on every path (incl. panic). +//! +//! A step reports metrics by printing `::metric <key>=<value>` lines to stdout; +//! they surface as [`StepResult::metrics`] for scoring (e.g. an evolution loop). //! //! Set `A3S_BOX` to the CLI path if `a3s-box` is not on `PATH`. //! @@ -20,22 +26,57 @@ //! //! # fn main() -> Result<(), a3s_box_sdk::pipeline::PipelineError> { //! let cache = FileCache::new(".ci-cache")?; -//! let mut base = warm_base( +//! let base = warm_base( //! WarmBase::new("node:20", "git clone $REPO /w && cd /w && npm ci") //! .env("REPO", "https://github.com/me/app") //! .cache(&cache), //! )?; -//! base.step(Step::new("lint", "cd /w && npm run lint"))?; -//! base.step(Step::new("test", "cd /w && npm test"))?; // nonzero exit -> Err (fail-fast) -//! base.step(Step::new("build", "cd /w && npm run build"))?; -//! base.dispose(); -//! # Ok(()) } +//! // Fan out in parallel (each step is an isolated CoW fork of the base), +//! // bounded to 4 at a time; collect a typed report. +//! let report = base.run_parallel( +//! vec![ +//! Step::new("lint", "cd /w && npm run lint"), +//! Step::new("test", "cd /w && npm test"), +//! Step::new("build", "cd /w && npm run build"), +//! ], +//! 4, +//! ); +//! println!("{}", report.to_json()); // {"passed":true,"total_ms":...,"steps":[...]} +//! assert!(report.passed); +//! # Ok(()) } // base drops here -> snapshot auto-removed //! ``` -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::fmt; use std::path::PathBuf; use std::process::{Command, Output}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::Mutex; +use std::time::Instant; + +/// Default per-step cap on captured stdout/stderr — bounds in-memory [`Report`] +/// size when fanning hundreds of forks out. Override via [`WarmBase::max_output`]. 
+const DEFAULT_MAX_OUTPUT: usize = 1 << 20; // 1 MiB + +/// `exec_step` returns this exit code when the step never ran because of an +/// *infrastructure* failure (restore/start/exec), distinct from any real exit +/// code (a host process killed by a signal surfaces as -1, a guest `exit -1` as +/// 255), so a scorer can tell "never ran" from "ran and failed". +pub const INFRA_FAILURE: i32 = i32::MIN; + +/// Per-process instance seed. Folding it (plus the pid) into box/snapshot NAMES +/// gives two Bases from the same image+setup — or two processes on one host — +/// DISJOINT names. The CLI enforces name uniqueness, and a shared name would let +/// one Base's cleanup tear down another's live box (cross-talk → false CI). +static INSTANCE_SEQ: AtomicU32 = AtomicU32::new(0); + +fn instance_token() -> String { + format!( + "{}-{}", + std::process::id(), + INSTANCE_SEQ.fetch_add(1, Ordering::Relaxed) + ) +} fn box_bin() -> String { std::env::var("A3S_BOX").unwrap_or_else(|_| "a3s-box".to_string()) @@ -109,6 +150,15 @@ fn box_cleanup(args: &[&str]) { let _ = Command::new(box_bin()).args(args).output(); } +/// RAII: removes a per-step box on every exit path (normal return, `?`, panic), +/// so a step that fails after `start`/before exec never leaks a box. +struct BoxGuard(String); +impl Drop for BoxGuard { + fn drop(&mut self) { + box_cleanup(&["rm", "-f", &self.0]); + } +} + /// Content-addressed cache key: FNV-1a over NUL-delimited parts (stable, dep-free). fn key(parts: &[&str]) -> String { let mut h: u64 = 0xcbf2_9ce4_8422_2325; @@ -147,6 +197,69 @@ fn slug(s: &str) -> String { .collect() } +/// Truncate output to `cap` bytes on a UTF-8 boundary, appending a marker, so one +/// chatty/looping step can't balloon an in-memory [`Report`] during large fan-out. 
+fn cap_output(mut s: String, cap: usize) -> String { + if s.len() <= cap { + return s; + } + let mut end = cap; + while end > 0 && !s.is_char_boundary(end) { + end -= 1; + } + let dropped = s.len() - end; + s.truncate(end); + s.push_str(&format!("\n[..truncated {dropped} bytes]")); + s +} + +/// Parse `::metric <key>=<value>` lines from a step's stdout into a metric map. +/// Non-numeric and non-finite (NaN/inf) values are ignored; this is the scoring +/// channel for callers (e.g. an evolution loop that selects on `perf_ms`/`tests`). +fn parse_metrics(stdout: &str) -> BTreeMap<String, f64> { + let mut m = BTreeMap::new(); + for line in stdout.lines() { + if let Some(rest) = line.trim().strip_prefix("::metric ") { + if let Some((k, v)) = rest.split_once('=') { + if let Ok(val) = v.trim().parse::<f64>() { + if val.is_finite() { + m.insert(k.trim().to_string(), val); + } + } + } + } + } + m +} + +/// Minimal, correct JSON string encoder (dep-free): quotes + escapes. +fn json_str(s: &str) -> String { + let mut o = String::with_capacity(s.len() + 2); + o.push('"'); + for c in s.chars() { + match c { + '"' => o.push_str("\\\""), + '\\' => o.push_str("\\\\"), + '\n' => o.push_str("\\n"), + '\r' => o.push_str("\\r"), + '\t' => o.push_str("\\t"), + c if (c as u32) < 0x20 => o.push_str(&format!("\\u{:04x}", c as u32)), + c => o.push(c), + } + } + o.push('"'); + o +} + +/// A finite JSON number, or `null` for NaN/±inf (which are not valid JSON). +fn json_num(v: f64) -> String { + if v.is_finite() { + format!("{v}") + } else { + "null".to_string() + } +} + /// Poll `exec -- true` until the box is ready (box has no "wait-ready" verb). fn wait_ready(box_name: &str) -> Result<()> { // ~30s total budget. 
A box is usually ready in ~100-300ms, so poll fast first @@ -205,6 +318,7 @@ pub struct WarmBase<'a> { setup: String, env: BTreeMap, cache: Option<&'a FileCache>, + max_output: usize, } impl<'a> WarmBase<'a> { @@ -214,6 +328,7 @@ impl<'a> WarmBase<'a> { setup: setup.into(), env: BTreeMap::new(), cache: None, + max_output: DEFAULT_MAX_OUTPUT, } } pub fn env(mut self, k: impl Into, v: impl Into) -> Self { @@ -224,6 +339,11 @@ impl<'a> WarmBase<'a> { self.cache = Some(cache); self } + /// Cap captured stdout/stderr per step (default 1 MiB) to bound report memory. + pub fn max_output(mut self, bytes: usize) -> Self { + self.max_output = bytes; + self + } } /// A pipeline step: a command run in its own kernel forked from the base. @@ -261,46 +381,128 @@ impl Step { } } -/// Outcome of a step. +/// Outcome of a step. `stdout`/`stderr` are separated (and capped, see +/// [`WarmBase::max_output`]); `metrics` are parsed from `::metric k=v` stdout +/// lines; `duration_ms` is the command's wall-clock time. An infrastructure +/// failure surfaces as `exit_code == `[`INFRA_FAILURE`]. #[derive(Debug, Clone)] pub struct StepResult { pub name: String, pub exit_code: i32, - pub logs: String, + pub stdout: String, + pub stderr: String, pub cached: bool, + pub allow_failure: bool, + pub duration_ms: u128, + pub metrics: BTreeMap, +} + +impl StepResult { + /// A passing step: exit 0, or a non-zero step that opted into `allow_failure`. + pub fn ok(&self) -> bool { + self.exit_code == 0 || self.allow_failure + } + /// stdout followed by stderr (the legacy combined log view). + pub fn combined(&self) -> String { + let mut s = self.stdout.clone(); + s.push_str(&self.stderr); + s + } + /// Serialize to a JSON object (dep-free). 
+ pub fn to_json(&self) -> String { + let mut metrics = String::from("{"); + for (i, (k, v)) in self.metrics.iter().enumerate() { + if i > 0 { + metrics.push(','); + } + metrics.push_str(&json_str(k)); + metrics.push(':'); + metrics.push_str(&json_num(*v)); + } + metrics.push('}'); + format!( + "{{\"name\":{},\"exit_code\":{},\"cached\":{},\"allow_failure\":{},\"duration_ms\":{},\"metrics\":{},\"stdout\":{},\"stderr\":{}}}", + json_str(&self.name), + self.exit_code, + self.cached, + self.allow_failure, + self.duration_ms, + metrics, + json_str(&self.stdout), + json_str(&self.stderr), + ) + } +} + +/// Aggregate outcome of a [`Base::run_parallel`] batch: per-step results in input +/// order, total wall-clock, and a `passed` gate (all steps [`StepResult::ok`]). +#[derive(Debug, Clone)] +pub struct Report { + pub steps: Vec, + pub total_ms: u128, + pub passed: bool, +} + +impl Report { + /// The steps that did not pass (non-zero exit without `allow_failure`). + pub fn failures(&self) -> Vec<&StepResult> { + self.steps.iter().filter(|r| !r.ok()).collect() + } + /// Serialize the whole report to a JSON object (dep-free) for an agent/scorer. + pub fn to_json(&self) -> String { + let steps: Vec = self.steps.iter().map(|s| s.to_json()).collect(); + format!( + "{{\"passed\":{},\"total_ms\":{},\"steps\":[{}]}}", + self.passed, + self.total_ms, + steps.join(",") + ) + } } -/// A warmed, forkable base — a snapshot plus an optional step cache. +/// A warmed, forkable base — a snapshot plus an optional step cache. Methods take +/// `&self` (the fork counter is atomic), so steps can run concurrently across +/// threads. The snapshot is removed on drop (or eagerly via [`Base::dispose`]). pub struct Base<'a> { snapshot_name: String, snapshot_id: String, cache: Option<&'a FileCache>, - n: u32, + max_output: usize, + n: AtomicU32, + disposed: AtomicBool, } impl Base<'_> { - /// Run one step in its own kernel forked from the warmed base. 
A non-zero exit - /// returns `Err(StepFailed)` (fail-fast) unless `Step::allow_failure` was set. - pub fn step(&mut self, step: Step) -> Result { + /// Fork the base, run one step in its own kernel, return its [`StepResult`]. + /// `Err` only on an infrastructure failure (restore/start/exec); a non-zero + /// *step* exit is returned as a `StepResult` (caller decides what it means). + fn exec_step(&self, step: &Step) -> Result { let mut parts: Vec<&str> = vec![&step.name, &step.cmd]; parts.extend(step.inputs.iter().map(|s| s.as_str())); let k = key(&parts); if let Some(c) = self.cache { if c.has(&k) { return Ok(StepResult { - name: step.name, + name: step.name.clone(), exit_code: 0, - logs: String::new(), + stdout: String::new(), + stderr: String::new(), cached: true, + allow_failure: step.allow_failure, + duration_ms: 0, + metrics: BTreeMap::new(), }); } } - self.n += 1; - let box_name = format!("{}-job{}-{}", self.snapshot_name, self.n, slug(&step.name)); - box_cleanup(&["rm", "-f", &box_name]); // idempotent: clear a crashed prior run + // Names are globally unique (snapshot_name carries pid+instance entropy, + // `i` is per-fork), so no cross-process pre-clean is needed; the guard + // removes the fork on every exit path. + let i = self.n.fetch_add(1, Ordering::Relaxed) + 1; + let box_name = format!("{}-job{}-{}", self.snapshot_name, i, slug(&step.name)); + let _guard = BoxGuard(box_name.clone()); - // Fork = restore + start. Since a3s-box's CoW restore, this mounts the + // Fork = restore + start. Thanks to a3s-box's CoW restore, this mounts the // snapshot's rootfs as a read-only overlay lower with a fresh per-box upper. 
box_run(&[ "snapshot", @@ -313,38 +515,123 @@ impl Base<'_> { wait_ready(&box_name)?; let full = with_env(&step.cmd, &step.env); + let t = Instant::now(); let out = Command::new(box_bin()) .args(["exec", &box_name, "--", "sh", "-c", &full]) .output()?; - box_cleanup(&["rm", "-f", &box_name]); + let duration_ms = t.elapsed().as_millis(); let code = out.status.code().unwrap_or(-1); - let mut logs = String::from_utf8_lossy(&out.stdout).into_owned(); - logs.push_str(&String::from_utf8_lossy(&out.stderr)); + let stdout = String::from_utf8_lossy(&out.stdout).into_owned(); + let stderr = String::from_utf8_lossy(&out.stderr).into_owned(); + let metrics = parse_metrics(&stdout); // parse BEFORE truncating if code == 0 { if let Some(c) = self.cache { c.put(&k); } } - if code != 0 && !step.allow_failure { - return Err(PipelineError::StepFailed { - name: step.name, - code, - logs, - }); - } Ok(StepResult { - name: step.name, + name: step.name.clone(), exit_code: code, - logs, + stdout: cap_output(stdout, self.max_output), + stderr: cap_output(stderr, self.max_output), cached: false, + allow_failure: step.allow_failure, + duration_ms, + metrics, }) } - /// Remove the snapshot (by ID — `snapshot rm` does not accept names). + /// Run one step, **fail-fast**: a non-zero exit returns `Err(StepFailed)` + /// unless `Step::allow_failure` was set. Use for a sequential pipeline. + pub fn step(&self, step: Step) -> Result { + let r = self.exec_step(&step)?; + if r.exit_code != 0 && !step.allow_failure { + return Err(PipelineError::StepFailed { + name: r.name.clone(), + code: r.exit_code, + logs: r.combined(), + }); + } + Ok(r) + } + + /// Run one step for a batch: **never** propagates — an infra failure becomes a + /// `StepResult{ exit_code: `[`INFRA_FAILURE`]`, stderr: }` so one bad + /// fork can't abort the batch. 
+ fn run_one(&self, step: Step) -> StepResult { + match self.exec_step(&step) { + Ok(r) => r, + Err(e) => StepResult { + name: step.name.clone(), + exit_code: INFRA_FAILURE, + stdout: String::new(), + stderr: e.to_string(), + cached: false, + allow_failure: step.allow_failure, + duration_ms: 0, + metrics: BTreeMap::new(), + }, + } + } + + /// Fan steps out concurrently — each is an isolated CoW fork of the base — + /// bounded to `max_concurrency` at a time. **Collect-all** (not fail-fast): + /// every step runs and is returned in input order. This is the primitive that + /// makes a3s-box's cheap fork (~ms) usable for matrix / evolution-scale CI. + /// Do not call [`Base::dispose`] while a `run_parallel` is in flight. + pub fn run_parallel(&self, steps: Vec, max_concurrency: usize) -> Report { + let start = Instant::now(); + let n = steps.len(); + if n == 0 { + return Report { + steps: Vec::new(), + total_ms: 0, + passed: true, + }; + } + let conc = max_concurrency.max(1).min(n); + let queue: Mutex> = + Mutex::new(steps.into_iter().enumerate().collect()); + let results: Mutex> = Mutex::new(Vec::with_capacity(n)); + + std::thread::scope(|s| { + for _ in 0..conc { + s.spawn(|| loop { + let next = { queue.lock().unwrap().pop_front() }; + let Some((idx, step)) = next else { break }; + let r = self.run_one(step); + results.lock().unwrap().push((idx, r)); + }); + } + }); + + let mut v = results.into_inner().unwrap(); + v.sort_by_key(|(i, _)| *i); + let steps: Vec = v.into_iter().map(|(_, r)| r).collect(); + let passed = steps.iter().all(|r| r.ok()); + Report { + steps, + total_ms: start.elapsed().as_millis(), + passed, + } + } + + /// Eagerly remove the snapshot (by ID, `--force` since forks are CoW lowers). + /// Idempotent with the `Drop` impl, so calling it early is safe. 
pub fn dispose(&self) { - box_cleanup(&["snapshot", "rm", &self.snapshot_id]); + if !self.disposed.swap(true, Ordering::SeqCst) { + box_cleanup(&["snapshot", "rm", "--force", &self.snapshot_id]); + } + } +} + +impl Drop for Base<'_> { + fn drop(&mut self) { + if !self.disposed.swap(true, Ordering::SeqCst) { + box_cleanup(&["snapshot", "rm", "--force", &self.snapshot_id]); + } } } @@ -374,14 +661,15 @@ fn parse_snapshot_id(ls_output: &str, name: &str) -> Option { /// Warm a base box: run `setup` once, snapshot the result, return a forkable [`Base`]. pub fn warm_base(spec: WarmBase<'_>) -> Result> { - let base_box = format!("ci-base-{}", key(&[&spec.image, &spec.setup])); + // pid+instance entropy: two warms of the same spec get disjoint names, so they + // can't tear down each other's box/snapshot. Hence no shared-name pre-clean. + let base_box = format!( + "ci-base-{}-{}", + key(&[&spec.image, &spec.setup]), + instance_token() + ); let snap = format!("{base_box}-snap"); - box_cleanup(&["rm", "-f", &base_box]); // idempotent rerun - if let Ok(old) = snapshot_id(&snap) { - box_cleanup(&["snapshot", "rm", &old]); - } - box_run(&[ "run", "-d", @@ -400,12 +688,21 @@ pub fn warm_base(spec: WarmBase<'_>) -> Result> { snapshot_id(&snap) })(); box_cleanup(&["rm", "-f", &base_box]); + // If `snapshot create` succeeded but ID resolution failed, no Base is built, + // so Drop never fires — best-effort remove the orphan snapshot. 
+ if prepared.is_err() { + if let Ok(id) = snapshot_id(&snap) { + box_cleanup(&["snapshot", "rm", "--force", &id]); + } + } Ok(Base { snapshot_name: snap, snapshot_id: prepared?, cache: spec.cache, - n: 0, + max_output: spec.max_output, + n: AtomicU32::new(0), + disposed: AtomicBool::new(false), }) } @@ -421,6 +718,11 @@ mod tests { assert_ne!(key(&["ab", "c"]), key(&["a", "bc"])); } + #[test] + fn instance_token_is_unique_per_call() { + assert_ne!(instance_token(), instance_token()); + } + #[test] fn with_env_escapes_and_prefixes() { let mut e = BTreeMap::new(); @@ -437,6 +739,86 @@ mod tests { assert_eq!(slug("a/b c"), "a-b-c"); } + #[test] + fn cap_output_truncates_on_char_boundary() { + assert_eq!(cap_output("hello".into(), 10), "hello"); // under cap: untouched + let capped = cap_output("a".repeat(100), 10); + assert!(capped.starts_with("aaaaaaaaaa")); + assert!(capped.contains("[..truncated 90 bytes]")); + // a multi-byte char ("é" = 2 bytes) must not be split mid-codepoint. + let c = cap_output("é".repeat(10), 5); // floors to a 4-byte boundary + assert!(std::str::from_utf8(c.as_bytes()).is_ok()); + assert!(c.contains("truncated")); + } + + #[test] + fn parse_metrics_extracts_finite_numeric_only() { + let m = parse_metrics( + "noise\n::metric perf_ms=84.2\n ::metric tests=412 \n::metric bad=xyz\n\ + ::metric inf_m=inf\n::metric nan_m=NaN\ntrailing", + ); + assert_eq!(m.get("perf_ms"), Some(&84.2)); + assert_eq!(m.get("tests"), Some(&412.0)); + assert!(!m.contains_key("bad")); // non-numeric ignored + assert!(!m.contains_key("inf_m")); // non-finite ignored + assert!(!m.contains_key("nan_m")); + assert_eq!(m.len(), 2); + } + + #[test] + fn json_str_escapes_quotes_backslash_control() { + assert_eq!(json_str("a\"b\\c\n"), "\"a\\\"b\\\\c\\n\""); + assert_eq!(json_str("\u{0001}"), "\"\\u0001\""); + assert_eq!(json_num(1.5), "1.5"); + assert_eq!(json_num(f64::NAN), "null"); + } + + #[test] + fn report_and_step_json_shape() { + let mut metrics = BTreeMap::new(); + 
metrics.insert("k".to_string(), 2.0); + let rep = Report { + passed: false, + total_ms: 5, + steps: vec![StepResult { + name: "t".into(), + exit_code: 1, + stdout: "o".into(), + stderr: "e\"x".into(), + cached: false, + allow_failure: false, + duration_ms: 3, + metrics, + }], + }; + let j = rep.to_json(); + assert!(j.contains("\"passed\":false")); + assert!(j.contains("\"total_ms\":5")); + assert!(j.contains("\"name\":\"t\"")); + assert!(j.contains("\"exit_code\":1")); + assert!(j.contains("\"metrics\":{\"k\":2}")); + assert!(j.contains("\"stderr\":\"e\\\"x\"")); // escaped quote in stderr + assert_eq!(rep.failures().len(), 1); + } + + #[test] + fn step_result_ok_respects_allow_failure() { + let mk = |code: i32, allow: bool| StepResult { + name: "s".into(), + exit_code: code, + stdout: String::new(), + stderr: String::new(), + cached: false, + allow_failure: allow, + duration_ms: 0, + metrics: BTreeMap::new(), + }; + assert!(mk(0, false).ok()); + assert!(!mk(1, false).ok()); + assert!(mk(1, true).ok()); // allowed failure still "ok" for the gate + assert!(!mk(INFRA_FAILURE, false).ok()); + } + #[test] fn file_cache_roundtrip() { let dir = std::env::temp_dir().join(format!("a3s-ci-test-{}", key(&["cache"]))); @@ -505,21 +887,26 @@ mod tests { } // A POSIX-sh stub standing in for `a3s-box`: enough to drive warm_base + step - // through their happy paths (run/exec/start/rm succeed, `snapshot create` - // records name->id in a state file that `snapshot ls` echoes, an exec whose - // command contains "boom" exits 7). Lets us cover the orchestration locally, - // without a real box or KVM. + // through their happy paths (run/exec/start/rm succeed; `snapshot create` + // records name->id in a state file that `snapshot ls` echoes; an exec whose + // command contains "boom" exits 7, "emitmetric" prints a `::metric` line; + // a `snapshot restore` whose target name contains "infrafail" exits 1). 
#[cfg(unix)] const FAKE_BOX: &str = r#"#!/bin/sh state="$(dirname "$0")/.snaps" case "$1" in run|start|rm) exit 0 ;; exec) last=""; for a in "$@"; do last="$a"; done - case "$last" in *boom*) exit 7 ;; *) exit 0 ;; esac ;; + case "$last" in + *emitmetric*) printf '::metric score=9\n'; exit 0 ;; + *boom*) exit 7 ;; + *) exit 0 ;; + esac ;; snapshot) case "$2" in create) name=""; while [ "$#" -gt 0 ]; do [ "$1" = "--name" ] && name="$2"; shift; done printf '%s %s\n' "snap-fake1" "$name" >> "$state"; printf 'snap-fake1\n'; exit 0 ;; + restore) for a in "$@"; do case "$a" in *infrafail*) exit 1 ;; esac; done; exit 0 ;; ls) printf 'ID NAME SRC\n'; cat "$state" 2>/dev/null; exit 0 ;; *) exit 0 ;; esac ;; @@ -543,7 +930,7 @@ esac // Happy path against the stub. std::env::set_var("A3S_BOX", &fake); let cache = FileCache::new(dir.join("cache")).unwrap(); - let mut base = warm_base(WarmBase::new("img", "echo hi").cache(&cache)).expect("warm_base"); + let base = warm_base(WarmBase::new("img", "echo hi").cache(&cache)).expect("warm_base"); let r = base.step(Step::new("build", "make")).expect("step"); assert_eq!(r.exit_code, 0); assert!(!r.cached); @@ -552,23 +939,63 @@ esac base.step(Step::new("fail", "boom")), Err(PipelineError::StepFailed { code: 7, .. }) )); - // box_run surfaces a non-zero exit as Cli (the stub exits 7 on a "boom" exec). - assert!(matches!( - box_run(&["exec", "x", "--", "boom"]), - Err(PipelineError::Cli { code: Some(7), .. }) - )); // allow_failure: a non-zero step returns Ok with the code instead of Err. let allowed = base .step(Step::new("maybe", "boom").allow_failure()) .expect("allow_failure step"); assert_eq!(allowed.exit_code, 7); - assert!(!allowed.cached); - // A second warm_base exercises the stale-snapshot pre-clean path. - let _ = warm_base(WarmBase::new("img", "echo hi").cache(&cache)).expect("warm_base #2"); + + // run_parallel: collect-all, input order preserved, metrics parsed, gate derived. 
+ let rep = base.run_parallel( + vec![ + Step::new("a", "make"), + Step::new("b", "boom"), // exits 7 -> not ok + Step::new("m", "emitmetric"), // prints ::metric score=9 + ], + 2, + ); + assert_eq!(rep.steps.len(), 3); + assert_eq!(rep.steps[0].name, "a"); // order preserved despite concurrency + assert_eq!(rep.steps[1].exit_code, 7); + assert_eq!(rep.steps[2].metrics.get("score"), Some(&9.0)); + assert!(!rep.passed); + assert_eq!(rep.failures().len(), 1); + assert!(rep.to_json().contains("\"score\":9")); + + // an allowed-failure step must NOT fail the batch gate. + let rep2 = base.run_parallel( + vec![ + Step::new("ok", "make"), + Step::new("soft", "boom").allow_failure(), + ], + 2, + ); + assert!(rep2.passed, "allow_failure step must not fail the batch"); + + // an infra failure (restore refuses) surfaces as INFRA_FAILURE, not a panic. + let rep3 = base.run_parallel(vec![Step::new("infrafail", "make")], 1); + assert_eq!(rep3.steps[0].exit_code, INFRA_FAILURE); + assert!(!rep3.steps[0].stderr.is_empty()); + assert!(!rep3.passed); + + // empty batch is trivially passed. + assert!(base.run_parallel(Vec::new(), 4).passed); + + // Two Bases from the SAME spec must get disjoint names (no cross-talk). + let b1 = warm_base(WarmBase::new("u", "echo").cache(&cache)).expect("b1"); + let b2 = warm_base(WarmBase::new("u", "echo").cache(&cache)).expect("b2"); + assert_ne!( + b1.snapshot_name, b2.snapshot_name, + "same spec must yield disjoint snapshot names" + ); + b1.dispose(); + b2.dispose(); + base.dispose(); + base.dispose(); // idempotent: second dispose + Drop must not double-remove // A cache-less base exercises the no-cache branch in step(). 
- let mut nocache = warm_base(WarmBase::new("img", "echo hi")).expect("warm_base nocache"); + let nocache = warm_base(WarmBase::new("img", "echo hi")).expect("warm_base nocache"); assert!( !nocache .step(Step::new("x", "make")) @@ -598,7 +1025,10 @@ esac // Exercise every builder method (fields are private; chaining covers the bodies). let c = FileCache::new(std::env::temp_dir().join(format!("a3s-ci-bld-{}", key(&["bld"])))) .unwrap(); - let _ = WarmBase::new("img", "setup").env("A", "1").cache(&c); + let _ = WarmBase::new("img", "setup") + .env("A", "1") + .cache(&c) + .max_output(4096); let _ = Step::new("n", "cmd") .input("x") .env("K", "V") From d75b35e5ca199a291ad203ed19c0871064be854c Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Sun, 28 Jun 2026 16:24:12 +0800 Subject: [PATCH 2/5] test(sdk): real-KVM integration + soak suites; add crash-orphan sweep + infra retry Backs the parallel pipeline with real-microVM coverage and hardens it for sustained, highly-concurrent churn: - sweep_orphans(): reclaim ci-base-* boxes/snapshots left by a SIGKILL/OOM'd pipeline process (its RAII never ran), matched by the dead owner pid embedded in the resource name; never touches a live peer's resources. - WarmBase::infra_retries (default 2): retry a fork that hits a TRANSIENT infra failure (restore/start/boot); the step's command never ran, so re-forking is idempotent. Keeps sustained high-concurrency churn green. - tests/integration_kvm.rs (5 #[ignore] tests): warm+fork+exec, cache hit, parallel order/metrics, fork isolation, leak-freeness, sweep crash-recovery. - tests/soak_kvm.rs: sustained fork-eval churn stays leak-free and RSS-stable; leak gates are process-scoped (robust to a concurrent pipeline on the host). - ci.yml: run both under the integration-kvm (real /dev/kvm) gate. Validated on a real KVM host (a3s-box 2.5.1): integration 5/5; soak 1500 fork-evals across 75 generations leak-free, RSS +512 KiB; 0 orphans after. 
--- .github/workflows/ci.yml | 24 ++++ CHANGELOG.md | 13 +- src/sdk/src/pipeline.rs | 154 ++++++++++++++++++++- src/sdk/tests/integration_kvm.rs | 229 +++++++++++++++++++++++++++++++ src/sdk/tests/soak_kvm.rs | 160 +++++++++++++++++++++ 5 files changed, 573 insertions(+), 7 deletions(-) create mode 100644 src/sdk/tests/integration_kvm.rs create mode 100644 src/sdk/tests/soak_kvm.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01e50777..e1b624d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -244,3 +244,27 @@ jobs: unset A3S_DEPS_STUB chmod +x bench/bench.sh bench/bench.sh race + # SDK programmable-CI pipeline against REAL microVMs: warm + CoW fork-per-step, + # parallel collect-all, `::metric` extraction, fork isolation, leak-freeness, + # and crash-orphan recovery (sweep_orphans). The unit suite only drives a fake + # CLI, so this is the only gate that the real fork/snapshot path actually works. + - name: SDK pipeline integration — real fork-per-step + sweep + env: + A3S_BOX: ${{ github.workspace }}/src/target/release/a3s-box + A3S_SDK_TEST_IMAGE: ${{ vars.KVM_CI_AGENT_IMAGE || 'docker.m.daocloud.io/library/alpine:latest' }} + run: | + unset A3S_DEPS_STUB + cd src + cargo test --release -p a3s-box-sdk --test integration_kvm -- --ignored --nocapture --test-threads=1 + # Soak the fork-eval loop: sustained churn must stay leak-free (no orphan + # boxes/snapshots, snapshot count flat) and memory-stable. Light here (120 + # fork-evals); crank A3S_SDK_SOAK_FORKS for a manual long soak. 
+ - name: SDK pipeline soak — leak-free under churn + env: + A3S_BOX: ${{ github.workspace }}/src/target/release/a3s-box + A3S_SDK_TEST_IMAGE: ${{ vars.KVM_CI_AGENT_IMAGE || 'docker.m.daocloud.io/library/alpine:latest' }} + A3S_SDK_SOAK_FORKS: "120" + run: | + unset A3S_DEPS_STUB + cd src + cargo test --release -p a3s-box-sdk --test soak_kvm -- --ignored --nocapture --test-threads=1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 69df4235..135e06f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,18 @@ All notable changes to A3S Box will be documented in this file. threads. The base auto-removes its snapshot on `Drop` (`--force`), and each fork is removed on every path (including panic). Box/snapshot names now carry per-process + per-instance entropy, so concurrent pipelines from the same image+setup can no longer - collide and tear down each other's boxes. Validated end-to-end on a real `/dev/kvm` host. + collide and tear down each other's boxes. A fork that hits a *transient* + infrastructure failure (restore/start/boot) is retried — `WarmBase::infra_retries`, + default 2 — since its command never ran, which keeps sustained high-concurrency + churn green. Validated end-to-end on a real `/dev/kvm` host. +- **Crash-orphan recovery + real-VM integration & soak tests.** `sweep_orphans()` + reclaims `ci-base-*` boxes/snapshots left behind when a pipeline process is + `SIGKILL`ed / OOM-killed (its RAII cleanup never runs), by matching the dead owner + pid embedded in the resource name — and it never touches a live peer's resources. + Added `#[ignore]`'d real-microVM integration tests (`tests/integration_kvm.rs`: + warm + fork-per-step, cache, parallel order/metrics, fork isolation, leak-freeness, + sweep) and a soak test (`tests/soak_kvm.rs`: sustained fork-eval churn stays + leak-free and RSS-stable), both wired into the KVM CI gate. 
### Changed diff --git a/src/sdk/src/pipeline.rs b/src/sdk/src/pipeline.rs index bc33cc2f..3dc91736 100644 --- a/src/sdk/src/pipeline.rs +++ b/src/sdk/src/pipeline.rs @@ -52,12 +52,18 @@ use std::path::PathBuf; use std::process::{Command, Output}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Mutex; -use std::time::Instant; +use std::time::{Duration, Instant}; /// Default per-step cap on captured stdout/stderr — bounds in-memory [`Report`] /// size when fanning hundreds of forks out. Override via [`WarmBase::max_output`]. const DEFAULT_MAX_OUTPUT: usize = 1 << 20; // 1 MiB +/// Default number of times to retry a fork that hits an *infrastructure* failure +/// (restore/start/boot/exec-spawn). The step's command never ran, so re-forking is +/// safe and idempotent; this absorbs the transient boot hiccups that surface under +/// sustained, highly-concurrent churn. Override via [`WarmBase::infra_retries`]. +const DEFAULT_INFRA_RETRIES: usize = 2; + /// `exec_step` returns this exit code when the step never ran because of an /// *infrastructure* failure (restore/start/exec), distinct from any real exit /// code (a host process killed by a signal surfaces as -1, a guest `exit -1` as @@ -319,6 +325,7 @@ pub struct WarmBase<'a> { env: BTreeMap, cache: Option<&'a FileCache>, max_output: usize, + infra_retries: usize, } impl<'a> WarmBase<'a> { @@ -329,6 +336,7 @@ impl<'a> WarmBase<'a> { env: BTreeMap::new(), cache: None, max_output: DEFAULT_MAX_OUTPUT, + infra_retries: DEFAULT_INFRA_RETRIES, } } pub fn env(mut self, k: impl Into, v: impl Into) -> Self { @@ -344,6 +352,12 @@ impl<'a> WarmBase<'a> { self.max_output = bytes; self } + /// Retries for a fork that hits an infrastructure failure (default 2). The + /// step's command never ran on such a failure, so re-forking is idempotent. + pub fn infra_retries(mut self, n: usize) -> Self { + self.infra_retries = n; + self + } } /// A pipeline step: a command run in its own kernel forked from the base. 
@@ -468,6 +482,7 @@ pub struct Base<'a> { snapshot_id: String, cache: Option<&'a FileCache>, max_output: usize, + infra_retries: usize, n: AtomicU32, disposed: AtomicBool, } @@ -543,10 +558,29 @@ impl Base<'_> { }) } + /// `exec_step` with bounded retry on *infrastructure* failure (the step's + /// command never ran, so re-forking is idempotent). A non-zero step exit is + /// returned as `Ok` and never retried — only a failed fork is. + fn exec_step_retrying(&self, step: &Step) -> Result { + let mut last: Option = None; + for attempt in 0..=self.infra_retries { + match self.exec_step(step) { + Ok(r) => return Ok(r), + Err(e) => { + last = Some(e); + if attempt < self.infra_retries { + std::thread::sleep(Duration::from_millis(150 * (attempt as u64 + 1))); + } + } + } + } + Err(last.expect("loop runs at least once")) + } + /// Run one step, **fail-fast**: a non-zero exit returns `Err(StepFailed)` /// unless `Step::allow_failure` was set. Use for a sequential pipeline. pub fn step(&self, step: Step) -> Result { - let r = self.exec_step(&step)?; + let r = self.exec_step_retrying(&step)?; if r.exit_code != 0 && !step.allow_failure { return Err(PipelineError::StepFailed { name: r.name.clone(), @@ -561,7 +595,7 @@ impl Base<'_> { /// `StepResult{ exit_code: `[`INFRA_FAILURE`]`, stderr: }` so one bad /// fork can't abort the batch. fn run_one(&self, step: Step) -> StepResult { - match self.exec_step(&step) { + match self.exec_step_retrying(&step) { Ok(r) => r, Err(e) => StepResult { name: step.name.clone(), @@ -701,11 +735,75 @@ pub fn warm_base(spec: WarmBase<'_>) -> Result> { snapshot_id: prepared?, cache: spec.cache, max_output: spec.max_output, + infra_retries: spec.infra_retries, n: AtomicU32::new(0), disposed: AtomicBool::new(false), }) } +/// The owner pid embedded in a `ci-base---...` resource name. 
+fn parse_owner_pid(name: &str) -> Option { + let rest = name.strip_prefix("ci-base-")?; + let mut it = rest.split('-'); + it.next()?; // key hash + it.next()?.parse().ok() +} + +/// `Some(true/false)` where pid-liveness is knowable (Linux `/proc`); `None` +/// otherwise — so a sweep never reclaims resources whose owner it can't confirm dead. +fn pid_alive(pid: u32) -> Option { + if !std::path::Path::new("/proc").is_dir() { + return None; + } + Some(std::path::Path::new(&format!("/proc/{pid}")).exists()) +} + +/// If `name` is a ci-base resource owned by a confirmed-DEAD pid other than `me`, +/// return that pid (it is an orphan safe to reclaim). +fn orphan_pid(name: &str, me: u32) -> Option { + let pid = parse_owner_pid(name)?; + if pid == me { + return None; // ours — never sweep a live self + } + match pid_alive(pid) { + Some(false) => Some(pid), // confirmed dead + _ => None, // alive or unknowable — leave it + } +} + +/// Reclaim `ci-base-*` boxes and snapshots leaked by a **crashed** pipeline +/// process. `Drop`/guards clean up graceful exits, but `SIGKILL`/OOM/power-loss +/// skip them; resource names embed the owner pid, so a resource whose pid is no +/// longer alive (and isn't this process) is removed. Returns the names reclaimed. +/// Safe to call concurrently with live pipelines: it only touches dead-pid orphans. 
+pub fn sweep_orphans() -> Vec { + let me = std::process::id(); + let mut removed = Vec::new(); + + if let Ok(out) = box_run(&["ps", "-a", "--format", "{{.Names}}"]) { + for name in String::from_utf8_lossy(&out.stdout).lines() { + let name = name.trim(); + if orphan_pid(name, me).is_some() { + box_cleanup(&["rm", "-f", name]); + removed.push(name.to_string()); + } + } + } + if let Ok(out) = box_run(&["snapshot", "ls"]) { + let text = String::from_utf8_lossy(&out.stdout); + for line in text.lines() { + let mut cols = line.split_whitespace(); + if let (Some(id), Some(name)) = (cols.next(), cols.next()) { + if orphan_pid(name, me).is_some() { + box_cleanup(&["snapshot", "rm", "--force", id]); + removed.push(name.to_string()); + } + } + } + } + removed +} + #[cfg(test)] mod tests { use super::*; @@ -866,6 +964,31 @@ mod tests { ); } + #[test] + fn parse_owner_pid_and_orphan_detection() { + assert_eq!(parse_owner_pid("ci-base-deadbeef-4242-0-snap"), Some(4242)); + assert_eq!( + parse_owner_pid("ci-base-deadbeef-4242-7-snap-job3-make"), + Some(4242) + ); + assert_eq!(parse_owner_pid("other-box"), None); + assert_eq!(parse_owner_pid("ci-base-onlykey"), None); + // this process's own resources are never orphans, regardless of liveness. + let me = std::process::id(); + let mine = format!("ci-base-deadbeef-{me}-0-snap"); + assert_eq!(orphan_pid(&mine, me), None); + // a confirmed-dead pid is an orphan — but only where liveness is knowable + // (Linux /proc); on hosts where it isn't, sweep must leave it untouched. 
+ if pid_alive(4_000_000_000).is_some() { + assert_eq!( + orphan_pid("ci-base-deadbeef-4000000000-0-snap", me), + Some(4_000_000_000) + ); + } else { + assert_eq!(orphan_pid("ci-base-deadbeef-4000000000-0-snap", me), None); + } + } + #[test] fn ci_error_display_covers_each_variant() { let io = PipelineError::from(std::io::Error::other("boom")); @@ -906,7 +1029,14 @@ case "$1" in case "$2" in create) name=""; while [ "$#" -gt 0 ]; do [ "$1" = "--name" ] && name="$2"; shift; done printf '%s %s\n' "snap-fake1" "$name" >> "$state"; printf 'snap-fake1\n'; exit 0 ;; - restore) for a in "$@"; do case "$a" in *infrafail*) exit 1 ;; esac; done; exit 0 ;; + restore) + for a in "$@"; do + case "$a" in + *infrafail*) exit 1 ;; + *flaky*) c="$(dirname "$0")/.flaky"; n=$(cat "$c" 2>/dev/null || echo 0); n=$((n+1)); echo "$n" > "$c"; [ "$n" -le 2 ] && exit 1 || exit 0 ;; + esac + done + exit 0 ;; ls) printf 'ID NAME SRC\n'; cat "$state" 2>/dev/null; exit 0 ;; *) exit 0 ;; esac ;; @@ -972,12 +1102,23 @@ esac ); assert!(rep2.passed, "allow_failure step must not fail the batch"); - // an infra failure (restore refuses) surfaces as INFRA_FAILURE, not a panic. + // an infra failure (restore refuses) surfaces as INFRA_FAILURE, not a panic + // (it is retried infra_retries times first, then gives up). let rep3 = base.run_parallel(vec![Step::new("infrafail", "make")], 1); assert_eq!(rep3.steps[0].exit_code, INFRA_FAILURE); assert!(!rep3.steps[0].stderr.is_empty()); assert!(!rep3.passed); + // a TRANSIENT infra failure is retried: "flaky" restore fails twice then + // succeeds, so with the default 2 retries (3 attempts) the step recovers. + let _ = std::fs::remove_file(dir.join(".flaky")); + let okr = base.run_parallel(vec![Step::new("flaky", "make")], 1); + assert_eq!( + okr.steps[0].exit_code, 0, + "transient infra failure should be retried to success" + ); + assert!(okr.passed); + // empty batch is trivially passed. 
assert!(base.run_parallel(Vec::new(), 4).passed); @@ -1028,7 +1169,8 @@ esac let _ = WarmBase::new("img", "setup") .env("A", "1") .cache(&c) - .max_output(4096); + .max_output(4096) + .infra_retries(1); let _ = Step::new("n", "cmd") .input("x") .env("K", "V") diff --git a/src/sdk/tests/integration_kvm.rs b/src/sdk/tests/integration_kvm.rs new file mode 100644 index 00000000..7fc58509 --- /dev/null +++ b/src/sdk/tests/integration_kvm.rs @@ -0,0 +1,229 @@ +//! Real-microVM integration tests for the programmable-CI pipeline. +//! +//! `#[ignore]` by default. Run on a `/dev/kvm` host: +//! ```text +//! A3S_BOX=/path/to/a3s-box \ +//! cargo test -p a3s-box-sdk --test integration_kvm -- --ignored --nocapture --test-threads=1 +//! ``` +//! Override the image with `A3S_SDK_TEST_IMAGE` (default: a daocloud alpine mirror). +//! Each test self-skips if no usable `a3s-box`/virtualization is present. + +use a3s_box_sdk::pipeline::{sweep_orphans, warm_base, FileCache, Step, WarmBase}; +use std::process::Command; + +fn a3s_box() -> String { + std::env::var("A3S_BOX").unwrap_or_else(|_| "a3s-box".into()) +} + +fn image() -> String { + std::env::var("A3S_SDK_TEST_IMAGE") + .unwrap_or_else(|_| "docker.m.daocloud.io/library/alpine:latest".into()) +} + +/// Run `a3s-box `, returning (success, stdout+stderr). +fn run(args: &[&str]) -> (bool, String) { + let out = Command::new(a3s_box()) + .args(args) + .output() + .expect("spawn a3s-box"); + let mut s = String::from_utf8_lossy(&out.stdout).into_owned(); + s.push_str(&String::from_utf8_lossy(&out.stderr)); + (out.status.success(), s) +} + +/// True only when a working `a3s-box` reports virtualization is available. 
+fn kvm_ready() -> bool { + match Command::new(a3s_box()).arg("info").output() { + Ok(o) => o.status.success() && String::from_utf8_lossy(&o.stdout).contains("available"), + Err(_) => false, + } +} + +/// Resource names embed this process's pid; scope leak counts to `-<pid>-` so a +/// concurrent pipeline on the shared host can't perturb the assertions. +fn marker() -> String { + format!("-{}-", std::process::id()) +} + +fn ci_base_boxes() -> usize { + let m = marker(); + run(&["ps", "-a", "--format", "{{.Names}}"]) + .1 + .lines() + .filter(|l| l.trim().starts_with("ci-base-") && l.contains(&m)) + .count() +} + +fn ci_base_snaps() -> usize { + let m = marker(); + run(&["snapshot", "ls"]) + .1 + .lines() + .filter(|l| l.contains("ci-base-") && l.contains(&m)) + .count() +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn warm_fork_exec_and_cache() { + if !kvm_ready() { + eprintln!("SKIP warm_fork_exec_and_cache: no A3S_BOX/KVM"); + return; + } + let cdir = std::env::temp_dir().join(format!("a3s-sdk-it-cache-{}", std::process::id())); + let _ = std::fs::remove_dir_all(&cdir); + let cache = FileCache::new(&cdir).unwrap(); + + let base = warm_base(WarmBase::new(image(), "echo DEPS-INSTALLED > /warmed").cache(&cache)) + .expect("warm_base"); + let r = base + .step(Step::new("read", "cat /warmed")) + .expect("step read"); + assert_eq!(r.exit_code, 0); + assert!(r.stdout.contains("DEPS-INSTALLED"), "stdout={:?}", r.stdout); + assert!(!r.cached); + let r2 = base + .step(Step::new("read", "cat /warmed")) + .expect("step read#2"); + assert!(r2.cached, "an identical step must hit the cache"); + base.dispose(); + let _ = std::fs::remove_dir_all(&cdir); +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn parallel_collect_all_ordered_with_metrics() { + if !kvm_ready() { + eprintln!("SKIP parallel_collect_all_ordered_with_metrics: no A3S_BOX/KVM"); + return; + } + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + let rep =
base.run_parallel( + vec![ + Step::new("a", "echo first"), + Step::new( + "perf", + "echo '::metric duration_ms=12.5'; echo '::metric tests=7'", + ), + Step::new("b", "exit 3"), + Step::new("c", "echo third"), + ], + 4, + ); + assert_eq!(rep.steps.len(), 4); + assert_eq!(rep.steps[0].name, "a"); // input order preserved despite concurrency + assert_eq!(rep.steps[1].name, "perf"); + assert_eq!(rep.steps[1].metrics.get("duration_ms"), Some(&12.5)); + assert_eq!(rep.steps[1].metrics.get("tests"), Some(&7.0)); + assert_eq!(rep.steps[2].exit_code, 3); + assert!(!rep.passed); + assert_eq!(rep.failures().len(), 1); + assert!(rep.steps[0].duration_ms < 120_000); + assert!(rep.to_json().contains("\"duration_ms\":12.5")); + base.dispose(); +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn forks_are_isolated() { + if !kvm_ready() { + eprintln!("SKIP forks_are_isolated: no A3S_BOX/KVM"); + return; + } + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + // Each step is a fresh CoW fork of the base; a file written in one fork must + // not be visible to a sibling. 
+ let rep = base.run_parallel( + vec![ + Step::new("writer", "echo HELLO > /marker; cat /marker"), + Step::new("reader", "cat /marker 2>/dev/null || echo MISSING"), + ], + 2, + ); + assert!(rep.steps[0].stdout.contains("HELLO")); + assert!( + rep.steps[1].stdout.contains("MISSING"), + "sibling fork leaked state: {:?}", + rep.steps[1].stdout + ); + base.dispose(); +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn leak_free_under_churn() { + if !kvm_ready() { + eprintln!("SKIP leak_free_under_churn: no A3S_BOX/KVM"); + return; + } + let boxes0 = ci_base_boxes(); + let snaps0 = ci_base_snaps(); + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + for gen in 0..3 { + let steps: Vec = (0..6) + .map(|i| Step::new(format!("g{gen}s{i}"), "true")) + .collect(); + let rep = base.run_parallel(steps, 4); + assert!(rep.passed, "gen {gen} unexpectedly failed"); + let m = marker(); + let lingering = run(&["ps", "-a", "--format", "{{.Names}}"]) + .1 + .lines() + .filter(|l| l.contains("-snap-job") && l.contains(&m)) + .count(); + assert_eq!(lingering, 0, "leaked fork boxes mid-churn at gen {gen}"); + } + base.dispose(); + assert_eq!(ci_base_boxes(), boxes0, "leaked ci-base boxes after run"); + assert_eq!( + ci_base_snaps(), + snaps0, + "leaked ci-base snapshots after dispose" + ); +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn sweep_reclaims_dead_pid_orphan_but_spares_live() { + if !kvm_ready() { + eprintln!("SKIP sweep_reclaims_dead_pid_orphan_but_spares_live: no A3S_BOX/KVM"); + return; + } + // A live base (owned by THIS pid) must survive the sweep. + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + + // Forge a running box named as if owned by a reaped (dead) pid. 
+ let mut child = Command::new("true").spawn().expect("spawn true"); + let dead = child.id(); + child.wait().ok(); // reap -> /proc/ goes away -> confirmed dead + let orphan = format!("ci-base-deadbeef-{dead}-0-snap-job1-x"); + let (ok, out) = run(&[ + "run", + "-d", + "--name", + &orphan, + &image(), + "--", + "sleep", + "300", + ]); + assert!(ok, "could not create forged orphan box: {out}"); + + let removed = sweep_orphans(); + assert!( + removed.iter().any(|n| n == &orphan), + "sweep did not reclaim the dead-pid orphan; removed={removed:?}" + ); + let names = run(&["ps", "-a", "--format", "{{.Names}}"]).1; + assert!( + !names.lines().any(|l| l.trim() == orphan), + "orphan box still present after sweep" + ); + // The live base was spared — a step on it still works. + assert!( + base.step(Step::new("alive", "true")).is_ok(), + "sweep wrongly reclaimed a live base" + ); + run(&["rm", "-f", &orphan]); // belt-and-suspenders + base.dispose(); +} diff --git a/src/sdk/tests/soak_kvm.rs b/src/sdk/tests/soak_kvm.rs new file mode 100644 index 00000000..e7d10ab2 --- /dev/null +++ b/src/sdk/tests/soak_kvm.rs @@ -0,0 +1,160 @@ +//! Soak test: sustained fork-eval churn must stay leak-free and memory-stable. +//! +//! `#[ignore]` by default. Run on a `/dev/kvm` host: +//! ```text +//! A3S_BOX=/path/to/a3s-box A3S_SDK_SOAK_FORKS=2000 \ +//! cargo test -p a3s-box-sdk --test soak_kvm -- --ignored --nocapture --test-threads=1 +//! ``` +//! Knobs: `A3S_SDK_SOAK_FORKS` (total fork-evals, default 200), +//! `A3S_SDK_SOAK_CONC` (max concurrency, default 8), `A3S_SDK_TEST_IMAGE`. 
+ +use a3s_box_sdk::pipeline::{warm_base, Step, WarmBase}; +use std::process::Command; +use std::time::Instant; + +fn a3s_box() -> String { + std::env::var("A3S_BOX").unwrap_or_else(|_| "a3s-box".into()) +} + +fn image() -> String { + std::env::var("A3S_SDK_TEST_IMAGE") + .unwrap_or_else(|_| "docker.m.daocloud.io/library/alpine:latest".into()) +} + +fn kvm_ready() -> bool { + match Command::new(a3s_box()).arg("info").output() { + Ok(o) => o.status.success() && String::from_utf8_lossy(&o.stdout).contains("available"), + Err(_) => false, + } +} + +/// Resource names embed this process's pid (`ci-base-<key>-<pid>-...`); scope +/// every leak count to `-<pid>-` so a *concurrent* pipeline on the same host can't +/// perturb the assertions (the global namespace is shared). +fn marker() -> String { + format!("-{}-", std::process::id()) +} + +/// Count THIS run's boxes whose name also contains `needle`. +fn count_box_names(needle: &str) -> usize { + let m = marker(); + let out = Command::new(a3s_box()) + .args(["ps", "-a", "--format", "{{.Names}}"]) + .output() + .expect("a3s-box ps"); + String::from_utf8_lossy(&out.stdout) + .lines() + .filter(|l| l.contains(needle) && l.contains(&m)) + .count() +} + +/// Count THIS run's ci-base snapshots.
+fn count_my_snaps() -> usize { + let m = marker(); + let out = Command::new(a3s_box()) + .args(["snapshot", "ls"]) + .output() + .expect("a3s-box snapshot ls"); + String::from_utf8_lossy(&out.stdout) + .lines() + .filter(|l| l.contains("ci-base-") && l.contains(&m)) + .count() +} + +fn rss_kib() -> Option { + let s = std::fs::read_to_string("/proc/self/status").ok()?; + s.lines() + .find_map(|l| l.strip_prefix("VmRSS:")) + .and_then(|v| v.split_whitespace().next()) + .and_then(|n| n.parse().ok()) +} + +fn env_usize(key: &str, default: usize) -> usize { + std::env::var(key) + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(default) +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm; long-running"] +fn soak_fork_eval_is_leak_free_and_stable() { + if !kvm_ready() { + eprintln!("SKIP soak_fork_eval_is_leak_free_and_stable: no A3S_BOX/KVM"); + return; + } + let target = env_usize("A3S_SDK_SOAK_FORKS", 200); + let conc = env_usize("A3S_SDK_SOAK_CONC", 8); + let batch = 20usize; + + let snaps0 = count_my_snaps(); + let rss0 = rss_kib(); + + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + let snap_base = count_my_snaps(); + assert_eq!( + snap_base, + snaps0 + 1, + "warm_base should add exactly one snapshot" + ); + + let start = Instant::now(); + let mut done = 0usize; + let mut gen = 0usize; + while done < target { + let n = batch.min(target - done); + let steps: Vec = (0..n) + .map(|i| Step::new(format!("g{gen}s{i}"), "echo '::metric ok=1'")) + .collect(); + let rep = base.run_parallel(steps, conc); + assert!( + rep.passed, + "gen {gen} had failures: {:?}", + rep.failures() + .iter() + .map(|s| { + ( + s.name.clone(), + s.exit_code, + s.stderr.chars().take(160).collect::(), + ) + }) + .collect::>() + ); + assert_eq!(rep.steps.len(), n); + // every step parsed its metric (proves the channel survives churn). 
+ assert!(rep.steps.iter().all(|s| s.metrics.get("ok") == Some(&1.0))); + // leak gate per generation: no fork box lingers, snapshot count stays flat. + assert_eq!( + count_box_names("-snap-job"), + 0, + "leaked fork boxes after gen {gen}" + ); + assert_eq!(count_my_snaps(), snap_base, "snapshot drift at gen {gen}"); + done += n; + gen += 1; + } + + let elapsed = start.elapsed().as_secs_f64(); + eprintln!( + "SOAK: {done} fork-evals across {gen} generations in {elapsed:.1}s = {:.1} forks/s", + done as f64 / elapsed + ); + + base.dispose(); + assert_eq!(count_my_snaps(), snaps0, "snapshot leaked after dispose"); + assert_eq!( + count_box_names("ci-base-"), + 0, + "ci-base box leaked after soak" + ); + + if let (Some(a), Some(b)) = (rss0, rss_kib()) { + let grow = b.saturating_sub(a); + eprintln!("SOAK RSS: {a} KiB -> {b} KiB (+{grow} KiB over {done} fork-evals)"); + assert!( + grow < 200_000, + "RSS grew {grow} KiB over {done} fork-evals — possible orchestrator leak" + ); + } +} From 55136c647fea5ef198878706026cc5a3387dcf7d Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Mon, 29 Jun 2026 08:11:43 +0800 Subject: [PATCH 3/5] feat(sdk): a3s-box-ci runner binary + warm_base infra-retry - a3s-box-ci: a dep-free bin (in a3s-box-sdk) bridging any agent/tool to the pipeline. `run [SPEC|-]` parses a line-based spec -> run_parallel -> JSON Report (exit 0 iff passed); `sweep` reclaims crashed-pipeline orphans. This is what lets a3s-code / Claude Code / Codex drive the pipeline from a script. - warm_base now retries a transient infrastructure failure too (DRY'd with the per-step fork via a shared retry_infra), so concurrent same-image warms stay robust under load. Validated end-to-end on real KVM (runner: real pipeline + sweep; a3s-code drives it via session.program through the QuickJS runtime). 19 unit/bin tests + clippy. 
--- CHANGELOG.md | 6 + src/sdk/Cargo.toml | 6 + src/sdk/src/bin/a3s-box-ci.rs | 252 ++++++++++++++++++++++++++++++++++ src/sdk/src/pipeline.rs | 46 ++++--- 4 files changed, 294 insertions(+), 16 deletions(-) create mode 100644 src/sdk/src/bin/a3s-box-ci.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 135e06f4..ff6f8115 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,12 @@ All notable changes to A3S Box will be documented in this file. warm + fork-per-step, cache, parallel order/metrics, fork isolation, leak-freeness, sweep) and a soak test (`tests/soak_kvm.rs`: sustained fork-eval churn stays leak-free and RSS-stable), both wired into the KVM CI gate. +- **`a3s-box-ci` runner + `warm_base` retry.** A dependency-free `a3s-box-ci` binary + (shipped by the `a3s-box-sdk` crate) bridges any agent/tool to the pipeline: a + line-based spec on stdin → a JSON `Report` on stdout (`a3s-box-ci run -`), plus + `a3s-box-ci sweep` for crash-orphan recovery. `warm_base` now also retries on a + transient infrastructure failure (sharing the step-fork's `retry_infra` budget), + so concurrent same-image warms are more robust under load. ### Changed diff --git a/src/sdk/Cargo.toml b/src/sdk/Cargo.toml index 8552bcde..a9d21a47 100644 --- a/src/sdk/Cargo.toml +++ b/src/sdk/Cargo.toml @@ -9,3 +9,9 @@ description = "Rust SDK for a3s-box. Includes a programmable CI/CD pipeline API [dependencies] # none — a thin, dependency-free wrapper over the `a3s-box` CLI. + +# Thin runner that bridges any agent (a3s-code / Claude Code / Codex) to the +# pipeline API: a line-based spec on stdin -> JSON Report on stdout. +[[bin]] +name = "a3s-box-ci" +path = "src/bin/a3s-box-ci.rs" diff --git a/src/sdk/src/bin/a3s-box-ci.rs b/src/sdk/src/bin/a3s-box-ci.rs new file mode 100644 index 00000000..459a03d6 --- /dev/null +++ b/src/sdk/src/bin/a3s-box-ci.rs @@ -0,0 +1,252 @@ +//! `a3s-box-ci` — thin runner bridging any agent (a3s-code / Claude Code / Codex) +//! 
to the programmable-CI pipeline: a line-based spec in, a JSON `Report` out. +//! +//! ```text +//! a3s-box-ci run [SPEC|-] # run a pipeline; prints Report JSON; exit 0 iff passed +//! a3s-box-ci sweep # reclaim crashed-pipeline orphans; prints {"removed":[...]} +//! ``` +//! +//! Spec format (line-based; `#` comments; reads stdin when SPEC is `-` or omitted): +//! ```text +//! image # required +//! setup # optional (default: true) +//! env # repeatable +//! concurrency # default 4 +//! retries # infra retries (default 2) +//! max-output # cap per-step stdout/stderr +//! cache # content-addressed step cache +//! step :: # repeatable; >= 1 required +//! ``` +//! Output JSON is `Report::to_json()`; the process exits non-zero when the report +//! did not pass, so a shell/agent caller can branch on it. + +use a3s_box_sdk::pipeline::{sweep_orphans, warm_base, FileCache, Step, WarmBase}; +use std::io::Read; + +#[derive(Debug)] +struct Spec { + image: String, + setup: String, + env: Vec<(String, String)>, + steps: Vec<(String, String)>, + concurrency: usize, + retries: Option, + max_output: Option, + cache: Option, +} + +fn parse_spec(text: &str) -> Result { + let mut s = Spec { + image: String::new(), + setup: "true".into(), + env: Vec::new(), + steps: Vec::new(), + concurrency: 4, + retries: None, + max_output: None, + cache: None, + }; + for (n, raw) in text.lines().enumerate() { + let line = raw.trim(); + if line.is_empty() || line.starts_with('#') { + continue; + } + let (key, rest) = match line.split_once(char::is_whitespace) { + Some((k, r)) => (k, r.trim()), + None => (line, ""), + }; + let bad = |what: &str| format!("line {}: {what}", n + 1); + match key { + "image" => s.image = rest.to_string(), + "setup" => s.setup = rest.to_string(), + "cache" => s.cache = Some(rest.to_string()), + "concurrency" => s.concurrency = rest.parse().map_err(|_| bad("bad concurrency"))?, + "retries" => s.retries = Some(rest.parse().map_err(|_| bad("bad retries"))?), + "max-output" => 
s.max_output = Some(rest.parse().map_err(|_| bad("bad max-output"))?), + "env" => { + let (k, v) = rest + .split_once('=') + .ok_or_else(|| bad("env needs KEY=VALUE"))?; + s.env.push((k.trim().to_string(), v.to_string())); + } + "step" => { + let (name, cmd) = rest + .split_once("::") + .ok_or_else(|| bad("step needs `name :: cmd`"))?; + s.steps + .push((name.trim().to_string(), cmd.trim().to_string())); + } + other => return Err(bad(&format!("unknown key `{other}`"))), + } + } + if s.image.is_empty() { + return Err("spec missing required `image`".into()); + } + if s.steps.is_empty() { + return Err("spec has no `step`".into()); + } + Ok(s) +} + +fn read_input(arg: Option<&str>) -> std::io::Result { + match arg { + None | Some("-") => { + let mut buf = String::new(); + std::io::stdin().read_to_string(&mut buf)?; + Ok(buf) + } + Some(path) => std::fs::read_to_string(path), + } +} + +fn run(spec_arg: Option<&str>) -> i32 { + let text = match read_input(spec_arg) { + Ok(t) => t, + Err(e) => { + eprintln!("a3s-box-ci: read spec: {e}"); + return 2; + } + }; + let spec = match parse_spec(&text) { + Ok(s) => s, + Err(e) => { + eprintln!("a3s-box-ci: spec error: {e}"); + return 2; + } + }; + let cache = match &spec.cache { + Some(dir) => match FileCache::new(dir) { + Ok(c) => Some(c), + Err(e) => { + eprintln!("a3s-box-ci: cache dir: {e}"); + return 2; + } + }, + None => None, + }; + + let mut wb = WarmBase::new(spec.image.as_str(), spec.setup.as_str()); + for (k, v) in &spec.env { + wb = wb.env(k.as_str(), v.as_str()); + } + if let Some(c) = &cache { + wb = wb.cache(c); + } + if let Some(r) = spec.retries { + wb = wb.infra_retries(r); + } + if let Some(m) = spec.max_output { + wb = wb.max_output(m); + } + + let base = match warm_base(wb) { + Ok(b) => b, + Err(e) => { + eprintln!("a3s-box-ci: warm_base: {e}"); + return 3; + } + }; + let steps: Vec = spec + .steps + .iter() + .map(|(name, cmd)| Step::new(name.as_str(), cmd.as_str())) + .collect(); + let report = 
base.run_parallel(steps, spec.concurrency); + println!("{}", report.to_json()); + i32::from(!report.passed) // 0 iff passed +} + +fn sweep() -> i32 { + let removed = sweep_orphans(); + let items: Vec = removed + .iter() + .map(|n| format!("\"{}\"", n.replace('\\', "\\\\").replace('"', "\\\""))) + .collect(); + println!("{{\"removed\":[{}]}}", items.join(",")); + 0 +} + +fn main() { + let args: Vec = std::env::args().collect(); + let code = match args.get(1).map(String::as_str) { + Some("run") => run(args.get(2).map(String::as_str)), + Some("sweep") => sweep(), + _ => { + eprintln!("usage: a3s-box-ci "); + 2 + } + }; + std::process::exit(code); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_a_full_spec() { + let s = parse_spec( + "# a pipeline\n\ + image node:20\n\ + setup git clone $REPO /w && cd /w && npm ci\n\ + env REPO=https://x/y\n\ + env TOKEN=a=b\n\ + concurrency 6\n\ + retries 1\n\ + max-output 4096\n\ + cache .ci\n\ + step lint :: cd /w && npm run lint\n\ + step test :: cd /w && npm test\n", + ) + .unwrap(); + assert_eq!(s.image, "node:20"); + assert_eq!(s.setup, "git clone $REPO /w && cd /w && npm ci"); + assert_eq!( + s.env, + vec![ + ("REPO".to_string(), "https://x/y".to_string()), + ("TOKEN".to_string(), "a=b".to_string()), // only the FIRST '=' splits + ] + ); + assert_eq!(s.concurrency, 6); + assert_eq!(s.retries, Some(1)); + assert_eq!(s.max_output, Some(4096)); + assert_eq!(s.cache.as_deref(), Some(".ci")); + assert_eq!( + s.steps, + vec![ + ("lint".to_string(), "cd /w && npm run lint".to_string()), + ("test".to_string(), "cd /w && npm test".to_string()), + ] + ); + } + + #[test] + fn defaults_when_omitted() { + let s = parse_spec("image alpine\nstep go :: true\n").unwrap(); + assert_eq!(s.setup, "true"); + assert_eq!(s.concurrency, 4); + assert!(s.retries.is_none() && s.max_output.is_none() && s.cache.is_none()); + assert!(s.env.is_empty()); + } + + #[test] + fn rejects_bad_specs() { + assert!(parse_spec("step go :: 
true").unwrap_err().contains("image")); + assert!(parse_spec("image alpine") + .unwrap_err() + .contains("no `step`")); + assert!(parse_spec("image alpine\nstep oops") + .unwrap_err() + .contains("name :: cmd")); + assert!(parse_spec("image alpine\nenv NOEQ\nstep g :: true") + .unwrap_err() + .contains("KEY=VALUE")); + assert!(parse_spec("image alpine\nbogus x\nstep g :: true") + .unwrap_err() + .contains("unknown key")); + assert!(parse_spec("image alpine\nconcurrency NaN\nstep g :: true") + .unwrap_err() + .contains("concurrency")); + } +} diff --git a/src/sdk/src/pipeline.rs b/src/sdk/src/pipeline.rs index 3dc91736..db977701 100644 --- a/src/sdk/src/pipeline.rs +++ b/src/sdk/src/pipeline.rs @@ -562,19 +562,7 @@ impl Base<'_> { /// command never ran, so re-forking is idempotent). A non-zero step exit is /// returned as `Ok` and never retried — only a failed fork is. fn exec_step_retrying(&self, step: &Step) -> Result { - let mut last: Option = None; - for attempt in 0..=self.infra_retries { - match self.exec_step(step) { - Ok(r) => return Ok(r), - Err(e) => { - last = Some(e); - if attempt < self.infra_retries { - std::thread::sleep(Duration::from_millis(150 * (attempt as u64 + 1))); - } - } - } - } - Err(last.expect("loop runs at least once")) + retry_infra(self.infra_retries, || self.exec_step(step)) } /// Run one step, **fail-fast**: a non-zero exit returns `Err(StepFailed)` @@ -693,10 +681,36 @@ fn parse_snapshot_id(ls_output: &str, name: &str) -> Option { None } -/// Warm a base box: run `setup` once, snapshot the result, return a forkable [`Base`]. +/// Retry `f` on an infrastructure failure, with backoff, up to `retries` extra +/// attempts. Shared by the per-step fork and `warm_base` — both re-run idempotent +/// work (a fresh box/fork each call), so a transient boot/restore hiccup under +/// concurrent load is absorbed rather than surfaced. 
+fn retry_infra(retries: usize, mut f: impl FnMut() -> Result) -> Result { + let mut last: Option = None; + for attempt in 0..=retries { + match f() { + Ok(v) => return Ok(v), + Err(e) => { + last = Some(e); + if attempt < retries { + std::thread::sleep(Duration::from_millis(150 * (attempt as u64 + 1))); + } + } + } + } + Err(last.expect("loop runs at least once")) +} + +/// Warm a base box: run `setup` once, snapshot the result, return a forkable +/// [`Base`]. Retried on a transient infrastructure failure (the spec's +/// `infra_retries` budget) so concurrent same-image warms stay robust under load. pub fn warm_base(spec: WarmBase<'_>) -> Result> { - // pid+instance entropy: two warms of the same spec get disjoint names, so they - // can't tear down each other's box/snapshot. Hence no shared-name pre-clean. + retry_infra(spec.infra_retries, || warm_once(&spec)) +} + +/// One warm attempt. Fresh pid+instance-tagged names each call, so a retry can't +/// collide with its own partial leftovers nor tear down a concurrent peer's box. +fn warm_once<'a>(spec: &WarmBase<'a>) -> Result> { let base_box = format!( "ci-base-{}-{}", key(&[&spec.image, &spec.setup]), From 38a01e2cec703a6e08e3011aa942dd7bd939996d Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Mon, 29 Jun 2026 08:41:45 +0800 Subject: [PATCH 4/5] fix(runtime): rootfs-cache in-use guard prevents concurrent same-image corruption MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RootfsCache::prune (called after a cache-miss put) evicted LRU entries with no in-use guard, so it could remove_dir_all a cache entry that a CONCURRENT box was using as its overlayfs lowerdir — that box's mount(2) then failed with ENOENT ('No such file or directory (os error 2)'), persisting through retries since the backing was gone. Two pipelines from the same image collapse onto one cache key, so this hit any concurrent same-image workload. 
Fix: the same in-use guard SnapshotStore::prune already applies to live CoW lowers. Each overlay box records the cache key it holds in <box_dir>/.rootfs-cache-key (removed with the box dir); prune skips any still-referenced key (prune_protecting, with prune as the empty-set wrapper). Found via a concurrent-pipeline chaos test driven through a3s-code; verified on a real /dev/kvm host (concurrency scenario: ~50% failure -> reliably green). 41 rootfs-cache unit tests + clippy clean. --- CHANGELOG.md | 15 ++++++ src/runtime/src/cache/rootfs_cache.rs | 69 +++++++++++++++++++++++++-- src/runtime/src/vm/layout.rs | 37 +++++++++++++- 3 files changed, 116 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff6f8115..bdd333be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,21 @@ All notable changes to A3S Box will be documented in this file. (use `StepResult::combined()` for the old concatenated view). Breaking for direct `.logs` field access on the `a3s-box-sdk` pipeline API. +### Fixed + +- **Concurrent same-image pipelines could corrupt each other's rootfs cache.** + `RootfsCache::prune` (run after a cache-miss `put`) evicted least-recently-used + entries with no in-use guard, so it could `remove_dir_all` a cache entry that + another box was simultaneously using as its overlayfs **lowerdir** — the peer's + `mount(2)` then failed with `No such file or directory (os error 2)`, and the + failure persisted through retries (the backing was gone). Added the same in-use + guard `SnapshotStore::prune` already applies to live copy-on-write lowers: each + overlay box records the cache key it holds in a `<box_dir>/.rootfs-cache-key` + marker (removed with the box dir), and `prune` skips any still-referenced key. + Found via a concurrent-pipeline chaos test driven through a3s-code; root-caused + and verified on a real `/dev/kvm` host (the concurrency scenario went from ~50% + failure to reliably green).
+ ## [2.6.0] — 2026-06-26 ### Added diff --git a/src/runtime/src/cache/rootfs_cache.rs b/src/runtime/src/cache/rootfs_cache.rs index 54ae5ec0..48bbf04f 100644 --- a/src/runtime/src/cache/rootfs_cache.rs +++ b/src/runtime/src/cache/rootfs_cache.rs @@ -190,11 +190,27 @@ impl RootfsCache { Ok(()) } - /// Prune the cache to stay within the given entry count limit. + /// Prune the cache to stay within the given entry count / byte limit. /// - /// Evicts least-recently-accessed entries first. - /// Returns the number of entries evicted. + /// Evicts least-recently-accessed entries first. Returns the number evicted. pub fn prune(&self, max_entries: usize, max_bytes: u64) -> Result { + self.prune_protecting(max_entries, max_bytes, &std::collections::HashSet::new()) + } + + /// Like [`RootfsCache::prune`], but never evicts an entry whose key is in + /// `protected`. Such an entry is currently serving as a box's overlayfs + /// **lowerdir**, and `remove_dir_all`-ing it out from under a concurrent box's + /// `mount(2)` makes the mount fail with ENOENT ("No such file or directory"). + /// This is the same in-use guard [`crate::SnapshotStore::prune`] applies to + /// live copy-on-write lowers — without it, two pipelines built from the same + /// image (one cache-hit overlay box, one cache-miss box that prunes after its + /// put) can race and corrupt each other. + pub fn prune_protecting( + &self, + max_entries: usize, + max_bytes: u64, + protected: &std::collections::HashSet, + ) -> Result { let mut entries = self.list_entries()?; if entries.len() <= max_entries { @@ -215,6 +231,11 @@ impl RootfsCache { if current_count <= max_entries && current_size <= max_bytes { break; } + // Never evict an entry in use as a live overlay lower — deleting the + // lowerdir under a concurrent box's mount(2) is the bug this guards. 
+ if protected.contains(&entry.key) { + continue; + } self.invalidate(&entry.key)?; current_count -= 1; current_size = current_size.saturating_sub(entry.size_bytes); @@ -495,6 +516,48 @@ mod tests { assert_eq!(cache.entry_count().unwrap(), 1); } + #[test] + fn prune_protecting_never_evicts_in_use_key() { + let tmp = TempDir::new().unwrap(); + let cache = RootfsCache::new(tmp.path()).unwrap(); + for i in 0..4 { + let src = tmp.path().join(format!("s{i}")); + create_test_rootfs(&src, &[("f", "x")]); + cache.put(&format!("k{i}"), &src, &format!("e{i}")).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(10)); + } + // k0 is the OLDEST (normally evicted first) but is in use as an overlay lower. + let mut protected = std::collections::HashSet::new(); + protected.insert("k0".to_string()); + // keep=2 over 4 entries evicts two; the protected k0 is never one of them. + // (last_accessed is second-resolution, so WHICH two unprotected entries go + // is not asserted — only that the in-use lower survives.) + let evicted = cache.prune_protecting(2, u64::MAX, &protected).unwrap(); + assert_eq!(evicted, 2, "two unprotected entries evicted to meet keep=2"); + assert!( + cache.get("k0").unwrap().is_some(), + "the in-use (protected) lower must survive prune" + ); + assert_eq!(cache.entry_count().unwrap(), 2, "k0 + one unprotected remain"); + } + + #[test] + fn prune_protecting_keeps_all_when_all_in_use() { + let tmp = TempDir::new().unwrap(); + let cache = RootfsCache::new(tmp.path()).unwrap(); + for i in 0..2 { + let src = tmp.path().join(format!("s{i}")); + create_test_rootfs(&src, &[("f", "x")]); + cache.put(&format!("k{i}"), &src, "e").unwrap(); + } + let protected: std::collections::HashSet = + ["k0", "k1"].iter().map(|s| s.to_string()).collect(); + // Even asked to keep 0, nothing is evicted — every entry is a live lower. 
+ let evicted = cache.prune_protecting(0, 0, &protected).unwrap(); + assert_eq!(evicted, 0, "all in-use -> nothing evicted"); + assert_eq!(cache.entry_count().unwrap(), 2); + } + #[test] fn test_rootfs_cache_metadata_persists() { let tmp = TempDir::new().unwrap(); diff --git a/src/runtime/src/vm/layout.rs b/src/runtime/src/vm/layout.rs index 225e4a90..4f2ede20 100644 --- a/src/runtime/src/vm/layout.rs +++ b/src/runtime/src/vm/layout.rs @@ -156,6 +156,9 @@ impl VmManager { let cache_key = RootfsCache::compute_key(reference, &[], &[], &[]); if let Some(cached_path) = self.try_rootfs_cache_path(&cache_key)? { let rootfs_path = self.rootfs_provider.prepare(&box_dir, &cached_path)?; + // Record that this box holds `cache_key` as its overlay lower, so a + // concurrent box's cache prune won't evict it mid-mount (ENOENT). + self.mark_rootfs_cache_key(&box_dir, &cache_key); let tee_instance_config = self.generate_tee_config(&box_dir)?; return Ok(BoxLayout { rootfs_path, @@ -204,6 +207,9 @@ impl VmManager { prom.rootfs_cache_hits.inc(); } let rootfs_path = self.rootfs_provider.prepare(&box_dir, &cached_path)?; + // Record that this box holds `cache_key` as its overlay lower, so a + // concurrent box's cache prune won't evict it mid-mount (ENOENT). + self.mark_rootfs_cache_key(&box_dir, &cache_key); if let Ok(guest_init_path) = Self::find_guest_init() { tracing::info!( @@ -377,10 +383,14 @@ impl VmManager { description = %description, "Stored rootfs in cache" ); - // Prune if needed - if let Err(e) = cache.prune( + // Prune if needed — but never evict a cache entry that is in use as + // a live overlay lower for a concurrent box (deleting the lowerdir + // under its mount(2) is the same-image concurrency bug this guards). 
+ let protected = self.referenced_rootfs_cache_keys(); + if let Err(e) = cache.prune_protecting( self.config.cache.max_rootfs_entries, self.config.cache.max_cache_bytes, + &protected, ) { tracing::warn!(error = %e, "Failed to prune rootfs cache"); } @@ -391,6 +401,29 @@ impl VmManager { } } + /// Record which rootfs-cache key this box holds as its overlay lower, in a + /// `<box_dir>/.rootfs-cache-key` marker (mirror of the snapshot store's + /// `.snapshot-lower`). Read back by [`Self::referenced_rootfs_cache_keys`] so + /// the cache prune never evicts a live lower. Best-effort; removed with box_dir. + fn mark_rootfs_cache_key(&self, box_dir: &Path, cache_key: &str) { + let _ = std::fs::write(box_dir.join(".rootfs-cache-key"), cache_key); + } + + /// Rootfs-cache keys currently in use as an overlay lower by some live box. + /// Boxes live under `<home_dir>/boxes/<box>/`; a removed box's marker is gone with + /// its dir, so an evictable key is simply one no live box references. + fn referenced_rootfs_cache_keys(&self) -> std::collections::HashSet { + let mut set = std::collections::HashSet::new(); + if let Ok(entries) = std::fs::read_dir(self.home_dir.join("boxes")) { + for entry in entries.flatten() { + if let Ok(k) = std::fs::read_to_string(entry.path().join(".rootfs-cache-key")) { + set.insert(k.trim().to_string()); + } + } + } + set + } + /// Resolve the cache directory from config or default.
pub(crate) fn resolve_cache_dir(&self) -> PathBuf { self.config From e76d41340947efd3b1ebbe5b8f9af6ead6f731da Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Mon, 29 Jun 2026 09:02:39 +0800 Subject: [PATCH 5/5] style(runtime): rustfmt the rootfs-cache in-use-guard test --- src/runtime/src/cache/rootfs_cache.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/runtime/src/cache/rootfs_cache.rs b/src/runtime/src/cache/rootfs_cache.rs index 48bbf04f..e01dc6d6 100644 --- a/src/runtime/src/cache/rootfs_cache.rs +++ b/src/runtime/src/cache/rootfs_cache.rs @@ -538,7 +538,11 @@ mod tests { cache.get("k0").unwrap().is_some(), "the in-use (protected) lower must survive prune" ); - assert_eq!(cache.entry_count().unwrap(), 2, "k0 + one unprotected remain"); + assert_eq!( + cache.entry_count().unwrap(), + 2, + "k0 + one unprotected remain" + ); } #[test]