From 3192930747bb69b5810eb01f3f571688cdac3cb7 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 13:45:40 +0000 Subject: [PATCH 01/10] [ee] perf(audit): re-anchor S3 audit export on enable + opt-in backfill MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The S3/GCS audit-log export's steady-state query filters by `age(xmin)` (unindexable), so the only scan bound is the timestamp floor. On a fresh enable the floor was epoch, and on a re-enable the cursor resumed from its pre-disable position — either way the first run scanned the whole `audit_partitioned` table. Under a `statement_timeout` (e.g. Aiven) that scan never completes: the cursor never advances, nothing is exported, and the repeated full scans saturate the database. Re-anchor on enable (EE companion, windmill-ee-private#634): - New trigger migration records a recent timestamp floor instead of the epoch sentinel and `DO UPDATE`s the cursor to the current snapshot xmin on re-enable, so the export always resumes from ~now and never rescans history. Includes a one-time fixup for legacy epoch-sentinel checkpoints on upgrade. Opt-in historical backfill (new `audit_logs_s3_backfill` module + endpoints): - Exports a chosen `[from, to)` window on demand, scanning strictly by `timestamp` (the partition key) in bounded keyset pages — each query is an index scan capped at one page (verified via EXPLAIN: later partitions `never executed`, ~11ms/page), so it stays well under any statement timeout regardless of window size. Writes alongside the steady-state objects under logs/audit/, without touching the xmin cursor. - POST /settings/audit_logs_s3_backfill {from,to} (super-admin + Enterprise), GET /settings/audit_logs_s3_backfill_status. Also repurposes the status endpoint's `bootstrapping` flag to mean "draining a backlog" (the cursor is capped and catching up), and updates the setting description to point operators at the backfill. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...100e926bb678171947e71d6d87fdd0c3f9299.json | 15 + ...790770c32716ef953a2ea4af17d122d0a3b3c.json | 44 +++ backend/ee-repo-ref.txt | 2 +- ..._audit_logs_s3_reanchor_on_enable.down.sql | 19 ++ ...51_audit_logs_s3_reanchor_on_enable.up.sql | 68 ++++ .../src/audit_logs_s3.rs | 9 +- .../src/audit_logs_s3_backfill.rs | 319 ++++++++++++++++++ backend/windmill-api-settings/src/lib.rs | 35 +- backend/windmill-api/openapi.yaml | 86 +++++ .../src/lib/components/instanceSettings.ts | 2 +- 10 files changed, 592 insertions(+), 7 deletions(-) create mode 100644 backend/.sqlx/query-3accb7e0eab75fcd34bf5b6d75e100e926bb678171947e71d6d87fdd0c3f9299.json create mode 100644 backend/.sqlx/query-fd023a9365388f1f74423416bd8790770c32716ef953a2ea4af17d122d0a3b3c.json create mode 100644 backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.down.sql create mode 100644 backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql create mode 100644 backend/windmill-api-settings/src/audit_logs_s3_backfill.rs diff --git a/backend/.sqlx/query-3accb7e0eab75fcd34bf5b6d75e100e926bb678171947e71d6d87fdd0c3f9299.json b/backend/.sqlx/query-3accb7e0eab75fcd34bf5b6d75e100e926bb678171947e71d6d87fdd0c3f9299.json new file mode 100644 index 0000000000000..54db7ff310da1 --- /dev/null +++ b/backend/.sqlx/query-3accb7e0eab75fcd34bf5b6d75e100e926bb678171947e71d6d87fdd0c3f9299.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO background_task_state (name, value)\n SELECT $1, jsonb_build_object(\n 'last_xmin', txid_snapshot_xmin(txid_current_snapshot())::bigint,\n 'last_ts', now(),\n 'last_oldest_inflight_ts', COALESCE(\n CASE WHEN (current_setting('is_superuser') = 'on'\n OR pg_has_role(current_user, 'pg_read_all_stats', 'USAGE'))\n AND NOT EXISTS (SELECT 1 FROM pg_prepared_xacts)\n THEN (SELECT min(xact_start) FROM pg_stat_activity WHERE xact_start IS NOT NULL)\n ELSE NULL END,\n now() - interval '7 days'))\n WHERE NOT EXISTS (SELECT 1 FROM global_settings WHERE name = $2)\n ON CONFLICT (name) DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "3accb7e0eab75fcd34bf5b6d75e100e926bb678171947e71d6d87fdd0c3f9299" +} diff --git a/backend/.sqlx/query-fd023a9365388f1f74423416bd8790770c32716ef953a2ea4af17d122d0a3b3c.json b/backend/.sqlx/query-fd023a9365388f1f74423416bd8790770c32716ef953a2ea4af17d122d0a3b3c.json new file mode 100644 index 0000000000000..5ac11c6a5eeaf --- /dev/null +++ b/backend/.sqlx/query-fd023a9365388f1f74423416bd8790770c32716ef953a2ea4af17d122d0a3b3c.json @@ -0,0 +1,44 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT to_char(timestamp AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS \"day!\",\n id AS \"id!\",\n timestamp AS \"ts!\",\n row_to_json(r)::text AS \"line!\"\n FROM (\n SELECT workspace_id, id, timestamp, username, operation,\n action_kind::text AS action_kind, resource, parameters, email, span\n FROM audit_partitioned\n WHERE timestamp >= $1 AND timestamp < $2\n AND (timestamp, id) > ($3, $4)\n ORDER BY timestamp, id\n LIMIT $5\n ) r\n ORDER BY timestamp, id", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "day!", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "id!", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "ts!", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "line!", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Timestamptz", + "Int8", + "Int8" + ] + }, + "nullable": [ + null, + false, + false, + null + ] + }, + "hash": "fd023a9365388f1f74423416bd8790770c32716ef953a2ea4af17d122d0a3b3c" +} diff --git a/backend/ee-repo-ref.txt b/backend/ee-repo-ref.txt index 85d87fe2ca11b..ba5acebcc5d8b 100644 --- a/backend/ee-repo-ref.txt +++ b/backend/ee-repo-ref.txt @@ -1 +1 @@ -7b92c8e0de4cfc6d986499d60a5f79cd1c6b9d0b +7af96d247de927898b5b44bad3a000409c118fb8 \ No newline at end of file diff --git a/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.down.sql b/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.down.sql new file mode 100644 index 0000000000000..2c9e78659ea71 --- /dev/null +++ b/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.down.sql @@ -0,0 +1,19 @@ +-- Restore the previous anchor: epoch sentinel + preserve-cursor (DO NOTHING). +CREATE OR REPLACE FUNCTION audit_logs_s3_anchor_on_enable() +RETURNS TRIGGER AS $$ +BEGIN + IF NEW.value = to_jsonb(true) + AND (TG_OP = 'INSERT' OR OLD.value IS DISTINCT FROM NEW.value) THEN + INSERT INTO background_task_state (name, value) + VALUES ( + 'audit_logs_s3_export', + jsonb_build_object( + 'last_xmin', txid_snapshot_xmin(txid_current_snapshot())::bigint, + 'last_ts', '1970-01-01T00:00:00+00:00' + ) + ) + ON CONFLICT (name) DO NOTHING; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; diff --git a/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql b/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql new file mode 100644 index 0000000000000..749c0a9bdfd36 --- /dev/null +++ b/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql @@ -0,0 +1,68 @@ +-- Re-anchor the audit→object-store export cursor on every enable (including a +-- re-enable after a disable), and anchor a *recent* timestamp floor instead of +-- the epoch sentinel. +-- +-- The previous version wrote `last_ts = epoch` and used `ON CONFLICT DO NOTHING` +-- (preserve the old cursor). Two consequences hurt large instances: +-- * the epoch floor disables partition pruning, so the first export run +-- sequentially scans the entire `audit_partitioned` table — under a +-- `statement_timeout` (e.g. Aiven) it never completes, so the cursor never +-- advances and nothing is ever exported; +-- * preserving the cursor across a long disable means re-enabling tries to +-- backfill the whole disabled window by the same unindexable `age(xmin)` +-- scan, with the same outcome. +-- +-- Now the anchor records a recent floor (the oldest in-flight `xact_start` when +-- the stats privilege is available — a sound lower bound for any row whose xid +-- >= this snapshot xmin — else a bounded 7-day window), and `ON CONFLICT DO +-- UPDATE` advances the cursor to the current snapshot xmin on re-enable (never +-- backwards, so it stays HA-safe). Exporting the gap a disable left behind is +-- the job of the opt-in historical backfill, not this steady-state cursor. +-- +-- The task name literal must match +-- `windmill_common::global_settings::AUDIT_LOGS_S3_EXPORT_TASK`. + +CREATE OR REPLACE FUNCTION audit_logs_s3_anchor_on_enable() +RETURNS TRIGGER AS $$ +DECLARE + v_floor timestamptz; +BEGIN + IF NEW.value = to_jsonb(true) + AND (TG_OP = 'INSERT' OR OLD.value IS DISTINCT FROM NEW.value) THEN + v_floor := COALESCE( + CASE WHEN (current_setting('is_superuser') = 'on' + OR pg_has_role(current_user, 'pg_read_all_stats', 'USAGE')) + AND NOT EXISTS (SELECT 1 FROM pg_prepared_xacts) + THEN (SELECT min(xact_start) FROM pg_stat_activity WHERE xact_start IS NOT NULL) + ELSE NULL END, + now() - interval '7 days'); + INSERT INTO background_task_state (name, value) + VALUES ( + 'audit_logs_s3_export', + jsonb_build_object( + 'last_xmin', txid_snapshot_xmin(txid_current_snapshot())::bigint, + 'last_ts', now(), + 'last_oldest_inflight_ts', v_floor + ) + ) + ON CONFLICT (name) DO UPDATE + SET value = EXCLUDED.value + WHERE (background_task_state.value->>'last_xmin')::bigint + < (EXCLUDED.value->>'last_xmin')::bigint; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- One-time fixup for an instance upgrading while a legacy epoch sentinel cursor +-- is still in place (enabled on the old version but never drained, e.g. object +-- store down): stamp a recent floor so the next run prunes to recent partitions +-- instead of scanning all history. The xid cursor is left untouched (the drain +-- continues); only rows older than the new floor — possible solely via a +-- transaction open longer than the 7-day window — fall outside it. +UPDATE background_task_state +SET value = value || jsonb_build_object( + 'last_ts', to_jsonb(now()), + 'last_oldest_inflight_ts', to_jsonb(now() - interval '7 days')) +WHERE name = 'audit_logs_s3_export' + AND (value->>'last_ts')::timestamptz <= 'epoch'::timestamptz; diff --git a/backend/windmill-api-settings/src/audit_logs_s3.rs b/backend/windmill-api-settings/src/audit_logs_s3.rs index 9fa4574a1e6ea..d7d749cb584a6 100644 --- a/backend/windmill-api-settings/src/audit_logs_s3.rs +++ b/backend/windmill-api-settings/src/audit_logs_s3.rs @@ -20,9 +20,11 @@ use windmill_common::DB; pub struct AuditLogsS3ExportStatus { /// xid cursor: rows of transactions below this have been exported. pub last_xmin: i64, - /// Partition-pruning floor (the epoch sentinel while still bootstrapping). + /// Partition-pruning floor: the latest audit-row timestamp the cursor has + /// reached (also the read side's 7-day-fallback anchor). pub last_ts: Option>, - /// True until the initial post-enable backlog has been fully drained. + /// True while the exporter is draining a backlog — the last run was capped + /// at `MAX_XID_INTERVAL` xids and has not yet caught up to the live snapshot. pub bootstrapping: bool, /// The latest audit-row timestamp actually written to object storage so /// far (monotonic) — the "how current is the mirror" figure. @@ -50,8 +52,7 @@ pub async fn get_status(db: &DB) -> error::Result::from_timestamp(0, 0).unwrap(); - let bootstrapping = last_ts.map(|t| t <= epoch).unwrap_or(true); + let bootstrapping = v.get("draining").and_then(|x| x.as_bool()).unwrap_or(false); Ok(Some(AuditLogsS3ExportStatus { last_xmin: v.get("last_xmin").and_then(|x| x.as_i64()).unwrap_or(0), last_ts, diff --git a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs new file mode 100644 index 0000000000000..b4051bce6c1a7 --- /dev/null +++ b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs @@ -0,0 +1,319 @@ +#![cfg(feature = "parquet")] +//! Opt-in historical backfill of audit logs to the instance object store. +//! +//! The steady-state exporter (the EE `export_audit_logs_to_object_store`) cursors +//! on transaction xmin and, by design, only exports rows committed *after* the +//! feature was enabled — it never rescans history (an `age(xmin)` predicate is +//! unindexable, so scanning the whole partitioned table can't survive a +//! `statement_timeout`). This module covers the complementary need: exporting a +//! chosen historical `[from, to)` window (e.g. the gap left while the export was +//! disabled) on demand. +//! +//! It is safe to run on a large table because it scans strictly by `timestamp` +//! (the partition key — pruned and indexed) in bounded keyset pages, so every +//! query touches at most one page worth of rows and survives a statement timeout. +//! It does not touch the xmin cursor / checkpoint at all. +//! +//! Objects are written next to the steady-state ones under `logs/audit/dt=/` +//! as `audit_backfill_.ndjson`, with the exact same row shape, so a +//! consumer reads them uniformly. Re-running the same window is deterministic +//! (audit history is append-only), so it overwrites the same objects rather than +//! duplicating. A window that overlaps already-exported steady-state rows simply +//! re-emits them under a different key; consumers dedupe by `id`. +//! +//! Progress is persisted in `background_task_state` (name [`TASK_NAME`]) so any +//! API replica can serve the status endpoint, mirroring `log_cleanup`. + +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; + +use crate::background_task; +use windmill_common::error::{self}; +use windmill_common::tracing_init::LOGS_AUDIT; +use windmill_common::{DB, INSTANCE_NAME}; + +use windmill_object_store::object_store_reexports::{ObjectStore, Path as ObjectPath}; + +pub const TASK_NAME: &str = "audit_logs_s3_backfill"; + +/// Rows fetched per keyset page. Bounds each query so it stays well under any +/// `statement_timeout` even on a busy partition, and bounds peak memory (one +/// page of ndjson is buffered before the day-grouped PUTs). +const PAGE_ROWS: i64 = 10_000; + +#[derive(Clone, Serialize, Deserialize)] +pub struct AuditBackfillProgress { + pub running: bool, + pub started_at: DateTime, + pub finished_at: Option>, + /// Human-readable description of the current phase. + pub phase: String, + /// Inclusive lower / exclusive upper bound of the window being exported. + pub from: DateTime, + pub to: DateTime, + /// Audit rows written to object storage so far. + pub rows_written: u64, + /// Object PUTs issued so far (one per day per page). + pub objects_written: u64, + /// Keyset cursor: the timestamp of the last row exported (how far the + /// backfill has progressed through the window). + pub last_ts: Option>, + pub errors: u64, + pub last_error: Option, +} + +impl AuditBackfillProgress { + fn new_running(from: DateTime, to: DateTime) -> Self { + Self { + running: true, + started_at: Utc::now(), + finished_at: None, + phase: "starting".to_string(), + from, + to, + rows_written: 0, + objects_written: 0, + last_ts: None, + errors: 0, + last_error: None, + } + } +} + +struct Session { + db: DB, + owner: String, + progress: RwLock, +} + +impl Session { + async fn update(&self, f: F) { + let snapshot = { + let mut p = self.progress.write().await; + f(&mut p); + p.clone() + }; + if let Err(e) = + background_task::update_state(&self.db, TASK_NAME, &self.owner, &snapshot).await + { + tracing::warn!("audit backfill: failed to persist progress: {e:#}"); + } + } + + async fn record_error(&self, msg: String) { + tracing::error!("audit backfill: {msg}"); + self.update(|p| { + p.errors = p.errors.saturating_add(1); + p.last_error = Some(msg); + }) + .await; + } + + async fn release(&self) { + let snapshot = { + let mut p = self.progress.write().await; + p.running = false; + p.finished_at = Some(Utc::now()); + p.phase = "done".to_string(); + p.clone() + }; + tracing::info!( + "audit backfill finished: {} row(s) in {} object(s) for [{}, {}), {} error(s)", + snapshot.rows_written, + snapshot.objects_written, + snapshot.from, + snapshot.to, + snapshot.errors + ); + if let Err(e) = background_task::release(&self.db, TASK_NAME, &self.owner, &snapshot).await + { + tracing::warn!("audit backfill: failed to release lease: {e:#}"); + } + } +} + +#[derive(Deserialize)] +pub struct BackfillRequest { + pub from: DateTime, + pub to: DateTime, +} + +/// Atomically claim the backfill lease, or error if one is already running. +pub async fn try_start(db: &DB, from: DateTime, to: DateTime) -> error::Result<()> { + if from >= to { + return Err(error::Error::BadRequest( + "audit backfill: `from` must be strictly before `to`".to_string(), + )); + } + let claimed = background_task::try_claim( + db, + TASK_NAME, + &*INSTANCE_NAME, + &AuditBackfillProgress::new_running(from, to), + ) + .await?; + if !claimed { + return Err(error::Error::BadRequest( + "An audit log backfill is already running".to_string(), + )); + } + Ok(()) +} + +/// Fetch the current backfill status. Any API server can call this. +pub async fn get_status(db: &DB) -> error::Result> { + let Some(r) = background_task::get(db, TASK_NAME).await? else { + return Ok(None); + }; + match serde_json::from_value::(r.value) { + Ok(mut p) => { + // get() collapses `running` to false when the heartbeat is stale. + p.running = r.running; + Ok(Some(p)) + } + Err(e) => Err(error::Error::internal_err(format!( + "deserialize audit backfill progress: {e:#}" + ))), + } +} + +pub fn spawn_backfill(db: DB, from: DateTime, to: DateTime) { + use futures::FutureExt; + use std::panic::AssertUnwindSafe; + + tokio::spawn(async move { + let session = Arc::new(Session { + db: db.clone(), + owner: INSTANCE_NAME.clone(), + progress: RwLock::new(AuditBackfillProgress::new_running(from, to)), + }); + + let s = session.clone(); + let task = async move { + let store = match windmill_object_store::get_object_store().await { + Some(st) => st, + None => { + s.record_error("Object storage is not configured".to_string()) + .await; + return; + } + }; + if let Err(e) = run_backfill(&s, &db, &store, from, to).await { + s.record_error(format!("backfill failed: {e:#}")).await; + } + }; + + // catch_unwind so a panic can't leave the lease held forever. + if let Err(panic) = AssertUnwindSafe(task).catch_unwind().await { + let msg = panic + .downcast_ref::<&str>() + .map(|s| s.to_string()) + .or_else(|| panic.downcast_ref::().cloned()) + .unwrap_or_else(|| "unknown panic".to_string()); + session + .record_error(format!("backfill task panicked: {msg}")) + .await; + } + + session.release().await; + }); +} + +/// Export `[from, to)` in keyset pages ordered by `(timestamp, id)`. Each page is +/// a bounded, partition-pruned scan; rows are grouped by UTC day and written one +/// object per day per page. +async fn run_backfill( + session: &Session, + db: &DB, + store: &Arc, + from: DateTime, + to: DateTime, +) -> error::Result<()> { + session.update(|p| p.phase = "exporting".to_string()).await; + + // Keyset cursor over (timestamp, id). `id` starts below any real value so the + // first page includes rows at exactly `from`. + let mut cursor_ts = from; + let mut cursor_id: i64 = -1; + + loop { + let rows = sqlx::query!( + r#"SELECT to_char(timestamp AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS "day!", + id AS "id!", + timestamp AS "ts!", + row_to_json(r)::text AS "line!" + FROM ( + SELECT workspace_id, id, timestamp, username, operation, + action_kind::text AS action_kind, resource, parameters, email, span + FROM audit_partitioned + WHERE timestamp >= $1 AND timestamp < $2 + AND (timestamp, id) > ($3, $4) + ORDER BY timestamp, id + LIMIT $5 + ) r + ORDER BY timestamp, id"#, + from, + to, + cursor_ts, + cursor_id, + PAGE_ROWS + ) + .fetch_all(db) + .await?; + + if rows.is_empty() { + break; + } + + // Group this page's ndjson lines by day, preserving (timestamp, id) order, + // and track the min id per day for a deterministic, collision-free key. + let mut by_day: Vec<(String, i64, String)> = Vec::new(); // (day, min_id, ndjson) + for row in &rows { + match by_day.last_mut() { + Some((day, _min_id, acc)) if *day == row.day => { + acc.push('\n'); + acc.push_str(&row.line); + } + _ => by_day.push((row.day.clone(), row.id, row.line.clone())), + } + } + + let mut page_rows: u64 = 0; + let mut page_objects: u64 = 0; + for (day, min_id, ndjson) in &by_day { + let object_path = ObjectPath::from(format!( + "{LOGS_AUDIT}dt={day}/audit_backfill_{min_id}.ndjson" + )); + store + .put(&object_path, ndjson.clone().into_bytes().into()) + .await + .map_err(|e| error::Error::internal_err(format!("upload {object_path}: {e:#}")))?; + page_rows += ndjson.lines().count() as u64; + page_objects += 1; + } + + // Advance the keyset cursor past the last row of this page. + let last = rows.last().expect("page is non-empty"); + cursor_ts = last.ts; + cursor_id = last.id; + + let new_last_ts = last.ts; + session + .update(|p| { + p.rows_written = p.rows_written.saturating_add(page_rows); + p.objects_written = p.objects_written.saturating_add(page_objects); + p.last_ts = Some(new_last_ts); + }) + .await; + + // A short page means the window is exhausted. + if (rows.len() as i64) < PAGE_ROWS { + break; + } + } + + Ok(()) +} diff --git a/backend/windmill-api-settings/src/lib.rs b/backend/windmill-api-settings/src/lib.rs index 3c179168f5a7a..a7e1322ad91f2 100644 --- a/backend/windmill-api-settings/src/lib.rs +++ b/backend/windmill-api-settings/src/lib.rs @@ -14,6 +14,8 @@ use std::{ #[cfg(feature = "parquet")] mod audit_logs_s3; #[cfg(feature = "parquet")] +mod audit_logs_s3_backfill; +#[cfg(feature = "parquet")] mod background_task; #[cfg(feature = "private")] mod ee; @@ -204,7 +206,12 @@ pub fn global_service() -> Router { ) .route("/run_log_cleanup", post(run_log_cleanup)) .route("/log_cleanup_status", get(log_cleanup_status)) - .route("/audit_logs_s3_status", get(audit_logs_s3_status)); + .route("/audit_logs_s3_status", get(audit_logs_s3_status)) + .route("/audit_logs_s3_backfill", post(run_audit_logs_s3_backfill)) + .route( + "/audit_logs_s3_backfill_status", + get(audit_logs_s3_backfill_status), + ); } #[cfg(not(feature = "parquet"))] @@ -601,6 +608,32 @@ async fn audit_logs_s3_status( Ok(Json(audit_logs_s3::get_status(&db).await?)) } +#[cfg(feature = "parquet")] +async fn run_audit_logs_s3_backfill( + Extension(db): Extension, + authed: ApiAuthed, + Json(req): Json, +) -> error::Result { + require_super_admin(&db, &authed.email).await?; + if !matches!(get_license_plan().await, LicensePlan::Enterprise) { + return Err(error::Error::BadRequest( + "Audit log export to object storage is an Enterprise feature".to_string(), + )); + } + audit_logs_s3_backfill::try_start(&db, req.from, req.to).await?; + audit_logs_s3_backfill::spawn_backfill(db.clone(), req.from, req.to); + Ok(axum::http::StatusCode::ACCEPTED) +} + +#[cfg(feature = "parquet")] +async fn audit_logs_s3_backfill_status( + Extension(db): Extension, + authed: ApiAuthed, +) -> error::JsonResult> { + require_super_admin(&db, &authed.email).await?; + Ok(Json(audit_logs_s3_backfill::get_status(&db).await?)) +} + #[derive(Deserialize)] pub struct TestKey { pub license_key: String, diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index 0bbce245d33f8..7b7f4c4388ecf 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -1812,6 +1812,92 @@ paths: - last_run_exported - updated_at + /settings/audit_logs_s3_backfill: + post: + summary: start an opt-in historical backfill of audit logs to object storage + operationId: runAuditLogsS3Backfill + tags: + - setting + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + from: + type: string + format: date-time + description: inclusive lower bound of the window to export + to: + type: string + format: date-time + description: exclusive upper bound of the window to export + required: + - from + - to + responses: + "202": + description: backfill started + + /settings/audit_logs_s3_backfill_status: + get: + summary: get status of the audit-log object-store historical backfill + operationId: getAuditLogsS3BackfillStatus + tags: + - setting + responses: + "200": + description: current backfill status (null if never run) + content: + application/json: + schema: + nullable: true + type: object + properties: + running: + type: boolean + started_at: + type: string + format: date-time + finished_at: + type: string + format: date-time + nullable: true + phase: + type: string + from: + type: string + format: date-time + to: + type: string + format: date-time + rows_written: + type: integer + format: int64 + objects_written: + type: integer + format: int64 + last_ts: + type: string + format: date-time + nullable: true + errors: + type: integer + format: int64 + last_error: + type: string + nullable: true + required: + - running + - started_at + - phase + - from + - to + - rows_written + - objects_written + - errors + /settings/send_stats: post: summary: send stats diff --git a/frontend/src/lib/components/instanceSettings.ts b/frontend/src/lib/components/instanceSettings.ts index 72ccbeca2bee7..278863c81df75 100644 --- a/frontend/src/lib/components/instanceSettings.ts +++ b/frontend/src/lib/components/instanceSettings.ts @@ -443,7 +443,7 @@ export const settings: Record = { { label: 'Store audit logs in object storage', description: - 'When enabled and instance object storage is configured, audit logs are also exported as newline-delimited JSON to the dedicated logs/audit/ folder (partitioned by day). Export is incremental and runs off the hot path. Pre-existing history is not backfilled: export starts from when the setting is enabled (transactions in flight at that moment may include a bounded set of just-prior rows). No audit log committed after enabling is ever skipped.', + 'When enabled and instance object storage is configured, audit logs are also exported as newline-delimited JSON to the dedicated logs/audit/ folder (partitioned by day). Export is incremental and runs off the hot path. Pre-existing history is not backfilled: enabling (or re-enabling) starts the export from ~now (transactions in flight at that moment may include a bounded set of just-prior rows), and no audit log committed after enabling is ever skipped. To export a historical window (e.g. a gap left while the export was disabled), use the opt-in backfill API: POST /settings/audit_logs_s3_backfill {from, to} (status at GET /settings/audit_logs_s3_backfill_status).', key: 'store_audit_logs_s3', fieldType: 'boolean', storage: 'setting', From e0fcd0d5a92f4ceb54f811527745ca41851051b1 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 16:55:49 +0000 Subject: [PATCH 02/10] fix(audit): heartbeat backfill lease per object; bump EE ref Address review (cubic): persist progress (refreshing the lease heartbeat) after every object PUT in the backfill page loop, not only once per page, so the gap between heartbeats stays well under STALE_HEARTBEAT_SECS even on slow uploads and another replica can't re-claim mid-page and run a concurrent backfill. Bumps ee-repo-ref.txt to pull in the EE test-race fix (folding the backlog-drain regression into the single audit e2e test). Co-Authored-By: Claude Opus 4.8 (1M context) --- backend/ee-repo-ref.txt | 2 +- .../src/audit_logs_s3_backfill.rs | 24 ++++++++++--------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/backend/ee-repo-ref.txt b/backend/ee-repo-ref.txt index ba5acebcc5d8b..2555cb615888a 100644 --- a/backend/ee-repo-ref.txt +++ b/backend/ee-repo-ref.txt @@ -1 +1 @@ -7af96d247de927898b5b44bad3a000409c118fb8 \ No newline at end of file +23a40676ecc9d7124b0aa733de027bf1e3e2103e \ No newline at end of file diff --git a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs index b4051bce6c1a7..5620e21803348 100644 --- a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs +++ b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs @@ -281,8 +281,6 @@ async fn run_backfill( } } - let mut page_rows: u64 = 0; - let mut page_objects: u64 = 0; for (day, min_id, ndjson) in &by_day { let object_path = ObjectPath::from(format!( "{LOGS_AUDIT}dt={day}/audit_backfill_{min_id}.ndjson" @@ -291,8 +289,18 @@ async fn run_backfill( .put(&object_path, ndjson.clone().into_bytes().into()) .await .map_err(|e| error::Error::internal_err(format!("upload {object_path}: {e:#}")))?; - page_rows += ndjson.lines().count() as u64; - page_objects += 1; + let n = ndjson.lines().count() as u64; + // Persist progress (and refresh the lease heartbeat) after every object, + // not just once the page completes: a stale heartbeat lets another replica + // re-claim the lease and run a concurrent backfill, so the gap between + // heartbeats must stay well under STALE_HEARTBEAT_SECS even if a page's + // uploads are slow. + session + .update(|p| { + p.rows_written = p.rows_written.saturating_add(n); + p.objects_written = p.objects_written.saturating_add(1); + }) + .await; } // Advance the keyset cursor past the last row of this page. @@ -301,13 +309,7 @@ async fn run_backfill( cursor_id = last.id; let new_last_ts = last.ts; - session - .update(|p| { - p.rows_written = p.rows_written.saturating_add(page_rows); - p.objects_written = p.objects_written.saturating_add(page_objects); - p.last_ts = Some(new_last_ts); - }) - .await; + session.update(|p| p.last_ts = Some(new_last_ts)).await; // A short page means the window is exhausted. if (rows.len() as i64) < PAGE_ROWS { From e8ba4473f1dd2d9ba18c19b936f48f805258c62b Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 17:14:55 +0000 Subject: [PATCH 03/10] fix(audit): reject unstable backfill windows; bump EE ref MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review (P1): the backfill keyset-pages over rows visible at scan time and declares completion when the scan runs dry, but a row's `timestamp` is its inserting transaction's `xact_start`. A window whose upper bound is recent or in the future could silently omit a transaction that started inside `[from, to)` but commits after the scan passed that timestamp. `try_start` now rejects any `to` newer than the oldest in-flight `xact_start` (everything strictly older than the oldest running transaction is committed and stable), using the same trustworthy stats gating as the exporter's floor (restricted role / 2PC → a 7-day-old cutoff). Bumps ee-repo-ref.txt for the EE monotonic-checkpoint fix. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...b64b33c722afde7457912e8abd5743de8829.json} | 7 +++-- ...f172c89da5b216d3530c2743319ab1f4ca853.json | 20 +++++++++++++ backend/ee-repo-ref.txt | 2 +- .../src/audit_logs_s3_backfill.rs | 28 +++++++++++++++++++ 4 files changed, 53 insertions(+), 4 deletions(-) rename backend/.sqlx/{query-8711bb7861cb3c453519a620057e1530039c09b824065027387bbb667a49fe8d.json => query-881d996af5aaa1ec01693e473519b64b33c722afde7457912e8abd5743de8829.json} (68%) create mode 100644 backend/.sqlx/query-a54f686b1bfb16e4e1da2bc143ef172c89da5b216d3530c2743319ab1f4ca853.json diff --git a/backend/.sqlx/query-8711bb7861cb3c453519a620057e1530039c09b824065027387bbb667a49fe8d.json b/backend/.sqlx/query-881d996af5aaa1ec01693e473519b64b33c722afde7457912e8abd5743de8829.json similarity index 68% rename from backend/.sqlx/query-8711bb7861cb3c453519a620057e1530039c09b824065027387bbb667a49fe8d.json rename to backend/.sqlx/query-881d996af5aaa1ec01693e473519b64b33c722afde7457912e8abd5743de8829.json index 56b887e8b9361..f8f8511709d6d 100644 --- a/backend/.sqlx/query-8711bb7861cb3c453519a620057e1530039c09b824065027387bbb667a49fe8d.json +++ b/backend/.sqlx/query-881d996af5aaa1ec01693e473519b64b33c722afde7457912e8abd5743de8829.json @@ -1,16 +1,17 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO background_task_state\n (name, value, running, owner, started_at, finished_at, updated_at)\n VALUES ($1, $2, false, $3, now(), now(), now())\n ON CONFLICT (name) DO UPDATE SET\n value = $2, running = false, owner = $3,\n finished_at = now(), updated_at = now()", + "query": "INSERT INTO background_task_state\n (name, value, running, owner, started_at, finished_at, updated_at)\n VALUES ($1, $2, false, $3, now(), now(), now())\n ON CONFLICT (name) DO UPDATE SET\n value = $2, running = false, owner = $3,\n finished_at = now(), updated_at = now()\n WHERE (background_task_state.value->>'last_xmin')::bigint <= $4", "describe": { "columns": [], "parameters": { "Left": [ "Text", "Jsonb", - "Text" + "Text", + "Int8" ] }, "nullable": [] }, - "hash": "8711bb7861cb3c453519a620057e1530039c09b824065027387bbb667a49fe8d" + "hash": "881d996af5aaa1ec01693e473519b64b33c722afde7457912e8abd5743de8829" } diff --git a/backend/.sqlx/query-a54f686b1bfb16e4e1da2bc143ef172c89da5b216d3530c2743319ab1f4ca853.json b/backend/.sqlx/query-a54f686b1bfb16e4e1da2bc143ef172c89da5b216d3530c2743319ab1f4ca853.json new file mode 100644 index 0000000000000..4c0d0ae14f71f --- /dev/null +++ b/backend/.sqlx/query-a54f686b1bfb16e4e1da2bc143ef172c89da5b216d3530c2743319ab1f4ca853.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT CASE WHEN (current_setting('is_superuser') = 'on'\n OR pg_has_role(current_user, 'pg_read_all_stats', 'USAGE'))\n AND NOT EXISTS (SELECT 1 FROM pg_prepared_xacts)\n THEN (SELECT min(xact_start) FROM pg_stat_activity WHERE xact_start IS NOT NULL)\n ELSE NULL END AS \"cutoff?\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "cutoff?", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "a54f686b1bfb16e4e1da2bc143ef172c89da5b216d3530c2743319ab1f4ca853" +} diff --git a/backend/ee-repo-ref.txt b/backend/ee-repo-ref.txt index 2555cb615888a..d8f55f59bc4ca 100644 --- a/backend/ee-repo-ref.txt +++ b/backend/ee-repo-ref.txt @@ -1 +1 @@ -23a40676ecc9d7124b0aa733de027bf1e3e2103e \ No newline at end of file +6b191b77aabcf77658ad4f9031576e0d7b66bf89 \ No newline at end of file diff --git a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs index 5620e21803348..a4b60436d7640 100644 --- a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs +++ b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs @@ -148,6 +148,34 @@ pub async fn try_start(db: &DB, from: DateTime, to: DateTime) -> error "audit backfill: `from` must be strictly before `to`".to_string(), )); } + // The backfill keyset-pages by `(timestamp, id)` over rows visible at scan time and + // declares completion once the scan runs dry. But a row's `timestamp` is its + // inserting transaction's `xact_start`, so a transaction that started inside + // `[from, to)` yet commits after the scan has passed that timestamp — or any row + // committed when `to` is in the future — would be silently omitted from a + // "completed" backfill. Require `to` to be at or before the oldest in-flight + // `xact_start`: everything strictly older than the oldest running transaction is + // already committed and stable. Same trustworthy gating as the exporter's floor + // (a restricted stats role or a prepared 2PC txn makes the value unsafe); fall back + // to a 7-day-old cutoff then. + let settled_cutoff: Option> = sqlx::query_scalar!( + r#"SELECT CASE WHEN (current_setting('is_superuser') = 'on' + OR pg_has_role(current_user, 'pg_read_all_stats', 'USAGE')) + AND NOT EXISTS (SELECT 1 FROM pg_prepared_xacts) + THEN (SELECT min(xact_start) FROM pg_stat_activity WHERE xact_start IS NOT NULL) + ELSE NULL END AS "cutoff?""# + ) + .fetch_one(db) + .await?; + let settled_cutoff = settled_cutoff.unwrap_or_else(|| Utc::now() - chrono::Duration::days(7)); + if to > settled_cutoff { + return Err(error::Error::BadRequest(format!( + "audit backfill: `to` ({to}) must be at or before {settled_cutoff}, the point up to \ + which audit rows are guaranteed settled; choose an earlier upper bound. (Granting \ + pg_read_all_stats to the windmill DB user tightens this cutoff from a 7-day margin to \ + the live oldest-transaction boundary.)" + ))); + } let claimed = background_task::try_claim( db, TASK_NAME, From 3c6f556da33ebf305ca82f3f4488d95eae96ae6c Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 17:24:48 +0000 Subject: [PATCH 04/10] fix(audit): re-anchor legacy epoch checkpoints instead of synthetic floor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review (P1): the legacy-checkpoint fixup stamped last_oldest_inflight_ts to now()-7d while leaving the old last_xmin in place. On an instance that enabled export on the old code >7 days ago and got stuck before the first successful batch, the next run would filter post-enable rows older than 7 days out via `timestamp >= ts_floor` while still advancing last_xmin over the interval — silently dropping them (the same floor-vs-cursor loss class fixed elsewhere in this PR), and contradicting the "nothing committed after enabling is skipped" guarantee. A stuck epoch-sentinel checkpoint cannot be safely resumed (its backlog can be arbitrarily old, so any recent floor prunes rows the cursor then skips, and an epoch floor reintroduces the full scan). Re-anchor it to the migration's current snapshot xmin instead — exactly like a fresh enable — so the export resumes cleanly from ~now and the never-exported pre-upgrade window is recovered via the opt-in backfill rather than silently dropped. Reword the setting description so it no longer implies the disabled/legacy window is covered by the cursor. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...51_audit_logs_s3_reanchor_on_enable.up.sql | 27 +++++++++++++------ .../src/lib/components/instanceSettings.ts | 2 +- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql b/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql index 749c0a9bdfd36..175c0d275900e 100644 --- a/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql +++ b/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql @@ -54,15 +54,26 @@ BEGIN END; $$ LANGUAGE plpgsql; --- One-time fixup for an instance upgrading while a legacy epoch sentinel cursor --- is still in place (enabled on the old version but never drained, e.g. object --- store down): stamp a recent floor so the next run prunes to recent partitions --- instead of scanning all history. The xid cursor is left untouched (the drain --- continues); only rows older than the new floor — possible solely via a --- transaction open longer than the 7-day window — fall outside it. +-- One-time recovery for an instance upgrading while a legacy epoch-sentinel +-- checkpoint is still in place (enabled on the old code but never drained — e.g. +-- the very full-table scan this migration removes left it stuck). Such a +-- checkpoint cannot be safely resumed: its un-drained backlog can be arbitrarily +-- old, so stamping a recent floor over the old xmin would prune the older rows +-- while the cursor advanced past them (silent loss), and keeping the epoch floor +-- would reintroduce the full scan. Instead re-anchor it to now, exactly like a +-- fresh enable — the export resumes cleanly from ~now, and the pre-upgrade window +-- (which was never successfully exported) is recovered with the opt-in historical +-- backfill rather than silently dropped. UPDATE background_task_state -SET value = value || jsonb_build_object( +SET value = jsonb_build_object( + 'last_xmin', txid_snapshot_xmin(txid_current_snapshot())::bigint, 'last_ts', to_jsonb(now()), - 'last_oldest_inflight_ts', to_jsonb(now() - interval '7 days')) + 'last_oldest_inflight_ts', to_jsonb(COALESCE( + CASE WHEN (current_setting('is_superuser') = 'on' + OR pg_has_role(current_user, 'pg_read_all_stats', 'USAGE')) + AND NOT EXISTS (SELECT 1 FROM pg_prepared_xacts) + THEN (SELECT min(xact_start) FROM pg_stat_activity WHERE xact_start IS NOT NULL) + ELSE NULL END, + now() - interval '7 days'))) WHERE name = 'audit_logs_s3_export' AND (value->>'last_ts')::timestamptz <= 'epoch'::timestamptz; diff --git a/frontend/src/lib/components/instanceSettings.ts b/frontend/src/lib/components/instanceSettings.ts index 278863c81df75..6359d82b87423 100644 --- a/frontend/src/lib/components/instanceSettings.ts +++ b/frontend/src/lib/components/instanceSettings.ts @@ -443,7 +443,7 @@ export const settings: Record = { { label: 'Store audit logs in object storage', description: - 'When enabled and instance object storage is configured, audit logs are also exported as newline-delimited JSON to the dedicated logs/audit/ folder (partitioned by day). Export is incremental and runs off the hot path. Pre-existing history is not backfilled: enabling (or re-enabling) starts the export from ~now (transactions in flight at that moment may include a bounded set of just-prior rows), and no audit log committed after enabling is ever skipped. To export a historical window (e.g. a gap left while the export was disabled), use the opt-in backfill API: POST /settings/audit_logs_s3_backfill {from, to} (status at GET /settings/audit_logs_s3_backfill_status).', + 'When enabled and instance object storage is configured, audit logs are also exported as newline-delimited JSON to the dedicated logs/audit/ folder (partitioned by day). Export is incremental and runs off the hot path. Enabling (or re-enabling) anchors the export at ~now: while it stays enabled, every audit log committed from that point on is exported (transactions in flight at the moment of enabling may include a bounded set of just-prior rows). Pre-existing history, and any window during which export was disabled, are NOT exported by this cursor — use the opt-in backfill API to export a chosen historical range: POST /settings/audit_logs_s3_backfill {from, to} (status at GET /settings/audit_logs_s3_backfill_status).', key: 'store_audit_logs_s3', fieldType: 'boolean', storage: 'setting', From 2dddb7cedc157c723ab43dc023bd7f97ba059923 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 17:33:10 +0000 Subject: [PATCH 05/10] test(audit): end-to-end integration tests for the object-store backfill The backfill previously had only SQL-level/EXPLAIN validation. Add real integration tests (in-memory object store, sqlx::test) exercising the public path: - backfill_exports_window_in_pages: with the page size forced to 2 rows, a settled 3-day window is exported across multiple keyset pages; asserts every in-window row lands exactly once, rows outside [from,to) are excluded, a day that straddles a page boundary yields more than one object, progress counts match, and a re-run is idempotent (deterministic keys overwritten, no dupes). - backfill_rejects_unstable_window: a future/live `to` is rejected as unstable, a window safely in the past is accepted. Adds a test-only PAGE_ROWS override so multi-page behaviour is exercised with a handful of rows. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/audit_logs_s3_backfill.rs | 204 +++++++++++++++++- 1 file changed, 202 insertions(+), 2 deletions(-) diff --git a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs index a4b60436d7640..8f3878e2b8059 100644 --- a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs +++ b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs @@ -44,6 +44,25 @@ pub const TASK_NAME: &str = "audit_logs_s3_backfill"; /// page of ndjson is buffered before the day-grouped PUTs). const PAGE_ROWS: i64 = 10_000; +/// Test-only override for [`PAGE_ROWS`] (0 = use the default), so a test can force +/// multi-page / page-spanning-day keyset behaviour with only a handful of rows. +#[cfg(test)] +static PAGE_ROWS_OVERRIDE: std::sync::atomic::AtomicI64 = std::sync::atomic::AtomicI64::new(0); + +fn page_rows() -> i64 { + #[cfg(test)] + { + match PAGE_ROWS_OVERRIDE.load(std::sync::atomic::Ordering::Relaxed) { + 0 => PAGE_ROWS, + n => n, + } + } + #[cfg(not(test))] + { + PAGE_ROWS + } +} + #[derive(Clone, Serialize, Deserialize)] pub struct AuditBackfillProgress { pub running: bool, @@ -266,6 +285,7 @@ async fn run_backfill( // first page includes rows at exactly `from`. let mut cursor_ts = from; let mut cursor_id: i64 = -1; + let page_rows = page_rows(); loop { let rows = sqlx::query!( @@ -287,7 +307,7 @@ async fn run_backfill( to, cursor_ts, cursor_id, - PAGE_ROWS + page_rows ) .fetch_all(db) .await?; @@ -340,10 +360,190 @@ async fn run_backfill( session.update(|p| p.last_ts = Some(new_last_ts)).await; // A short page means the window is exhausted. - if (rows.len() as i64) < PAGE_ROWS { + if (rows.len() as i64) < page_rows { break; } } Ok(()) } + +#[cfg(all(test, feature = "parquet"))] +mod tests { + use super::*; + use futures::stream::StreamExt; + use std::sync::atomic::Ordering; + use windmill_object_store::object_store_reexports::{InMemory, ObjectStore, Path as OsPath}; + use windmill_object_store::{ExpirableObjectStore, OBJECT_STORE_SETTINGS}; + + async fn install_in_memory_store() -> std::sync::Arc { + let store = std::sync::Arc::new(InMemory::new()); + let dynstore: std::sync::Arc = store.clone(); + let mut settings = OBJECT_STORE_SETTINGS.write().await; + *settings = Some(ExpirableObjectStore::from(dynstore)); + store + } + + async fn clear_store() { + let mut settings = OBJECT_STORE_SETTINGS.write().await; + *settings = None; + } + + /// Insert an audit row `days` days in the past (creating the daily partition if + /// needed). The row's `timestamp` defaults to that point, landing it in the + /// matching partition. + async fn insert_audit_days_ago(db: &DB, operation: &str, days: i64) -> i64 { + sqlx::query(&format!( + "DO $$ DECLARE d date := current_date - {days}; BEGIN \ + EXECUTE format('CREATE TABLE IF NOT EXISTS %I PARTITION OF audit_partitioned \ + FOR VALUES FROM (%L) TO (%L)', 'audit_'||to_char(d,'YYYYMMDD'), d, d + 1); END $$;" + )) + .execute(db) + .await + .ok(); + sqlx::query_scalar::<_, i64>(&format!( + "INSERT INTO audit_partitioned + (workspace_id, username, operation, action_kind, parameters, timestamp) + VALUES ('test-ws','tester',$1,'create'::action_kind,'{{}}'::jsonb, + now() - interval '{days} days') + RETURNING id" + )) + .bind(operation) + .fetch_one(db) + .await + .expect("insert audit row") + } + + /// All ids across every `audit_backfill_*.ndjson` object, and the set of object + /// paths (to assert pagination/day keying). + async fn backfilled(store: &InMemory) -> (Vec, Vec) { + let prefix = OsPath::from("logs/audit"); + let metas = store + .list(Some(&prefix)) + .collect::>() + .await + .into_iter() + .map(|m| m.expect("list object")) + .collect::>(); + let mut ids = Vec::new(); + let mut paths = Vec::new(); + for meta in metas { + paths.push(meta.location.to_string()); + let bytes = store + .get(&meta.location) + .await + .expect("get object") + .bytes() + .await + .expect("read bytes"); + for line in std::str::from_utf8(&bytes).unwrap().lines() { + if line.is_empty() { + continue; + } + let v: serde_json::Value = serde_json::from_str(line).expect("valid ndjson"); + ids.push(v.get("id").and_then(|x| x.as_i64()).expect("row has id")); + } + } + ids.sort(); + paths.sort(); + (ids, paths) + } + + fn session(db: &DB, from: DateTime, to: DateTime) -> Session { + Session { + db: db.clone(), + owner: INSTANCE_NAME.clone(), + progress: RwLock::new(AuditBackfillProgress::new_running(from, to)), + } + } + + // End-to-end backfill: a settled multi-day window is exported in bounded keyset + // pages (forced to 2 rows/page) — every in-window row lands exactly once, rows + // outside [from,to) are excluded, a day that spans a page boundary produces more + // than one object, and re-running is idempotent (same keys overwritten, no dupes). + #[sqlx::test(migrations = "../migrations")] + async fn backfill_exports_window_in_pages(db: DB) -> anyhow::Result<()> { + let store = install_in_memory_store().await; + let dyn_store = windmill_object_store::get_object_store() + .await + .expect("store configured"); + + // In window [now-6d, now-2d): days 5, 4, 3 ago. + let mut want = Vec::new(); + for i in 0..3 { + want.push(insert_audit_days_ago(&db, &format!("bf.d5.{i}"), 5).await); + } + for i in 0..2 { + want.push(insert_audit_days_ago(&db, &format!("bf.d4.{i}"), 4).await); + } + for i in 0..2 { + want.push(insert_audit_days_ago(&db, &format!("bf.d3.{i}"), 3).await); + } + want.sort(); + // Out of window: before `from` and at/after `to`. + let before = insert_audit_days_ago(&db, "bf.before", 7).await; + let after = insert_audit_days_ago(&db, "bf.after", 1).await; + + let from = Utc::now() - chrono::Duration::days(6); + let to = Utc::now() - chrono::Duration::days(2); + + PAGE_ROWS_OVERRIDE.store(2, Ordering::Relaxed); + let s = session(&db, from, to); + run_backfill(&s, &db, &dyn_store, from, to).await?; + PAGE_ROWS_OVERRIDE.store(0, Ordering::Relaxed); + + let (ids, paths) = backfilled(&store).await; + assert_eq!(ids, want, "exactly the in-window rows, each once: {ids:?}"); + assert!( + !ids.contains(&before) && !ids.contains(&after), + "rows outside [from,to) must not be exported" + ); + // 3 rows on the day-5 partition at a 2-row page size => that day spans pages, + // so it yields >1 object — proving keyset paging across a day boundary. + let day5_objects = paths + .iter() + .filter(|p| p.contains("audit_backfill_")) + .count(); + assert!( + day5_objects >= 4, + "expected multiple paged objects (incl. a split day), got {paths:?}" + ); + { + let p = s.progress.read().await; + assert_eq!(p.rows_written, want.len() as u64, "progress row count"); + } + + // Idempotent re-run: deterministic keys are overwritten, never duplicated. + PAGE_ROWS_OVERRIDE.store(2, Ordering::Relaxed); + let s2 = session(&db, from, to); + run_backfill(&s2, &db, &dyn_store, from, to).await?; + PAGE_ROWS_OVERRIDE.store(0, Ordering::Relaxed); + let (ids2, _) = backfilled(&store).await; + assert_eq!(ids2, want, "re-run stays exactly once per row: {ids2:?}"); + + clear_store().await; + Ok(()) + } + + // The endpoint rejects a window whose upper bound is not yet settled (a row's + // timestamp is its txn's xact_start, so a future/live `to` could miss late + // commits), but accepts a window safely in the past. + #[sqlx::test(migrations = "../migrations")] + async fn backfill_rejects_unstable_window(db: DB) -> anyhow::Result<()> { + let future = Utc::now() + chrono::Duration::days(1); + let past_from = Utc::now() - chrono::Duration::days(2); + let err = try_start(&db, past_from, future).await.unwrap_err(); + assert!( + matches!(err, error::Error::BadRequest(_)), + "a future `to` must be rejected as unstable, got {err:?}" + ); + + // A window fully in the settled past is accepted. + let from = Utc::now() - chrono::Duration::days(3); + let to = Utc::now() - chrono::Duration::days(2); + try_start(&db, from, to) + .await + .expect("a settled past window is accepted"); + Ok(()) + } +} From 69139471297dbf4be868d83c3e8d24205a10c909 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 17:37:08 +0000 Subject: [PATCH 06/10] docs(audit): note backfill scope is audit_partitioned only Make explicit that, like the steady-state export, the backfill reads only audit_partitioned; the pre-partitioning `audit` table is intentionally out of scope (not a missed case). Co-Authored-By: Claude Opus 4.8 (1M context) --- backend/windmill-api-settings/src/audit_logs_s3_backfill.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs index 8f3878e2b8059..ae9d598f4a755 100644 --- a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs +++ b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs @@ -21,6 +21,9 @@ //! duplicating. A window that overlaps already-exported steady-state rows simply //! re-emits them under a different key; consumers dedupe by `id`. //! +//! Scope: like the steady-state export, this reads only `audit_partitioned`. The +//! pre-partitioning `audit` table is intentionally not exported or backfilled. +//! //! Progress is persisted in `background_task_state` (name [`TASK_NAME`]) so any //! API replica can serve the status endpoint, mirroring `log_cleanup`. From 4214d3b98e40fd3bf4294662b42e84709031a169 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 17:43:40 +0000 Subject: [PATCH 07/10] fix(audit): reject backfill windows before the partitioned boundary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review (Codex P1): the backfill reads only audit_partitioned, but pre-partitioning history lives in the legacy `audit` table (still read by audit list/get via UNION ALL, and retained for the configured period — 365 days by default on EE). Since the setting text points operators at this API for "pre-existing history", a window overlapping legacy rows would report completion while silently omitting them. Per the decision to not export the legacy table, reject instead of silently omit: try_start now rejects a `from` earlier than the oldest audit_partitioned timestamp (every legacy row predates the partition cutover, so a `from` at/after that boundary can never overlap them). Reworded the setting text to scope the backfill to the partitioned era. Added a regression test, plus an RAII guard (cubic P2) so the test-only globals are restored even if an assertion panics. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/audit_logs_s3_backfill.rs | 76 ++++++++++++++++--- .../src/lib/components/instanceSettings.ts | 2 +- 2 files changed, 68 insertions(+), 10 deletions(-) diff --git a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs index ae9d598f4a755..c9bdf66836b95 100644 --- a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs +++ b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs @@ -22,7 +22,9 @@ //! re-emits them under a different key; consumers dedupe by `id`. //! //! Scope: like the steady-state export, this reads only `audit_partitioned`. The -//! pre-partitioning `audit` table is intentionally not exported or backfilled. +//! pre-partitioning `audit` table is intentionally not exported; a window that +//! starts before the earliest partitioned row is rejected (see [`try_start`]) so a +//! backfill never silently reports success while omitting those legacy rows. //! //! Progress is persisted in `background_task_state` (name [`TASK_NAME`]) so any //! API replica can serve the status endpoint, mirroring `log_cleanup`. @@ -198,6 +200,28 @@ pub async fn try_start(db: &DB, from: DateTime, to: DateTime) -> error the live oldest-transaction boundary.)" ))); } + // The backfill (like the steady-state export) reads only `audit_partitioned`. Audit + // history from before partitioning was introduced lives in the legacy `audit` table + // and is intentionally not exported. Reject a window that reaches before the earliest + // partitioned row, so a backfill can't report success while silently omitting those + // legacy rows (every legacy row predates the partition cutover, so a `from` at or + // after the earliest partitioned timestamp can never overlap them). Non-macro query: + // no compile-time-checked entry needed. + let partition_floor: Option> = sqlx::query_scalar::<_, Option>>( + "SELECT min(timestamp) FROM audit_partitioned", + ) + .fetch_one(db) + .await?; + if let Some(floor) = partition_floor { + if from < floor { + return Err(error::Error::BadRequest(format!( + "audit backfill: `from` ({from}) is before the earliest partitioned audit row \ + ({floor}). Audit logs from before audit-log partitioning was introduced live in \ + the legacy `audit` table and are not exported to object storage; set `from` >= \ + {floor}." + ))); + } + } let claimed = background_task::try_claim( db, TASK_NAME, @@ -387,9 +411,19 @@ mod tests { store } - async fn clear_store() { - let mut settings = OBJECT_STORE_SETTINGS.write().await; - *settings = None; + /// Resets the test-only process globals on drop so a failing assertion can't + /// leak `PAGE_ROWS_OVERRIDE` or the in-memory object store into other tests + /// running in the same process. Best-effort store reset (`try_write`) is enough + /// here: it is uncontended at drop time, and the next store-using test installs a + /// fresh one regardless. + struct ResetGlobals; + impl Drop for ResetGlobals { + fn drop(&mut self) { + PAGE_ROWS_OVERRIDE.store(0, Ordering::Relaxed); + if let Ok(mut s) = OBJECT_STORE_SETTINGS.try_write() { + *s = None; + } + } } /// Insert an audit row `days` days in the past (creating the daily partition if @@ -466,10 +500,13 @@ mod tests { // than one object, and re-running is idempotent (same keys overwritten, no dupes). #[sqlx::test(migrations = "../migrations")] async fn backfill_exports_window_in_pages(db: DB) -> anyhow::Result<()> { + let _reset = ResetGlobals; // restores globals even if an assertion panics let store = install_in_memory_store().await; let dyn_store = windmill_object_store::get_object_store() .await .expect("store configured"); + // Force multi-page / page-spanning-day keyset behaviour with a handful of rows. + PAGE_ROWS_OVERRIDE.store(2, Ordering::Relaxed); // In window [now-6d, now-2d): days 5, 4, 3 ago. let mut want = Vec::new(); @@ -490,10 +527,8 @@ mod tests { let from = Utc::now() - chrono::Duration::days(6); let to = Utc::now() - chrono::Duration::days(2); - PAGE_ROWS_OVERRIDE.store(2, Ordering::Relaxed); let s = session(&db, from, to); run_backfill(&s, &db, &dyn_store, from, to).await?; - PAGE_ROWS_OVERRIDE.store(0, Ordering::Relaxed); let (ids, paths) = backfilled(&store).await; assert_eq!(ids, want, "exactly the in-window rows, each once: {ids:?}"); @@ -517,14 +552,11 @@ mod tests { } // Idempotent re-run: deterministic keys are overwritten, never duplicated. - PAGE_ROWS_OVERRIDE.store(2, Ordering::Relaxed); let s2 = session(&db, from, to); run_backfill(&s2, &db, &dyn_store, from, to).await?; - PAGE_ROWS_OVERRIDE.store(0, Ordering::Relaxed); let (ids2, _) = backfilled(&store).await; assert_eq!(ids2, want, "re-run stays exactly once per row: {ids2:?}"); - clear_store().await; Ok(()) } @@ -549,4 +581,30 @@ mod tests { .expect("a settled past window is accepted"); Ok(()) } + + // A window reaching before the earliest partitioned row is rejected: those + // pre-partitioning rows live in the legacy `audit` table and are not exported, + // so the backfill must not report success while silently omitting them. + #[sqlx::test(migrations = "../migrations")] + async fn backfill_rejects_pre_partition_window(db: DB) -> anyhow::Result<()> { + // Earliest partitioned row ~3 days ago. + insert_audit_days_ago(&db, "boundary", 3).await; + + // `from` before the boundary -> rejected. + let from = Utc::now() - chrono::Duration::days(10); + let to = Utc::now() - chrono::Duration::days(2); + let err = try_start(&db, from, to).await.unwrap_err(); + assert!( + matches!(err, error::Error::BadRequest(_)), + "a window starting before the earliest partitioned row must be rejected, got {err:?}" + ); + + // `from` within the partitioned era (after the boundary) -> accepted. + let from_ok = Utc::now() - chrono::Duration::hours(60); // ~2.5d ago, > boundary + let to_ok = Utc::now() - chrono::Duration::days(2); + try_start(&db, from_ok, to_ok) + .await + .expect("a window within the partitioned era is accepted"); + Ok(()) + } } diff --git a/frontend/src/lib/components/instanceSettings.ts b/frontend/src/lib/components/instanceSettings.ts index 6359d82b87423..e7d1d3fbe0e96 100644 --- a/frontend/src/lib/components/instanceSettings.ts +++ b/frontend/src/lib/components/instanceSettings.ts @@ -443,7 +443,7 @@ export const settings: Record = { { label: 'Store audit logs in object storage', description: - 'When enabled and instance object storage is configured, audit logs are also exported as newline-delimited JSON to the dedicated logs/audit/ folder (partitioned by day). Export is incremental and runs off the hot path. Enabling (or re-enabling) anchors the export at ~now: while it stays enabled, every audit log committed from that point on is exported (transactions in flight at the moment of enabling may include a bounded set of just-prior rows). Pre-existing history, and any window during which export was disabled, are NOT exported by this cursor — use the opt-in backfill API to export a chosen historical range: POST /settings/audit_logs_s3_backfill {from, to} (status at GET /settings/audit_logs_s3_backfill_status).', + 'When enabled and instance object storage is configured, audit logs are also exported as newline-delimited JSON to the dedicated logs/audit/ folder (partitioned by day). Export is incremental and runs off the hot path. Enabling (or re-enabling) anchors the export at ~now: while it stays enabled, every audit log committed from that point on is exported (transactions in flight at the moment of enabling may include a bounded set of just-prior rows). Pre-existing history, and any window during which export was disabled, are NOT exported by this cursor — use the opt-in backfill API to export a chosen historical range, back to when audit-log partitioning was introduced (older rows in the legacy audit table are not exported, and a window starting before that boundary is rejected): POST /settings/audit_logs_s3_backfill {from, to} (status at GET /settings/audit_logs_s3_backfill_status).', key: 'store_audit_logs_s3', fieldType: 'boolean', storage: 'setting', From e399fbdaf0f2e88afe39c7eae6bf83554621acfc Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 17:55:50 +0000 Subject: [PATCH 08/10] fix(audit): backfill object keys per-window; require trustworthy settled cutoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review (two P1s): - Object-key overwrite loss: keys were `dt=/audit_backfill_.ndjson`. A narrower, overlapping backfill can start a day's page at the same first row (same min_id) but hold fewer rows, and `put` would overwrite a broader run's object — silently dropping the rows only that object held. Include the requested window in the key so different ranges write disjoint objects (same window re-runs stay idempotent; consumers dedupe overlapping rows by id). New regression test (verified red→green). - Untrustworthy settled cutoff: when min(xact_start) isn't trustworthy (role lacks pg_read_all_stats/superuser, or a prepared 2PC txn exists), the old now()-7d fallback could still let an old transaction commit rows inside an accepted window after the scan, so a "complete" backfill silently missed them. Since a backfill asserts completeness, reject in those cases instead of falling back. (The continuous exporter keeps its 7-day fallback — it only claims bounded lag.) Also makes the tests robust under the parallel runner: run_backfill takes the store as a param, so tests pass a local in-memory store (no global OBJECT_STORE_SETTINGS race) and serialize on the PAGE_ROWS override. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/audit_logs_s3_backfill.rs | 163 +++++++++++++----- 1 file changed, 124 insertions(+), 39 deletions(-) diff --git a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs index c9bdf66836b95..2ae936d510469 100644 --- a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs +++ b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs @@ -15,8 +15,10 @@ //! It does not touch the xmin cursor / checkpoint at all. //! //! Objects are written next to the steady-state ones under `logs/audit/dt=/` -//! as `audit_backfill_.ndjson`, with the exact same row shape, so a -//! consumer reads them uniformly. Re-running the same window is deterministic +//! as `audit_backfill__.ndjson`, with the exact same row shape, so +//! a consumer reads them uniformly. The key includes the requested window so two +//! different backfill ranges never overwrite each other (a per-page `min_id` alone +//! is not unique across windows). Re-running the *same* window is deterministic //! (audit history is append-only), so it overwrites the same objects rather than //! duplicating. A window that overlaps already-exported steady-state rows simply //! re-emits them under a different key; consumers dedupe by `id`. @@ -173,15 +175,20 @@ pub async fn try_start(db: &DB, from: DateTime, to: DateTime) -> error )); } // The backfill keyset-pages by `(timestamp, id)` over rows visible at scan time and - // declares completion once the scan runs dry. But a row's `timestamp` is its - // inserting transaction's `xact_start`, so a transaction that started inside + // declares the window fully exported once the scan runs dry. But a row's `timestamp` + // is its inserting transaction's `xact_start`, so a transaction that started inside // `[from, to)` yet commits after the scan has passed that timestamp — or any row - // committed when `to` is in the future — would be silently omitted from a - // "completed" backfill. Require `to` to be at or before the oldest in-flight - // `xact_start`: everything strictly older than the oldest running transaction is - // already committed and stable. Same trustworthy gating as the exporter's floor - // (a restricted stats role or a prepared 2PC txn makes the value unsafe); fall back - // to a 7-day-old cutoff then. + // committed when `to` is in the future — would be silently omitted. Require `to` to + // be at or before the oldest in-flight `xact_start`: everything strictly older than + // the oldest running transaction is already committed and stable. + // + // That bound is only sound when we can see every xmin-holding transaction. A role + // without pg_read_all_stats/superuser sees only its own sessions, and a prepared + // (2PC) transaction is invisible to pg_stat_activity — in either case an old + // transaction could still commit rows inside an accepted window after our scan ends. + // Since a backfill asserts completeness, we REJECT in those cases rather than fall + // back to a best-effort margin (NULL below). (The continuous exporter, which only + // claims bounded lag, keeps the 7-day fallback instead.) let settled_cutoff: Option> = sqlx::query_scalar!( r#"SELECT CASE WHEN (current_setting('is_superuser') = 'on' OR pg_has_role(current_user, 'pg_read_all_stats', 'USAGE')) @@ -191,13 +198,22 @@ pub async fn try_start(db: &DB, from: DateTime, to: DateTime) -> error ) .fetch_one(db) .await?; - let settled_cutoff = settled_cutoff.unwrap_or_else(|| Utc::now() - chrono::Duration::days(7)); + let Some(settled_cutoff) = settled_cutoff else { + return Err(error::Error::BadRequest( + "audit backfill: cannot determine a trustworthy settled-time boundary, so completeness \ + can't be guaranteed. The windmill DB role needs pg_read_all_stats (or superuser) and \ + there must be no prepared (2PC) transactions in progress — otherwise an old or \ + invisible transaction could later commit audit rows inside the requested window and \ + the backfill would miss them. Grant the privilege / resolve prepared transactions and \ + retry." + .to_string(), + )); + }; if to > settled_cutoff { return Err(error::Error::BadRequest(format!( - "audit backfill: `to` ({to}) must be at or before {settled_cutoff}, the point up to \ - which audit rows are guaranteed settled; choose an earlier upper bound. (Granting \ - pg_read_all_stats to the windmill DB user tightens this cutoff from a 7-day margin to \ - the live oldest-transaction boundary.)" + "audit backfill: `to` ({to}) must be at or before {settled_cutoff}, the newest point \ + guaranteed settled (the oldest in-flight transaction's start); choose an earlier \ + upper bound." ))); } // The backfill (like the steady-state export) reads only `audit_partitioned`. Audit @@ -313,6 +329,13 @@ async fn run_backfill( let mut cursor_ts = from; let mut cursor_id: i64 = -1; let page_rows = page_rows(); + // Namespace object keys by the requested window. The per-page `min_id` alone is not + // unique across runs: a narrower, overlapping backfill can start a day's page at the + // same first row (same `min_id`) but contain fewer rows, and `put` would overwrite a + // broader run's object — silently dropping the rows only that object held. Including + // the window makes different ranges write disjoint keys (same window re-runs stay + // idempotent); consumers already dedupe overlapping rows by `id`. + let window_key = format!("{}_{}", from.timestamp_millis(), to.timestamp_millis()); loop { let rows = sqlx::query!( @@ -358,7 +381,7 @@ async fn run_backfill( for (day, min_id, ndjson) in &by_day { let object_path = ObjectPath::from(format!( - "{LOGS_AUDIT}dt={day}/audit_backfill_{min_id}.ndjson" + "{LOGS_AUDIT}dt={day}/audit_backfill_{window_key}_{min_id}.ndjson" )); store .put(&object_path, ndjson.clone().into_bytes().into()) @@ -400,29 +423,29 @@ mod tests { use super::*; use futures::stream::StreamExt; use std::sync::atomic::Ordering; + use std::sync::Arc; use windmill_object_store::object_store_reexports::{InMemory, ObjectStore, Path as OsPath}; - use windmill_object_store::{ExpirableObjectStore, OBJECT_STORE_SETTINGS}; - - async fn install_in_memory_store() -> std::sync::Arc { - let store = std::sync::Arc::new(InMemory::new()); - let dynstore: std::sync::Arc = store.clone(); - let mut settings = OBJECT_STORE_SETTINGS.write().await; - *settings = Some(ExpirableObjectStore::from(dynstore)); - store + + /// A private, per-test object store. `run_backfill` takes the store as a parameter, + /// so tests use a local one and never touch the process-global `OBJECT_STORE_SETTINGS` + /// (which would otherwise race across the parallel test runner). + fn local_store() -> (Arc, Arc) { + let store = Arc::new(InMemory::new()); + let dynstore: Arc = store.clone(); + (store, dynstore) } - /// Resets the test-only process globals on drop so a failing assertion can't - /// leak `PAGE_ROWS_OVERRIDE` or the in-memory object store into other tests - /// running in the same process. Best-effort store reset (`try_write`) is enough - /// here: it is uncontended at drop time, and the next store-using test installs a - /// fresh one regardless. - struct ResetGlobals; - impl Drop for ResetGlobals { + /// Serializes the tests that touch the `PAGE_ROWS_OVERRIDE` process global (read + /// inside `run_backfill`) so they can't observe each other's value under the parallel + /// runner. + static PAGE_OVERRIDE_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(()); + + /// Resets `PAGE_ROWS_OVERRIDE` on drop so a failing assertion can't leak a non-default + /// page size into another test. + struct ResetPageOverride; + impl Drop for ResetPageOverride { fn drop(&mut self) { PAGE_ROWS_OVERRIDE.store(0, Ordering::Relaxed); - if let Ok(mut s) = OBJECT_STORE_SETTINGS.try_write() { - *s = None; - } } } @@ -451,6 +474,31 @@ mod tests { .expect("insert audit row") } + /// Insert an audit row at an exact timestamp (creating the daily partition if + /// needed), for tests that need distinct in-day timestamps. + async fn insert_audit_at(db: &DB, operation: &str, ts: DateTime) -> i64 { + sqlx::query(&format!( + "DO $$ DECLARE d date := '{}'; BEGIN \ + EXECUTE format('CREATE TABLE IF NOT EXISTS %I PARTITION OF audit_partitioned \ + FOR VALUES FROM (%L) TO (%L)', 'audit_'||to_char(d,'YYYYMMDD'), d, d + 1); END $$;", + ts.format("%Y-%m-%d") + )) + .execute(db) + .await + .ok(); + sqlx::query_scalar::<_, i64>( + "INSERT INTO audit_partitioned + (workspace_id, username, operation, action_kind, parameters, timestamp) + VALUES ('test-ws','tester',$1,'create'::action_kind,'{}'::jsonb,$2) + RETURNING id", + ) + .bind(operation) + .bind(ts) + .fetch_one(db) + .await + .expect("insert audit row") + } + /// All ids across every `audit_backfill_*.ndjson` object, and the set of object /// paths (to assert pagination/day keying). async fn backfilled(store: &InMemory) -> (Vec, Vec) { @@ -500,11 +548,9 @@ mod tests { // than one object, and re-running is idempotent (same keys overwritten, no dupes). #[sqlx::test(migrations = "../migrations")] async fn backfill_exports_window_in_pages(db: DB) -> anyhow::Result<()> { - let _reset = ResetGlobals; // restores globals even if an assertion panics - let store = install_in_memory_store().await; - let dyn_store = windmill_object_store::get_object_store() - .await - .expect("store configured"); + let _serial = PAGE_OVERRIDE_LOCK.lock().await; + let _reset = ResetPageOverride; // restores the page override even on panic + let (store, dyn_store) = local_store(); // Force multi-page / page-spanning-day keyset behaviour with a handful of rows. PAGE_ROWS_OVERRIDE.store(2, Ordering::Relaxed); @@ -560,6 +606,45 @@ mod tests { Ok(()) } + // A narrower backfill overlapping a broader one must not overwrite (and drop rows + // from) the broader run's object: the object key includes the window. The two + // windows share a day and the same first row (so the same `min_id`), but the + // narrower one holds fewer rows. + #[sqlx::test(migrations = "../migrations")] + async fn backfill_window_in_key_prevents_overwrite(db: DB) -> anyhow::Result<()> { + // Hold the lock so no concurrent test's PAGE_ROWS_OVERRIDE is observed; this test + // wants the default (large) page size so each day is one object per window. + let _serial = PAGE_OVERRIDE_LOCK.lock().await; + let (store, dyn_store) = local_store(); + + // Four rows on the same day at distinct times. + let base = Utc::now() - chrono::Duration::days(5); + let r0 = insert_audit_at(&db, "ov.0", base).await; + let r1 = insert_audit_at(&db, "ov.1", base + chrono::Duration::seconds(10)).await; + let r2 = insert_audit_at(&db, "ov.2", base + chrono::Duration::seconds(20)).await; + let r3 = insert_audit_at(&db, "ov.3", base + chrono::Duration::seconds(30)).await; + + // Broad run covers all four (one object for the day, keyed by r0). + let a_from = base - chrono::Duration::seconds(1); + let a_to = base + chrono::Duration::seconds(31); + run_backfill(&session(&db, a_from, a_to), &db, &dyn_store, a_from, a_to).await?; + + // Narrow run starts at the same first row (same min_id) but holds only r0, r1. + let b_from = base - chrono::Duration::seconds(1); + let b_to = base + chrono::Duration::seconds(15); + run_backfill(&session(&db, b_from, b_to), &db, &dyn_store, b_from, b_to).await?; + + let (ids, _) = backfilled(&store).await; + for id in [r0, r1, r2, r3] { + assert!( + ids.contains(&id), + "row {id} lost — a narrower overlapping window overwrote the broader run's \ + object: {ids:?}" + ); + } + Ok(()) + } + // The endpoint rejects a window whose upper bound is not yet settled (a row's // timestamp is its txn's xact_start, so a future/live `to` could miss late // commits), but accepts a window safely in the past. From ef980481367b30617b0d2b3ae671f4cc0ad4f975 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 18:00:55 +0000 Subject: [PATCH 09/10] fix(audit): reject backfill overlapping legacy table; regen deref openapi; trim migration comment Address review (1 P1 + 2 P2): - Empty-partition backfill (P1): the min(audit_partitioned) guard no-ops when audit_partitioned is empty, so an upgraded instance with legacy `audit` rows but no partitioned rows yet would accept a window and complete with zero rows, silently omitting the legacy rows. Check the legacy `audit` table directly: reject any window that overlaps a legacy row (subsumes the boundary check and covers the empty-partitioned case). Test updated accordingly. - openapi-deref (P2): regenerate openapi-deref.yaml/json (served via include_str!) so /openapi.{yaml,json} expose the new backfill endpoints. - Migration comment (P2): trim the PR-history narration to the durable constraints (why a recent floor and a monotonic cursor are required), per AGENTS.md. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...51_audit_logs_s3_reanchor_on_enable.up.sql | 47 ++-- .../src/audit_logs_s3_backfill.rs | 76 ++++--- backend/windmill-api/openapi-deref.json | 211 +++++++++++++++++- backend/windmill-api/openapi-deref.yaml | 158 ++++++++++++- .../src/lib/components/instanceSettings.ts | 2 +- 5 files changed, 432 insertions(+), 62 deletions(-) diff --git a/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql b/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql index 175c0d275900e..3fd759da9e61c 100644 --- a/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql +++ b/backend/migrations/20260626132251_audit_logs_s3_reanchor_on_enable.up.sql @@ -1,23 +1,18 @@ --- Re-anchor the audit→object-store export cursor on every enable (including a --- re-enable after a disable), and anchor a *recent* timestamp floor instead of --- the epoch sentinel. +-- Anchors the audit→object-store export cursor when the setting is enabled. -- --- The previous version wrote `last_ts = epoch` and used `ON CONFLICT DO NOTHING` --- (preserve the old cursor). Two consequences hurt large instances: --- * the epoch floor disables partition pruning, so the first export run --- sequentially scans the entire `audit_partitioned` table — under a --- `statement_timeout` (e.g. Aiven) it never completes, so the cursor never --- advances and nothing is ever exported; --- * preserving the cursor across a long disable means re-enabling tries to --- backfill the whole disabled window by the same unindexable `age(xmin)` --- scan, with the same outcome. +-- `last_ts`/`last_oldest_inflight_ts` must be a *recent* floor, not epoch: the +-- export's `timestamp >= floor` predicate is the only partition-pruning bound (the +-- `age(xmin)` cursor is unindexable), so an epoch floor would scan the whole +-- `audit_partitioned` table on the first run and never finish under a +-- `statement_timeout`. The floor must be at or below the timestamp of any row whose +-- xid >= this snapshot xmin; the oldest in-flight `xact_start` is that bound when +-- stats are visible (no restricted role / prepared 2PC txn), else a bounded 7-day +-- window. -- --- Now the anchor records a recent floor (the oldest in-flight `xact_start` when --- the stats privilege is available — a sound lower bound for any row whose xid --- >= this snapshot xmin — else a bounded 7-day window), and `ON CONFLICT DO --- UPDATE` advances the cursor to the current snapshot xmin on re-enable (never --- backwards, so it stays HA-safe). Exporting the gap a disable left behind is --- the job of the opt-in historical backfill, not this steady-state cursor. +-- `ON CONFLICT DO UPDATE ... WHERE last_xmin <` keeps the cursor monotonic: a +-- re-enable re-anchors it forward (so the export resumes from ~now rather than +-- rescanning the disabled gap — that gap is the backfill's job), but it never moves +-- backwards, so it is HA-safe and can't be regressed by a slower concurrent writer. -- -- The task name literal must match -- `windmill_common::global_settings::AUDIT_LOGS_S3_EXPORT_TASK`. @@ -54,16 +49,12 @@ BEGIN END; $$ LANGUAGE plpgsql; --- One-time recovery for an instance upgrading while a legacy epoch-sentinel --- checkpoint is still in place (enabled on the old code but never drained — e.g. --- the very full-table scan this migration removes left it stuck). Such a --- checkpoint cannot be safely resumed: its un-drained backlog can be arbitrarily --- old, so stamping a recent floor over the old xmin would prune the older rows --- while the cursor advanced past them (silent loss), and keeping the epoch floor --- would reintroduce the full scan. Instead re-anchor it to now, exactly like a --- fresh enable — the export resumes cleanly from ~now, and the pre-upgrade window --- (which was never successfully exported) is recovered with the opt-in historical --- backfill rather than silently dropped. +-- Recovery for a legacy epoch-sentinel checkpoint (`last_ts = epoch`, never +-- drained). It cannot be safely resumed: its un-drained backlog can be arbitrarily +-- old, so stamping a recent floor over the old xmin would prune the older rows while +-- the cursor advanced past them (silent loss), and keeping the epoch floor would +-- reintroduce the full scan. Re-anchor it to now like a fresh enable; the pre-anchor +-- window is recoverable via the opt-in backfill, not silently dropped. UPDATE background_task_state SET value = jsonb_build_object( 'last_xmin', txid_snapshot_xmin(txid_current_snapshot())::bigint, diff --git a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs index 2ae936d510469..e86ceb6448f66 100644 --- a/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs +++ b/backend/windmill-api-settings/src/audit_logs_s3_backfill.rs @@ -25,8 +25,8 @@ //! //! Scope: like the steady-state export, this reads only `audit_partitioned`. The //! pre-partitioning `audit` table is intentionally not exported; a window that -//! starts before the earliest partitioned row is rejected (see [`try_start`]) so a -//! backfill never silently reports success while omitting those legacy rows. +//! overlaps any legacy `audit` row is rejected (see [`try_start`]) so a backfill +//! never silently reports success while omitting them. //! //! Progress is persisted in `background_task_state` (name [`TASK_NAME`]) so any //! API replica can serve the status endpoint, mirroring `log_cleanup`. @@ -218,25 +218,25 @@ pub async fn try_start(db: &DB, from: DateTime, to: DateTime) -> error } // The backfill (like the steady-state export) reads only `audit_partitioned`. Audit // history from before partitioning was introduced lives in the legacy `audit` table - // and is intentionally not exported. Reject a window that reaches before the earliest - // partitioned row, so a backfill can't report success while silently omitting those - // legacy rows (every legacy row predates the partition cutover, so a `from` at or - // after the earliest partitioned timestamp can never overlap them). Non-macro query: - // no compile-time-checked entry needed. - let partition_floor: Option> = sqlx::query_scalar::<_, Option>>( - "SELECT min(timestamp) FROM audit_partitioned", + // and is intentionally not exported. If the requested window overlaps any legacy row, + // reject — otherwise a "completed" backfill would silently omit them. Checking the + // legacy table directly (rather than min(audit_partitioned)) also covers an upgraded + // instance whose `audit_partitioned` is still empty, where a min() guard would no-op. + // Non-macro query: no compile-time-checked entry needed. + let overlaps_legacy: bool = sqlx::query_scalar::<_, bool>( + "SELECT EXISTS (SELECT 1 FROM audit WHERE timestamp >= $1 AND timestamp < $2)", ) + .bind(from) + .bind(to) .fetch_one(db) .await?; - if let Some(floor) = partition_floor { - if from < floor { - return Err(error::Error::BadRequest(format!( - "audit backfill: `from` ({from}) is before the earliest partitioned audit row \ - ({floor}). Audit logs from before audit-log partitioning was introduced live in \ - the legacy `audit` table and are not exported to object storage; set `from` >= \ - {floor}." - ))); - } + if overlaps_legacy { + return Err(error::Error::BadRequest( + "audit backfill: the requested window overlaps rows in the legacy (pre-partitioning) \ + `audit` table, which is not exported to object storage. Restrict the window to the \ + partitioned era (after audit-log partitioning was introduced)." + .to_string(), + )); } let claimed = background_task::try_claim( db, @@ -667,29 +667,43 @@ mod tests { Ok(()) } - // A window reaching before the earliest partitioned row is rejected: those - // pre-partitioning rows live in the legacy `audit` table and are not exported, - // so the backfill must not report success while silently omitting them. + /// Insert a row into the legacy (non-partitioned) `audit` table at an exact time. + async fn insert_legacy_audit_at(db: &DB, operation: &str, ts: DateTime) { + sqlx::query( + "INSERT INTO audit (workspace_id, username, operation, action_kind, parameters, timestamp) + VALUES ('test-ws','tester',$1,'create'::action_kind,'{}'::jsonb,$2)", + ) + .bind(operation) + .bind(ts) + .execute(db) + .await + .expect("insert legacy audit row"); + } + + // A window overlapping rows in the legacy (non-partitioned) `audit` table is rejected: + // those rows are not exported, so the backfill must not report success while silently + // omitting them. Covers the empty-`audit_partitioned` case (a min(partitioned) guard + // would no-op there). #[sqlx::test(migrations = "../migrations")] - async fn backfill_rejects_pre_partition_window(db: DB) -> anyhow::Result<()> { - // Earliest partitioned row ~3 days ago. - insert_audit_days_ago(&db, "boundary", 3).await; + async fn backfill_rejects_window_overlapping_legacy(db: DB) -> anyhow::Result<()> { + // A legacy row ~5 days ago, and no partitioned rows at all. + insert_legacy_audit_at(&db, "legacy.row", Utc::now() - chrono::Duration::days(5)).await; - // `from` before the boundary -> rejected. - let from = Utc::now() - chrono::Duration::days(10); + // A window covering it is rejected. + let from = Utc::now() - chrono::Duration::days(6); let to = Utc::now() - chrono::Duration::days(2); let err = try_start(&db, from, to).await.unwrap_err(); assert!( matches!(err, error::Error::BadRequest(_)), - "a window starting before the earliest partitioned row must be rejected, got {err:?}" + "a window overlapping legacy audit rows must be rejected, got {err:?}" ); - // `from` within the partitioned era (after the boundary) -> accepted. - let from_ok = Utc::now() - chrono::Duration::hours(60); // ~2.5d ago, > boundary - let to_ok = Utc::now() - chrono::Duration::days(2); + // A window clear of any legacy row is accepted. + let from_ok = Utc::now() - chrono::Duration::days(2); + let to_ok = Utc::now() - chrono::Duration::days(1); try_start(&db, from_ok, to_ok) .await - .expect("a window within the partitioned era is accepted"); + .expect("a window with no legacy overlap is accepted"); Ok(()) } } diff --git a/backend/windmill-api/openapi-deref.json b/backend/windmill-api/openapi-deref.json index 540d6bea52f5f..e511578821fa3 100644 --- a/backend/windmill-api/openapi-deref.json +++ b/backend/windmill-api/openapi-deref.json @@ -1,7 +1,7 @@ { "openapi": "3.0.3", "info": { - "version": "1.739.0", + "version": "1.740.0", "title": "Windmill API", "contact": { "name": "Windmill Team", @@ -2719,6 +2719,124 @@ } } }, + "/settings/audit_logs_s3_backfill": { + "post": { + "summary": "start an opt-in historical backfill of audit logs to object storage", + "operationId": "runAuditLogsS3Backfill", + "tags": [ + "setting" + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "from": { + "type": "string", + "format": "date-time", + "description": "inclusive lower bound of the window to export" + }, + "to": { + "type": "string", + "format": "date-time", + "description": "exclusive upper bound of the window to export" + } + }, + "required": [ + "from", + "to" + ] + } + } + } + }, + "responses": { + "202": { + "description": "backfill started" + } + } + } + }, + "/settings/audit_logs_s3_backfill_status": { + "get": { + "summary": "get status of the audit-log object-store historical backfill", + "operationId": "getAuditLogsS3BackfillStatus", + "tags": [ + "setting" + ], + "responses": { + "200": { + "description": "current backfill status (null if never run)", + "content": { + "application/json": { + "schema": { + "nullable": true, + "type": "object", + "properties": { + "running": { + "type": "boolean" + }, + "started_at": { + "type": "string", + "format": "date-time" + }, + "finished_at": { + "type": "string", + "format": "date-time", + "nullable": true + }, + "phase": { + "type": "string" + }, + "from": { + "type": "string", + "format": "date-time" + }, + "to": { + "type": "string", + "format": "date-time" + }, + "rows_written": { + "type": "integer", + "format": "int64" + }, + "objects_written": { + "type": "integer", + "format": "int64" + }, + "last_ts": { + "type": "string", + "format": "date-time", + "nullable": true + }, + "errors": { + "type": "integer", + "format": "int64" + }, + "last_error": { + "type": "string", + "nullable": true + } + }, + "required": [ + "running", + "started_at", + "phase", + "from", + "to", + "rows_written", + "objects_written", + "errors" + ] + } + } + } + } + } + } + }, "/settings/send_stats": { "post": { "summary": "send stats", @@ -18440,6 +18558,14 @@ "type": "boolean" } }, + { + "name": "timeout", + "description": "custom timeout in seconds for this preview run", + "in": "query", + "schema": { + "type": "integer" + } + }, { "$ref": "#/components/parameters/NewJobId" } @@ -20465,6 +20591,89 @@ } } }, + "/w/{workspace}/jobs_u/get_flow_all_logs_structured/{id}": { + "get": { + "summary": "get all logs for a flow job in a structured format", + "operationId": "getFlowAllLogsStructured", + "tags": [ + "job" + ], + "parameters": [ + { + "$ref": "#/components/parameters/WorkspaceId" + }, + { + "$ref": "#/components/parameters/JobId" + } + ], + "responses": { + "200": { + "description": "structured logs of all flow steps, one entry per job", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "type": "object", + "properties": { + "job_id": { + "type": "string" + }, + "label": { + "type": "string", + "description": "human-readable label describing the job's position in the flow tree" + }, + "kind": { + "type": "string", + "description": "job kind (script, flow, forloopflow, ...)" + }, + "flow_step_id": { + "type": "string", + "nullable": true + }, + "step_path": { + "type": "string", + "nullable": true, + "description": "materialized step path (e.g. \"a/b\")" + }, + "depth": { + "type": "integer", + "description": "depth in the flow tree (0 for the root flow job)" + }, + "parent_module_type": { + "type": "string", + "nullable": true, + "description": "parent module type (forloopflow, branchall, ...)" + }, + "sibling_index": { + "type": "integer", + "description": "1-based index of this job among siblings sharing the same step" + }, + "sibling_count": { + "type": "integer", + "description": "total number of siblings sharing the same step" + }, + "logs": { + "type": "string" + } + }, + "required": [ + "job_id", + "label", + "kind", + "depth", + "sibling_index", + "sibling_count", + "logs" + ] + } + } + } + } + } + } + } + }, "/w/{workspace}/jobs_u/get_completed_logs_tail/{id}": { "get": { "summary": "get completed job logs tail", diff --git a/backend/windmill-api/openapi-deref.yaml b/backend/windmill-api/openapi-deref.yaml index 127fda6bed303..e17c38efaf484 100644 --- a/backend/windmill-api/openapi-deref.yaml +++ b/backend/windmill-api/openapi-deref.yaml @@ -1,6 +1,6 @@ openapi: 3.0.3 info: - version: 1.739.0 + version: 1.740.0 title: Windmill API contact: name: Windmill Team @@ -2727,6 +2727,90 @@ paths: - bootstrapping - last_run_exported - updated_at + /settings/audit_logs_s3_backfill: + post: + summary: start an opt-in historical backfill of audit logs to object storage + operationId: runAuditLogsS3Backfill + tags: + - setting + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + from: + type: string + format: date-time + description: inclusive lower bound of the window to export + to: + type: string + format: date-time + description: exclusive upper bound of the window to export + required: + - from + - to + responses: + '202': + description: backfill started + /settings/audit_logs_s3_backfill_status: + get: + summary: get status of the audit-log object-store historical backfill + operationId: getAuditLogsS3BackfillStatus + tags: + - setting + responses: + '200': + description: current backfill status (null if never run) + content: + application/json: + schema: + nullable: true + type: object + properties: + running: + type: boolean + started_at: + type: string + format: date-time + finished_at: + type: string + format: date-time + nullable: true + phase: + type: string + from: + type: string + format: date-time + to: + type: string + format: date-time + rows_written: + type: integer + format: int64 + objects_written: + type: integer + format: int64 + last_ts: + type: string + format: date-time + nullable: true + errors: + type: integer + format: int64 + last_error: + type: string + nullable: true + required: + - running + - started_at + - phase + - from + - to + - rows_written + - objects_written + - errors /settings/send_stats: post: summary: send stats @@ -18747,6 +18831,11 @@ paths: in: query schema: type: boolean + - name: timeout + description: custom timeout in seconds for this preview run + in: query + schema: + type: integer - name: job_id description: >- The job id to assign to the created job. if missing, job is chosen @@ -21710,6 +21799,73 @@ paths: text/plain: schema: type: string + /w/{workspace}/jobs_u/get_flow_all_logs_structured/{id}: + get: + summary: get all logs for a flow job in a structured format + operationId: getFlowAllLogsStructured + tags: + - job + parameters: + - name: workspace + in: path + required: true + schema: *ref_4 + - name: id + in: path + required: true + schema: *ref_178 + responses: + '200': + description: structured logs of all flow steps, one entry per job + content: + application/json: + schema: + type: array + items: + type: object + properties: + job_id: + type: string + label: + type: string + description: >- + human-readable label describing the job's position in + the flow tree + kind: + type: string + description: job kind (script, flow, forloopflow, ...) + flow_step_id: + type: string + nullable: true + step_path: + type: string + nullable: true + description: materialized step path (e.g. "a/b") + depth: + type: integer + description: depth in the flow tree (0 for the root flow job) + parent_module_type: + type: string + nullable: true + description: parent module type (forloopflow, branchall, ...) + sibling_index: + type: integer + description: >- + 1-based index of this job among siblings sharing the + same step + sibling_count: + type: integer + description: total number of siblings sharing the same step + logs: + type: string + required: + - job_id + - label + - kind + - depth + - sibling_index + - sibling_count + - logs /w/{workspace}/jobs_u/get_completed_logs_tail/{id}: get: summary: get completed job logs tail diff --git a/frontend/src/lib/components/instanceSettings.ts b/frontend/src/lib/components/instanceSettings.ts index e7d1d3fbe0e96..81a7b841b7004 100644 --- a/frontend/src/lib/components/instanceSettings.ts +++ b/frontend/src/lib/components/instanceSettings.ts @@ -443,7 +443,7 @@ export const settings: Record = { { label: 'Store audit logs in object storage', description: - 'When enabled and instance object storage is configured, audit logs are also exported as newline-delimited JSON to the dedicated logs/audit/ folder (partitioned by day). Export is incremental and runs off the hot path. Enabling (or re-enabling) anchors the export at ~now: while it stays enabled, every audit log committed from that point on is exported (transactions in flight at the moment of enabling may include a bounded set of just-prior rows). Pre-existing history, and any window during which export was disabled, are NOT exported by this cursor — use the opt-in backfill API to export a chosen historical range, back to when audit-log partitioning was introduced (older rows in the legacy audit table are not exported, and a window starting before that boundary is rejected): POST /settings/audit_logs_s3_backfill {from, to} (status at GET /settings/audit_logs_s3_backfill_status).', + 'When enabled and instance object storage is configured, audit logs are also exported as newline-delimited JSON to the dedicated logs/audit/ folder (partitioned by day). Export is incremental and runs off the hot path. Enabling (or re-enabling) anchors the export at ~now: while it stays enabled, every audit log committed from that point on is exported (transactions in flight at the moment of enabling may include a bounded set of just-prior rows). Pre-existing history, and any window during which export was disabled, are NOT exported by this cursor — use the opt-in backfill API to export a chosen historical range, back to when audit-log partitioning was introduced (older rows in the legacy audit table are not exported, and a window overlapping them is rejected): POST /settings/audit_logs_s3_backfill {from, to} (status at GET /settings/audit_logs_s3_backfill_status).', key: 'store_audit_logs_s3', fieldType: 'boolean', storage: 'setting', From 8e1af7f4fe36009f30b1e6d18342b449a193f1f5 Mon Sep 17 00:00:00 2001 From: "windmill-internal-app[bot]" Date: Fri, 26 Jun 2026 19:35:45 +0000 Subject: [PATCH 10/10] chore: update ee-repo-ref to b821fecccbcba2efed544890576bf2b84321d70d This commit updates the EE repository reference after PR #634 was merged in windmill-ee-private. Previous ee-repo-ref: 6b191b77aabcf77658ad4f9031576e0d7b66bf89 New ee-repo-ref: b821fecccbcba2efed544890576bf2b84321d70d Automated by sync-ee-ref workflow. --- backend/ee-repo-ref.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/ee-repo-ref.txt b/backend/ee-repo-ref.txt index d8f55f59bc4ca..294707d46651f 100644 --- a/backend/ee-repo-ref.txt +++ b/backend/ee-repo-ref.txt @@ -1 +1 @@ -6b191b77aabcf77658ad4f9031576e0d7b66bf89 \ No newline at end of file +b821fecccbcba2efed544890576bf2b84321d70d