Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/ee-repo-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7b92c8e0de4cfc6d986499d60a5f79cd1c6b9d0b
b821fecccbcba2efed544890576bf2b84321d70d
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Restore the previous anchor: epoch sentinel + preserve-cursor (DO NOTHING).
CREATE OR REPLACE FUNCTION audit_logs_s3_anchor_on_enable()
RETURNS TRIGGER AS $$
DECLARE
    -- Checkpoint payload seeded for the export task on enable.
    v_anchor jsonb;
BEGIN
    -- Nothing to anchor unless the new setting value is exactly JSON `true`.
    IF NEW.value IS DISTINCT FROM to_jsonb(true) THEN
        RETURN NEW;
    END IF;

    -- An UPDATE that leaves the value unchanged is not an enable transition.
    IF TG_OP <> 'INSERT' AND OLD.value IS NOT DISTINCT FROM NEW.value THEN
        RETURN NEW;
    END IF;

    -- Current snapshot xmin as the xid cursor, plus the epoch sentinel as the
    -- timestamp floor.
    v_anchor := jsonb_build_object(
        'last_xmin', txid_snapshot_xmin(txid_current_snapshot())::bigint,
        'last_ts', '1970-01-01T00:00:00+00:00'
    );

    -- Preserve any existing cursor: an already-present row is left untouched.
    INSERT INTO background_task_state (name, value)
    VALUES ('audit_logs_s3_export', v_anchor)
    ON CONFLICT (name) DO NOTHING;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
-- Anchors the audit→object-store export cursor when the setting is enabled.
--
-- `last_ts`/`last_oldest_inflight_ts` must be a *recent* floor, not epoch: the
-- export's `timestamp >= floor` predicate is the only partition-pruning bound (the
-- `age(xmin)` cursor is unindexable), so an epoch floor would scan the whole
-- `audit_partitioned` table on the first run and never finish under a
-- `statement_timeout`. The floor must be at or below the timestamp of any row whose
-- xid >= this snapshot xmin; the oldest in-flight `xact_start` is that bound when
-- stats are visible (no restricted role / prepared 2PC txn), else a bounded 7-day
-- window.
--
-- `ON CONFLICT DO UPDATE ... WHERE last_xmin <` keeps the cursor monotonic: a
-- re-enable re-anchors it forward (so the export resumes from ~now rather than
-- rescanning the disabled gap — that gap is the backfill's job), but it never moves
-- backwards, so it is HA-safe and can't be regressed by a slower concurrent writer.
--
-- The task name literal must match
-- `windmill_common::global_settings::AUDIT_LOGS_S3_EXPORT_TASK`.

-- Trigger function: when the watched setting row becomes JSON `true` (on INSERT,
-- or on an UPDATE that actually changes the value), write or advance the
-- 'audit_logs_s3_export' checkpoint in `background_task_state`. Always returns
-- NEW so the triggering write proceeds unchanged.
CREATE OR REPLACE FUNCTION audit_logs_s3_anchor_on_enable()
RETURNS TRIGGER AS $$
DECLARE
    -- Timestamp floor stored as `last_oldest_inflight_ts`.
    v_floor timestamptz;
BEGIN
    IF NEW.value = to_jsonb(true)
       AND (TG_OP = 'INSERT' OR OLD.value IS DISTINCT FROM NEW.value) THEN
        -- Use the oldest in-flight transaction start when activity stats are
        -- readable (superuser or pg_read_all_stats) and there are no prepared
        -- transactions; otherwise fall back to a bounded 7-day window.
        v_floor := COALESCE(
            CASE WHEN (current_setting('is_superuser') = 'on'
                       OR pg_has_role(current_user, 'pg_read_all_stats', 'USAGE'))
                      AND NOT EXISTS (SELECT 1 FROM pg_prepared_xacts)
                 THEN (SELECT min(xact_start) FROM pg_stat_activity WHERE xact_start IS NOT NULL)
                 ELSE NULL END,
            now() - interval '7 days');

        INSERT INTO background_task_state (name, value)
        VALUES (
            'audit_logs_s3_export',
            jsonb_build_object(
                'last_xmin', txid_snapshot_xmin(txid_current_snapshot())::bigint,
                'last_ts', now(),
                'last_oldest_inflight_ts', v_floor
            )
        )
        -- Monotonic upsert: only replace the stored checkpoint when the new
        -- xmin is strictly greater, so the cursor never moves backwards.
        ON CONFLICT (name) DO UPDATE
            SET value = EXCLUDED.value
            WHERE (background_task_state.value->>'last_xmin')::bigint
                  < (EXCLUDED.value->>'last_xmin')::bigint;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Recovery for a legacy epoch-sentinel checkpoint (`last_ts = epoch`, never
-- drained). It cannot be safely resumed: its un-drained backlog can be arbitrarily
-- old, so stamping a recent floor over the old xmin would prune the older rows while
-- the cursor advanced past them (silent loss), and keeping the epoch floor would
-- reintroduce the full scan. Re-anchor it to now like a fresh enable; the pre-anchor
-- window is recoverable via the opt-in backfill, not silently dropped.
-- Re-anchor a legacy checkpoint whose `last_ts` is still at (or before) the epoch.
-- The replacement value is built once in the FROM subquery and applied to the
-- matching 'audit_logs_s3_export' row.
UPDATE background_task_state AS bts
SET value = fresh.anchor
FROM (
    SELECT jsonb_build_object(
        'last_xmin', txid_snapshot_xmin(txid_current_snapshot())::bigint,
        'last_ts', to_jsonb(now()),
        -- Oldest in-flight transaction start when stats are readable and no
        -- prepared transactions exist; otherwise a 7-day window (the CASE
        -- yields NULL when its condition does not hold).
        'last_oldest_inflight_ts', to_jsonb(COALESCE(
            CASE WHEN (current_setting('is_superuser') = 'on'
                       OR pg_has_role(current_user, 'pg_read_all_stats', 'USAGE'))
                      AND NOT EXISTS (SELECT 1 FROM pg_prepared_xacts)
                 THEN (SELECT min(xact_start) FROM pg_stat_activity WHERE xact_start IS NOT NULL)
            END,
            now() - interval '7 days'))
    ) AS anchor
) AS fresh
WHERE bts.name = 'audit_logs_s3_export'
  AND (bts.value->>'last_ts')::timestamptz <= 'epoch'::timestamptz;
9 changes: 5 additions & 4 deletions backend/windmill-api-settings/src/audit_logs_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use windmill_common::DB;
pub struct AuditLogsS3ExportStatus {
/// xid cursor: rows of transactions below this have been exported.
pub last_xmin: i64,
/// Partition-pruning floor (the epoch sentinel while still bootstrapping).
/// Partition-pruning floor: the latest audit-row timestamp the cursor has
/// reached (also the read side's 7-day-fallback anchor).
pub last_ts: Option<DateTime<Utc>>,
/// True until the initial post-enable backlog has been fully drained.
/// True while the exporter is draining a backlog — the last run was capped
/// at `MAX_XID_INTERVAL` xids and has not yet caught up to the live snapshot.
pub bootstrapping: bool,
/// The latest audit-row timestamp actually written to object storage so
/// far (monotonic) — the "how current is the mirror" figure.
Expand Down Expand Up @@ -50,8 +52,7 @@ pub async fn get_status(db: &DB) -> error::Result<Option<AuditLogsS3ExportStatus
.map(|d| d.with_timezone(&Utc))
};
let last_ts = parse_dt("last_ts");
let epoch = DateTime::<Utc>::from_timestamp(0, 0).unwrap();
let bootstrapping = last_ts.map(|t| t <= epoch).unwrap_or(true);
let bootstrapping = v.get("draining").and_then(|x| x.as_bool()).unwrap_or(false);
Ok(Some(AuditLogsS3ExportStatus {
last_xmin: v.get("last_xmin").and_then(|x| x.as_i64()).unwrap_or(0),
last_ts,
Expand Down
Loading
Loading