From 69ae09fe96d45209797f0057294edad3dc4ee74c Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 10:19:57 +0000 Subject: [PATCH 1/6] feat: capture managed-materialize output schema as asset metadata (#2a) After a managed `// materialize` run, capture the producer's output schema via a DESCRIBE folded into the existing one-row summary read (no extra round-trip) and persist it in a new versioned `materialized_asset_schema` sidecar table. This is the producer-side capture that pipeline parity gap #2b (save-time consumer-ref contract enforcement) will read back. - materialized_asset_schema sidecar (asset-level grain), versioned: a new version row is inserted only when the captured column set changes. - output_schema column added to the materialize summary codegen. - worker extracts + records the schema on a successful materialize. - /assets/asset_schemas read endpoint exposing the evolution history. Co-Authored-By: Claude Opus 4.8 --- ...b5829657224a743af9c0b2d5a140ad70e13dd.json | 33 ++++ ...061d2562c3ac4af8d088f9eea9f0bc11d5fe7.json | 44 +++++ ...cf0d8d2a3d7e8552d8ad09512d2d6bf27c3fb.json | 62 ++++++++ ...66eb7a83e312b88f0244d1c517b03a9bcdea0.json | 34 ++++ ...6d0f58f49a45647525658acb909a494877238.json | 22 +++ backend/Cargo.lock | 1 + ...840_add_materialized_asset_schema.down.sql | 1 + ...95840_add_materialized_asset_schema.up.sql | 37 +++++ .../windmill-parser/src/sql_materialize.rs | 21 ++- backend/windmill-api-assets/Cargo.toml | 1 + backend/windmill-api-assets/src/lib.rs | 53 ++++++- .../windmill-common/src/materialization.rs | 139 +++++++++++++++- .../tests/asset_schema_capture.rs | 105 ++++++++++++ .../windmill-worker/src/duckdb_executor.rs | 150 ++++++++++++++++-- 14 files changed, 685 insertions(+), 18 deletions(-) create mode 100644 backend/.sqlx/query-01732ca02b1888145c48c4e51e5b5829657224a743af9c0b2d5a140ad70e13dd.json create mode 100644 backend/.sqlx/query-231475dc825518aa88f562698f8061d2562c3ac4af8d088f9eea9f0bc11d5fe7.json create mode 100644 backend/.sqlx/query-2a28f13c1f06a77bc9a164393cccf0d8d2a3d7e8552d8ad09512d2d6bf27c3fb.json create mode 100644 backend/.sqlx/query-8e829a7358ea47100c99e78067266eb7a83e312b88f0244d1c517b03a9bcdea0.json create mode 100644 backend/.sqlx/query-b876b16b660bd1c3799abadc2c16d0f58f49a45647525658acb909a494877238.json create mode 100644 backend/migrations/20260626095840_add_materialized_asset_schema.down.sql create mode 100644 backend/migrations/20260626095840_add_materialized_asset_schema.up.sql create mode 100644 backend/windmill-common/tests/asset_schema_capture.rs diff --git a/backend/.sqlx/query-01732ca02b1888145c48c4e51e5b5829657224a743af9c0b2d5a140ad70e13dd.json b/backend/.sqlx/query-01732ca02b1888145c48c4e51e5b5829657224a743af9c0b2d5a140ad70e13dd.json new file mode 100644 index 0000000000000..a0d8b66a2600c --- /dev/null +++ b/backend/.sqlx/query-01732ca02b1888145c48c4e51e5b5829657224a743af9c0b2d5a140ad70e13dd.json @@ -0,0 +1,33 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE materialized_asset_schema\n SET snapshot_id = $5, job_id = $6, captured_at = now()\n WHERE workspace_id = $1 AND asset_kind = $2 AND asset_path = $3\n AND version = $4", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + { + "Custom": { + "name": "asset_kind", + "kind": { + "Enum": [ + "s3object", + "resource", + "variable", + "ducklake", + "datatable", + "volume" + ] + } + } + }, + "Text", + "Int8", + "Int8", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "01732ca02b1888145c48c4e51e5b5829657224a743af9c0b2d5a140ad70e13dd" +} diff --git a/backend/.sqlx/query-231475dc825518aa88f562698f8061d2562c3ac4af8d088f9eea9f0bc11d5fe7.json b/backend/.sqlx/query-231475dc825518aa88f562698f8061d2562c3ac4af8d088f9eea9f0bc11d5fe7.json new file mode 100644 index 0000000000000..e3f72d9c3a97a --- /dev/null +++ b/backend/.sqlx/query-231475dc825518aa88f562698f8061d2562c3ac4af8d088f9eea9f0bc11d5fe7.json @@ -0,0 +1,44 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT version, columns AS \"columns: Json>\"\n FROM materialized_asset_schema\n WHERE workspace_id = $1 AND asset_kind = $2 AND asset_path = $3\n ORDER BY version DESC\n LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "version", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "columns: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Text", + { + "Custom": { + "name": "asset_kind", + "kind": { + "Enum": [ + "s3object", + "resource", + "variable", + "ducklake", + "datatable", + "volume" + ] + } + } + }, + "Text" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "231475dc825518aa88f562698f8061d2562c3ac4af8d088f9eea9f0bc11d5fe7" +} diff --git a/backend/.sqlx/query-2a28f13c1f06a77bc9a164393cccf0d8d2a3d7e8552d8ad09512d2d6bf27c3fb.json b/backend/.sqlx/query-2a28f13c1f06a77bc9a164393cccf0d8d2a3d7e8552d8ad09512d2d6bf27c3fb.json new file mode 100644 index 0000000000000..f940f25b4a07f --- /dev/null +++ b/backend/.sqlx/query-2a28f13c1f06a77bc9a164393cccf0d8d2a3d7e8552d8ad09512d2d6bf27c3fb.json @@ -0,0 +1,62 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT version, columns AS \"columns: Json>\",\n snapshot_id, job_id, captured_at\n FROM materialized_asset_schema\n WHERE workspace_id = $1 AND asset_kind = $2 AND asset_path = $3\n ORDER BY version DESC", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "version", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "columns: Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "snapshot_id", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "job_id", + "type_info": "Uuid" + }, + { + "ordinal": 4, + "name": "captured_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text", + { + "Custom": { + "name": "asset_kind", + "kind": { + "Enum": [ + "s3object", + "resource", + "variable", + "ducklake", + "datatable", + "volume" + ] + } + } + }, + "Text" + ] + }, + "nullable": [ + false, + false, + true, + true, + false + ] + }, + "hash": "2a28f13c1f06a77bc9a164393cccf0d8d2a3d7e8552d8ad09512d2d6bf27c3fb" +} diff --git a/backend/.sqlx/query-8e829a7358ea47100c99e78067266eb7a83e312b88f0244d1c517b03a9bcdea0.json b/backend/.sqlx/query-8e829a7358ea47100c99e78067266eb7a83e312b88f0244d1c517b03a9bcdea0.json new file mode 100644 index 0000000000000..eac4547361474 --- /dev/null +++ b/backend/.sqlx/query-8e829a7358ea47100c99e78067266eb7a83e312b88f0244d1c517b03a9bcdea0.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO materialized_asset_schema\n (workspace_id, asset_kind, asset_path, version, columns,\n snapshot_id, job_id, captured_at)\n VALUES ($1, $2, $3, $4, $5, $6, $7, now())", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Varchar", + { + "Custom": { + "name": "asset_kind", + "kind": { + "Enum": [ + "s3object", + "resource", + "variable", + "ducklake", + "datatable", + "volume" + ] + } + } + }, + "Varchar", + "Int8", + "Jsonb", + "Int8", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "8e829a7358ea47100c99e78067266eb7a83e312b88f0244d1c517b03a9bcdea0" +} diff --git a/backend/.sqlx/query-b876b16b660bd1c3799abadc2c16d0f58f49a45647525658acb909a494877238.json b/backend/.sqlx/query-b876b16b660bd1c3799abadc2c16d0f58f49a45647525658acb909a494877238.json new file mode 100644 index 0000000000000..5b0f0b319c30c --- /dev/null +++ b/backend/.sqlx/query-b876b16b660bd1c3799abadc2c16d0f58f49a45647525658acb909a494877238.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT pg_advisory_xact_lock(hashtextextended($1, 0::int8))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "pg_advisory_xact_lock", + "type_info": "Void" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "b876b16b660bd1c3799abadc2c16d0f58f49a45647525658acb909a494877238" +} diff --git a/backend/Cargo.lock b/backend/Cargo.lock index ac79413400a1d..9830bed379e0a 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -14030,6 +14030,7 @@ dependencies = [ "serde", "serde_json", "sqlx", + "tracing", "windmill-api-auth", "windmill-common", ] diff --git a/backend/migrations/20260626095840_add_materialized_asset_schema.down.sql b/backend/migrations/20260626095840_add_materialized_asset_schema.down.sql new file mode 100644 index 0000000000000..8a653a984276f --- /dev/null +++ b/backend/migrations/20260626095840_add_materialized_asset_schema.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS materialized_asset_schema; diff --git a/backend/migrations/20260626095840_add_materialized_asset_schema.up.sql b/backend/migrations/20260626095840_add_materialized_asset_schema.up.sql new file mode 100644 index 0000000000000..9c548aded71fe --- /dev/null +++ b/backend/migrations/20260626095840_add_materialized_asset_schema.up.sql @@ -0,0 +1,37 @@ +-- Captured output schema of a managed `// materialize` asset (gap #2a). +-- After a managed materialize commits, the worker DESCRIBEs the written table +-- and records its column list here as asset-level metadata. Schema is a +-- property of the asset/table, not of a partition slice, so it lives in its own +-- table keyed by (workspace, asset_kind, asset_path) rather than as a column on +-- materialized_partition (which would duplicate the identical schema across +-- every partition row). This is the producer-side capture that #2b (save-time +-- consumer-ref contract enforcement) reads back. +-- +-- Versioning across re-materializations: a new `version` row is inserted only +-- when the captured column set differs from the latest stored version for the +-- asset; an unchanged re-materialize re-affirms the latest row in place. So the +-- table is a compact schema-evolution history and MAX(version) is the current +-- contract. +CREATE TABLE IF NOT EXISTS materialized_asset_schema ( + workspace_id VARCHAR(50) NOT NULL REFERENCES workspace(id) ON DELETE CASCADE ON UPDATE CASCADE, + asset_kind ASSET_KIND NOT NULL, + asset_path VARCHAR(255) NOT NULL, + -- Monotonic per (workspace, asset_kind, asset_path), starting at 1; only + -- bumped when the schema actually changes. + version BIGINT NOT NULL, + -- The captured columns, ordered as the table presents them: + -- [{"name": "...", "type": "..."}, ...]. + columns JSONB NOT NULL, + -- DuckLake snapshot the schema was captured from (NULL for non-ducklake / + -- substrates without snapshots). + snapshot_id BIGINT, + job_id UUID, + captured_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (workspace_id, asset_kind, asset_path, version) +); + +-- Default privileges (migration 20250205131523) only apply to objects created +-- by the role that set them, so grant explicitly — the API reads/writes this +-- table as the invoking role (same fix as script_trigger in 20260619112847). +GRANT ALL ON materialized_asset_schema TO windmill_user; +GRANT ALL ON materialized_asset_schema TO windmill_admin; diff --git a/backend/parsers/windmill-parser/src/sql_materialize.rs b/backend/parsers/windmill-parser/src/sql_materialize.rs index d5e145bfa98eb..ccdf36653e963 100644 --- a/backend/parsers/windmill-parser/src/sql_materialize.rs +++ b/backend/parsers/windmill-parser/src/sql_materialize.rs @@ -551,10 +551,22 @@ pub fn materialize_result_sql( String::new(), ) }; + // Capture the materialized output schema (gap #2a) in the same summary row — + // no extra round-trip. `DESCRIBE SELECT * FROM ` yields one row per + // column (`column_name`, `column_type`); fold them into an ordered + // list-of-struct the worker reads back and persists as asset metadata. The + // write just committed, so the latest snapshot (no `AT (VERSION)` needed) is + // exactly the slice recorded in `snapshot_id`. `list(...)` preserves the + // DESCRIBE row order (column order); the read is a single small row. + let schema_capture = format!( + "(SELECT list({{'name': column_name, 'type': column_type}}) \ + FROM (DESCRIBE SELECT * FROM {target_qualified})) AS output_schema" + ); let base_cols = format!( "'ducklake://{asset_path}' AS materialized, \ {partition_sel}{count_expr} AS rows, \ - (SELECT max(snapshot_id) FROM ducklake_snapshots('{TARGET_ALIAS}')) AS snapshot_id" + (SELECT max(snapshot_id) FROM ducklake_snapshots('{TARGET_ALIAS}')) AS snapshot_id, \ + {schema_capture}" ); if checks.is_empty() { return format!("SELECT {base_cols};"); @@ -1366,5 +1378,12 @@ mod tests { ); assert!(plain.starts_with("SELECT 'ducklake://analytics/orders' AS materialized")); assert!(!plain.contains("data_tests")); + // Schema capture (gap #2a) is in every summary, tests or not. + for s in [&sql, &plain] { + assert!(s.contains( + "(SELECT list({'name': column_name, 'type': column_type}) \ + FROM (DESCRIBE SELECT * FROM _wm_target.orders)) AS output_schema" + )); + } } } diff --git a/backend/windmill-api-assets/Cargo.toml b/backend/windmill-api-assets/Cargo.toml index 1ea16c3f0d732..c6ba973f80a2d 100644 --- a/backend/windmill-api-assets/Cargo.toml +++ b/backend/windmill-api-assets/Cargo.toml @@ -16,3 +16,4 @@ chrono.workspace = true serde.workspace = true serde_json.workspace = true sqlx.workspace = true +tracing.workspace = true diff --git a/backend/windmill-api-assets/src/lib.rs b/backend/windmill-api-assets/src/lib.rs index e6542d42ecc0c..da9aa9a9283c7 100644 --- a/backend/windmill-api-assets/src/lib.rs +++ b/backend/windmill-api-assets/src/lib.rs @@ -23,6 +23,7 @@ pub fn workspaced_service() -> Router { .route("/graph", get(asset_graph)) .route("/pipelines", get(list_pipeline_folders)) .route("/partitions", get(list_partitions)) + .route("/asset_schemas", get(list_asset_schemas)) .route("/record_materialization", post(record_materialization)) } @@ -53,17 +54,40 @@ async fn list_partitions( Ok(Json(rows)) } +// Per-asset captured output schema versions for a ducklake asset (gap #2a) — +// the schema-evolution history persisted after each managed `// materialize`. +// Newest version first; materialization targets are ducklake-only in v1, so the +// kind is fixed. +async fn list_asset_schemas( + authed: ApiAuthed, + Path(w_id): Path, + Extension(user_db): Extension, + Query(q): Query, +) -> JsonResult> { + let mut tx = user_db.begin(&authed).await?; + let rows = windmill_common::materialization::list_asset_schemas( + &mut *tx, + &w_id, + AssetKind::Ducklake, + &q.path, + ) + .await?; + tx.commit().await?; + Ok(Json(rows)) +} + // Record a materialization outcome from a polyglot (Python/TS) `wmill.ducklake` // helper running as a pipeline step. The DuckDB `// materialize` engine records // this itself; the SDK helpers post here instead so SDK-materialized slices show -// up in the grid identically. RLS-scoped to the caller's workspace. +// up in the grid identically. When the helper also captured the output schema, +// that schema version is upserted too. RLS-scoped to the caller's workspace. async fn record_materialization( authed: ApiAuthed, Path(w_id): Path, Extension(user_db): Extension, Json(req): Json, ) -> JsonResult<()> { - let mut tx = user_db.begin(&authed).await?; + let mut tx = user_db.clone().begin(&authed).await?; windmill_common::materialization::record_materialization( &mut *tx, &w_id, @@ -78,6 +102,31 @@ async fn record_materialization( ) .await?; tx.commit().await?; + // Schema capture is independently best-effort (its own transaction for the + // per-asset advisory lock) and must never roll back the partition record + // above — mirroring the worker's `record_mat`. A lost schema version + // degrades the history, not the run. + if let Some(columns) = req.schema.as_ref() { + let res: windmill_common::error::Result<()> = async { + let mut tx = user_db.clone().begin(&authed).await?; + windmill_common::materialization::record_asset_schema( + &mut tx, + &w_id, + req.asset_kind, + &req.asset_path, + columns, + req.snapshot_id, + req.job_id, + ) + .await?; + tx.commit().await?; + Ok(()) + } + .await; + if let Err(e) = res { + tracing::warn!("failed to record captured asset schema: {e:#}"); + } + } Ok(Json(())) } diff --git a/backend/windmill-common/src/materialization.rs b/backend/windmill-common/src/materialization.rs index ccc1f7782ecbf..c2ef18287783d 100644 --- a/backend/windmill-common/src/materialization.rs +++ b/backend/windmill-common/src/materialization.rs @@ -13,7 +13,8 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use sqlx::PgExecutor; +use sqlx::types::Json; +use sqlx::{PgExecutor, Postgres, Transaction}; use uuid::Uuid; use crate::assets::AssetKind; @@ -34,6 +35,17 @@ pub enum MaterializationStatus { Failed, } +/// One column of a captured asset output schema: its name and substrate type +/// (e.g. `{"name": "order_id", "type": "BIGINT"}`). `type` is the substrate's +/// own type spelling (DuckDB for ducklake) — kept verbatim so #2b can compare +/// declared vs. captured without a lossy normalization step. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct SchemaColumn { + pub name: String, + #[serde(rename = "type")] + pub data_type: String, +} + /// The materialization outcome an agent worker (`Connection::Http`, no direct /// DB) sends to the API to be recorded. Mirrors the `record_materialization` /// args; the API handler unpacks it and calls that function with its own DB. @@ -47,6 +59,13 @@ pub struct RecordMaterializationRequest { pub row_count: Option, pub job_id: Option, pub error: Option, + /// Captured output schema of the materialized asset (`None` when the + /// substrate/run produced no schema, e.g. a failed run or a polyglot helper + /// that doesn't DESCRIBE). When present, the recorder also upserts a + /// `materialized_asset_schema` version. Defaults to `None` so older agents + /// stay wire-compatible. + #[serde(default)] + pub schema: Option>, } /// Upsert the latest materialization state for one (asset, partition) slice. @@ -133,3 +152,121 @@ pub async fn list_materialized_partitions<'e>( .await?; Ok(rows) } + +/// One captured schema version of an asset, newest first — the schema-evolution +/// history surfaced on the asset node and read by #2b contract enforcement. +#[derive(sqlx::FromRow, Debug, Clone, Serialize)] +pub struct AssetSchemaVersion { + pub version: i64, + pub columns: Json>, + pub snapshot_id: Option, + pub job_id: Option, + pub captured_at: DateTime, +} + +/// Record the captured output schema of a freshly-materialized asset. +/// +/// Versioning across re-materializations: a new `version` row is inserted only +/// when `columns` differs from the latest stored version; an unchanged +/// re-materialize re-affirms the latest row in place (updates its +/// `snapshot_id`/`job_id`/`captured_at`). The result is a compact +/// schema-evolution history where `MAX(version)` is the current contract. +/// +/// Runs in a transaction guarded by a per-asset advisory lock so two concurrent +/// materializations of the same asset can't both insert the same next version +/// or interleave a stale comparison. Returns `true` if a new version was +/// inserted (the schema changed), `false` if the latest was re-affirmed. +pub async fn record_asset_schema( + tx: &mut Transaction<'_, Postgres>, + workspace_id: &str, + asset_kind: AssetKind, + asset_path: &str, + columns: &[SchemaColumn], + snapshot_id: Option, + job_id: Option, +) -> Result { + // Serialize concurrent captures of the *same* asset; the lock auto-releases + // at tx end. Hash the identity into the bigint advisory-lock key space. + sqlx::query!( + "SELECT pg_advisory_xact_lock(hashtextextended($1, 0::int8))", + format!("materialized_asset_schema:{workspace_id}:{asset_kind:?}:{asset_path}"), + ) + .fetch_one(&mut **tx) + .await?; + + let latest = sqlx::query!( + r#"SELECT version, columns AS "columns: Json>" + FROM materialized_asset_schema + WHERE workspace_id = $1 AND asset_kind = $2 AND asset_path = $3 + ORDER BY version DESC + LIMIT 1"#, + workspace_id, + asset_kind as AssetKind, + asset_path, + ) + .fetch_optional(&mut **tx) + .await?; + + let columns_json = Json(columns.to_vec()); + let next_version = match latest { + Some(latest) if latest.columns.0.as_slice() == columns => { + // Unchanged schema — re-affirm the latest version in place. + sqlx::query!( + "UPDATE materialized_asset_schema + SET snapshot_id = $5, job_id = $6, captured_at = now() + WHERE workspace_id = $1 AND asset_kind = $2 AND asset_path = $3 + AND version = $4", + workspace_id, + asset_kind as AssetKind, + asset_path, + latest.version, + snapshot_id, + job_id, + ) + .execute(&mut **tx) + .await?; + return Ok(false); + } + Some(latest) => latest.version + 1, + None => 1, + }; + sqlx::query!( + "INSERT INTO materialized_asset_schema + (workspace_id, asset_kind, asset_path, version, columns, + snapshot_id, job_id, captured_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, now())", + workspace_id, + asset_kind as AssetKind, + asset_path, + next_version, + columns_json as Json>, + snapshot_id, + job_id, + ) + .execute(&mut **tx) + .await?; + Ok(true) +} + +/// All captured schema versions for one asset, newest version first. +pub async fn list_asset_schemas<'e>( + executor: impl PgExecutor<'e>, + workspace_id: &str, + asset_kind: AssetKind, + asset_path: &str, +) -> Result> { + let rows = sqlx::query_as!( + AssetSchemaVersion, + r#"SELECT version, columns AS "columns: Json>", + snapshot_id, job_id, captured_at + FROM materialized_asset_schema + WHERE workspace_id = $1 AND asset_kind = $2 AND asset_path = $3 + ORDER BY version DESC"#, + workspace_id, + asset_kind as AssetKind, + asset_path, + ) + .fetch_all(executor) + .await?; + Ok(rows) +} diff --git a/backend/windmill-common/tests/asset_schema_capture.rs b/backend/windmill-common/tests/asset_schema_capture.rs new file mode 100644 index 0000000000000..f12ff3cc149ba --- /dev/null +++ b/backend/windmill-common/tests/asset_schema_capture.rs @@ -0,0 +1,105 @@ +/*! + * Tests the schema-capture versioning contract (gap #2a): + * `record_asset_schema` inserts a new `materialized_asset_schema` version only + * when the captured column set changes, re-affirms the latest row in place when + * it doesn't, and `list_asset_schemas` returns the evolution history newest + * first. + */ + +use sqlx::{Pool, Postgres}; +use windmill_common::assets::AssetKind; +use windmill_common::materialization::{list_asset_schemas, record_asset_schema, SchemaColumn}; + +const WS: &str = "test-workspace"; +const PATH: &str = "analytics/orders"; + +fn col(name: &str, ty: &str) -> SchemaColumn { + SchemaColumn { name: name.to_string(), data_type: ty.to_string() } +} + +async fn record(db: &Pool, cols: &[SchemaColumn], snapshot_id: i64) -> bool { + let mut tx = db.begin().await.expect("begin"); + let inserted = record_asset_schema( + &mut tx, + WS, + AssetKind::Ducklake, + PATH, + cols, + Some(snapshot_id), + None, + ) + .await + .expect("record asset schema"); + tx.commit().await.expect("commit"); + inserted +} + +#[sqlx::test(migrations = "../migrations", fixtures("base"))] +async fn first_capture_inserts_version_one(db: Pool) { + let cols = [col("order_id", "BIGINT"), col("status", "VARCHAR")]; + assert!( + record(&db, &cols, 10).await, + "first capture inserts a version" + ); + + let versions = list_asset_schemas(&db, WS, AssetKind::Ducklake, PATH) + .await + .unwrap(); + assert_eq!(versions.len(), 1); + assert_eq!(versions[0].version, 1); + assert_eq!(versions[0].snapshot_id, Some(10)); + assert_eq!(versions[0].columns.0, cols); +} + +#[sqlx::test(migrations = "../migrations", fixtures("base"))] +async fn unchanged_schema_reaffirms_without_new_version(db: Pool) { + let cols = [col("order_id", "BIGINT")]; + record(&db, &cols, 10).await; + // Identical column set on a later snapshot: no new version, but the latest + // row's snapshot_id advances. + assert!( + !record(&db, &cols, 20).await, + "unchanged schema must not insert a new version" + ); + + let versions = list_asset_schemas(&db, WS, AssetKind::Ducklake, PATH) + .await + .unwrap(); + assert_eq!(versions.len(), 1, "still a single version"); + assert_eq!(versions[0].version, 1); + assert_eq!(versions[0].snapshot_id, Some(20), "snapshot re-affirmed"); +} + +#[sqlx::test(migrations = "../migrations", fixtures("base"))] +async fn changed_schema_bumps_version_newest_first(db: Pool) { + record(&db, &[col("order_id", "BIGINT")], 10).await; + // A column added → schema changed → new version. + let evolved = [col("order_id", "BIGINT"), col("amount", "DOUBLE")]; + assert!(record(&db, &evolved, 20).await, "changed schema inserts v2"); + + let versions = list_asset_schemas(&db, WS, AssetKind::Ducklake, PATH) + .await + .unwrap(); + assert_eq!(versions.len(), 2); + // Newest first. + assert_eq!(versions[0].version, 2); + assert_eq!(versions[0].columns.0, evolved); + assert_eq!(versions[1].version, 1); +} + +#[sqlx::test(migrations = "../migrations", fixtures("base"))] +async fn column_order_change_is_a_new_version(db: Pool) { + let a = [col("a", "BIGINT"), col("b", "VARCHAR")]; + let reordered = [col("b", "VARCHAR"), col("a", "BIGINT")]; + record(&db, &a, 10).await; + // Same columns, different physical order: the captured list is ordered, so a + // reorder is a real schema change (downstream `SELECT *` consumers see it). + assert!( + record(&db, &reordered, 20).await, + "column reorder is a distinct schema version" + ); + let versions = list_asset_schemas(&db, WS, AssetKind::Ducklake, PATH) + .await + .unwrap(); + assert_eq!(versions.len(), 2); +} diff --git a/backend/windmill-worker/src/duckdb_executor.rs b/backend/windmill-worker/src/duckdb_executor.rs index 9ba0c4b9417ac..1574e83ee42b6 100644 --- a/backend/windmill-worker/src/duckdb_executor.rs +++ b/backend/windmill-worker/src/duckdb_executor.rs @@ -341,6 +341,43 @@ fn extract_data_tests(result: &RawValue) -> Vec { out } +// Pull the captured output schema out of the materialize summary's +// `output_schema` column (gap #2a): a list-of-struct `[{name, type}, …]` the +// codegen built from a `DESCRIBE`. Like `data_tests`, the FFI may surface it as +// a nested JSON array or a JSON string — accept both. Returns `None` when the +// column is absent (literal mode, manual mode, or capture failed) so the worker +// records the run without a schema rather than an empty one. +fn extract_schema( + result: &RawValue, +) -> Option> { + use windmill_common::materialization::SchemaColumn; + fn collect(v: &Value) -> Option> { + let Value::Array(arr) = v else { return None }; + let mut out = Vec::with_capacity(arr.len()); + for item in arr { + let o = item.as_object()?; + let name = o.get("name")?.as_str()?.to_string(); + let data_type = o.get("type")?.as_str()?.to_string(); + out.push(SchemaColumn { name, data_type }); + } + Some(out) + } + fn find_field(v: &Value) -> Option<&Value> { + match v { + Value::Object(o) => o.get("output_schema"), + Value::Array(a) => a.iter().find_map(find_field), + _ => None, + } + } + let root = serde_json::from_str::(result.get()).ok()?; + match find_field(&root)? { + arr @ Value::Array(_) => collect(arr), + // FFI serialized the list-of-struct as a JSON string — parse it. + Value::String(s) => collect(&serde_json::from_str::(s).ok()?), + _ => None, + } +} + // Render the full pass/fail breakdown for a failed data-test run — every test, // not just the first failure, so the user sees the whole picture in one place. fn format_data_test_breakdown(asset_path: &str, tests: &[DataTestOutcome]) -> String { @@ -371,6 +408,9 @@ async fn record_mat( status: windmill_common::materialization::MaterializationStatus, snapshot_id: Option, row_count: Option, + // Captured output schema (gap #2a). Only set on a successful materialize; + // when present, also upserts a `materialized_asset_schema` version. + schema: Option>, error: Option<&str>, ) { let req = windmill_common::materialization::RecordMaterializationRequest { @@ -382,22 +422,44 @@ async fn record_mat( row_count, job_id: Some(job_id), error: error.map(|e| e.to_string()), + schema: schema.clone(), }; let res: anyhow::Result<()> = match conn { - Connection::Sql(db) => windmill_common::materialization::record_materialization( - db, - w_id, - req.asset_kind, - &req.asset_path, - &req.partition, - req.status, - req.snapshot_id, - req.row_count, - req.job_id, - req.error.as_deref(), - ) - .await - .map_err(|e| anyhow::anyhow!("{e:#}")), + Connection::Sql(db) => { + let partition_res = windmill_common::materialization::record_materialization( + db, + w_id, + req.asset_kind, + &req.asset_path, + &req.partition, + req.status, + req.snapshot_id, + req.row_count, + req.job_id, + req.error.as_deref(), + ) + .await + .map_err(|e| anyhow::anyhow!("{e:#}")); + // Schema capture is a separate, independently best-effort write (its + // own transaction for the per-asset advisory lock); a failure here + // must not lose the partition row above. + if let Some(cols) = schema.as_ref() { + if let Err(e) = record_asset_schema_best_effort( + db, + w_id, + meta.asset_kind, + &meta.asset_path, + cols, + snapshot_id, + job_id, + ) + .await + { + tracing::warn!("failed to record captured asset schema: {e:#}"); + } + } + partition_res + } Connection::Http(client) => { crate::agent_workers::record_materialization_from_agent_http(client, w_id, &req).await } @@ -407,6 +469,33 @@ async fn record_mat( } } +// Open a short transaction (needed for the per-asset advisory lock) and upsert +// the captured schema version. Isolated so its tx lifetime doesn't entangle the +// partition write. +async fn record_asset_schema_best_effort( + db: &windmill_common::DB, + w_id: &str, + asset_kind: windmill_common::assets::AssetKind, + asset_path: &str, + columns: &[windmill_common::materialization::SchemaColumn], + snapshot_id: Option, + job_id: Uuid, +) -> anyhow::Result<()> { + let mut tx = db.begin().await?; + windmill_common::materialization::record_asset_schema( + &mut tx, + w_id, + asset_kind, + asset_path, + columns, + snapshot_id, + Some(job_id), + ) + .await?; + tx.commit().await?; + Ok(()) +} + pub async fn do_duckdb( job: &MiniPulledJob, client: &AuthedClient, @@ -630,6 +719,7 @@ pub async fn do_duckdb( windmill_common::materialization::MaterializationStatus::Failed, None, None, + None, Some(&e.to_string()), ) .await; @@ -656,6 +746,10 @@ pub async fn do_duckdb( // cascade stops. The error lists *every* test so the user sees the // whole picture, not just the first failure. let tests = extract_data_tests(&result); + // Captured output schema (gap #2a) — recorded only on the successful + // path below, not on the failure paths (a failed run shouldn't + // advance the asset's recorded schema version). + let schema = extract_schema(&result); // Defense-in-depth: codegen embedded `n_data_tests` checks, so the // summary row must carry that many outcomes. Recovering fewer means // the `data_tests` column was dropped/reshaped before we read it — @@ -676,6 +770,7 @@ pub async fn do_duckdb( windmill_common::materialization::MaterializationStatus::Failed, snapshot_id, row_count, + None, Some(&msg), ) .await; @@ -691,6 +786,7 @@ pub async fn do_duckdb( windmill_common::materialization::MaterializationStatus::Failed, snapshot_id, row_count, + None, Some(&breakdown), ) .await; @@ -704,6 +800,7 @@ pub async fn do_duckdb( windmill_common::materialization::MaterializationStatus::Materialized, snapshot_id, row_count, + schema, None, ) .await; @@ -1806,6 +1903,31 @@ mod tests { assert!(extract_data_tests(&raw(r#"[{"rows":3}]"#)).is_empty()); } + #[test] + fn extract_schema_parses_nested_and_string_encoded() { + // Real shape: the summary row carries a nested `output_schema` + // list-of-struct from the DESCRIBE fold. + let r = raw( + r#"[{"materialized":"ducklake://a/b","rows":3,"snapshot_id":17, + "output_schema":[{"name":"order_id","type":"BIGINT"}, + {"name":"status","type":"VARCHAR"}]}]"#, + ); + let cols = extract_schema(&r).expect("schema present"); + assert_eq!(cols.len(), 2); + assert_eq!(cols[0].name, "order_id"); + assert_eq!(cols[0].data_type, "BIGINT"); + assert_eq!(cols[1].name, "status"); + assert_eq!(cols[1].data_type, "VARCHAR"); + // Fallback: FFI serialised the list-of-struct as a JSON string. + let s = raw(r#"{"output_schema":"[{\"name\":\"x\",\"type\":\"INTEGER\"}]"}"#); + let cols = extract_schema(&s).expect("schema present"); + assert_eq!(cols.len(), 1); + assert_eq!(cols[0].name, "x"); + assert_eq!(cols[0].data_type, "INTEGER"); + // Absent column (literal/manual mode) -> None, no panic. + assert!(extract_schema(&raw(r#"[{"rows":3}]"#)).is_none()); + } + #[test] fn format_data_test_breakdown_lists_all_with_marks() { let tests = vec![ From 996b6d933bc7ecbd8edb48e50c7065aed1acf660 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 13:11:12 +0000 Subject: [PATCH 2/6] fix: address CI review on schema capture (partition col, order, status gate) - exclude the synthetic `_wm_partition` column from the captured schema for partitioned assets, so the recorded contract is the producer's logical output, not Windmill's storage detail (claude/cubic P1). - make the captured column list explicitly ordered (`row_number()` over the DESCRIBE + `list(... ORDER BY)`), so the `list()` aggregate can't reorder columns and spuriously bump the schema version (cubic P2). - gate the API `record_materialization` schema upsert on a `Materialized` status, so a failed/running write (or a client attaching a schema to one) can't advance the schema history (cubic P2). Co-Authored-By: Claude Opus 4.8 --- .../windmill-parser/src/sql_materialize.rs | 60 +++++++++++++++---- backend/windmill-api-assets/src/lib.rs | 10 +++- 2 files changed, 58 insertions(+), 12 deletions(-) diff --git a/backend/parsers/windmill-parser/src/sql_materialize.rs b/backend/parsers/windmill-parser/src/sql_materialize.rs index ccdf36653e963..7a3c72bdce7bd 100644 --- a/backend/parsers/windmill-parser/src/sql_materialize.rs +++ b/backend/parsers/windmill-parser/src/sql_materialize.rs @@ -553,14 +553,33 @@ pub fn materialize_result_sql( }; // Capture the materialized output schema (gap #2a) in the same summary row — // no extra round-trip. `DESCRIBE SELECT * FROM ` yields one row per - // column (`column_name`, `column_type`); fold them into an ordered - // list-of-struct the worker reads back and persists as asset metadata. The - // write just committed, so the latest snapshot (no `AT (VERSION)` needed) is - // exactly the slice recorded in `snapshot_id`. `list(...)` preserves the - // DESCRIBE row order (column order); the read is a single small row. + // column (`column_name`, `column_type`); fold them into a list-of-struct the + // worker reads back and persists as asset metadata. The write just + // committed, so the latest snapshot (no `AT (VERSION)` needed) is exactly the + // slice recorded in `snapshot_id`. + // + // Two correctness details: + // - `_wm_ord` (a `row_number()` over the DESCRIBE) is captured so the + // list-of-struct is ordered *explicitly* (`list(... ORDER BY _wm_ord)`). + // DESCRIBE returns columns in physical order; without the explicit ORDER + // the `list()` aggregate could reorder them and spuriously bump the schema + // version on a re-materialize. + // - For a `// partitioned` asset the physical table carries the synthetic + // `_wm_partition` column; it must be filtered out so the recorded schema is + // the producer's logical output, not Windmill's storage detail (this is the + // grain #2b contract enforcement reads back). + let partition_filter = if partitioned { + format!( + " WHERE column_name <> '{}'", + partition_col.replace('\'', "''") + ) + } else { + String::new() + }; let schema_capture = format!( - "(SELECT list({{'name': column_name, 'type': column_type}}) \ - FROM (DESCRIBE SELECT * FROM {target_qualified})) AS output_schema" + "(SELECT list({{'name': column_name, 'type': column_type}} ORDER BY _wm_ord) \ + FROM (SELECT column_name, column_type, row_number() OVER () AS _wm_ord \ + FROM (DESCRIBE SELECT * FROM {target_qualified}){partition_filter})) AS output_schema" ); let base_cols = format!( "'ducklake://{asset_path}' AS materialized, \ @@ -1378,12 +1397,33 @@ mod tests { ); assert!(plain.starts_with("SELECT 'ducklake://analytics/orders' AS materialized")); assert!(!plain.contains("data_tests")); - // Schema capture (gap #2a) is in every summary, tests or not. + // Schema capture (gap #2a) is in every summary, tests or not. Unpartitioned + // → explicit ordering, no partition-column filter. for s in [&sql, &plain] { assert!(s.contains( - "(SELECT list({'name': column_name, 'type': column_type}) \ - FROM (DESCRIBE SELECT * FROM _wm_target.orders)) AS output_schema" + "(SELECT list({'name': column_name, 'type': column_type} ORDER BY _wm_ord) \ + FROM (SELECT column_name, column_type, row_number() OVER () AS _wm_ord \ + FROM (DESCRIBE SELECT * FROM _wm_target.orders))) AS output_schema" )); + assert!(!s.contains("WHERE column_name <>")); } } + + #[test] + fn materialize_result_sql_schema_excludes_partition_column() { + // Partitioned → the synthetic `_wm_partition` column is filtered out so + // the captured schema is the producer's logical output only. + let sql = materialize_result_sql( + "_wm_target.orders_daily", + "analytics/orders_daily", + "_wm_partition", + "'2026-06-19'", + true, + &[], + ); + assert!(sql.contains( + "FROM (DESCRIBE SELECT * FROM _wm_target.orders_daily) \ + WHERE column_name <> '_wm_partition')) AS output_schema" + )); + } } diff --git a/backend/windmill-api-assets/src/lib.rs b/backend/windmill-api-assets/src/lib.rs index da9aa9a9283c7..77edf6b82f676 100644 --- a/backend/windmill-api-assets/src/lib.rs +++ b/backend/windmill-api-assets/src/lib.rs @@ -105,8 +105,14 @@ async fn record_materialization( // Schema capture is independently best-effort (its own transaction for the // per-asset advisory lock) and must never roll back the partition record // above — mirroring the worker's `record_mat`. A lost schema version - // degrades the history, not the run. - if let Some(columns) = req.schema.as_ref() { + // degrades the history, not the run. Only a successful (`Materialized`) write + // advances the recorded schema — a failed/running write must not (and a + // client shouldn't be able to bump the history by attaching a schema to one). + let is_materialized = matches!( + req.status, + windmill_common::materialization::MaterializationStatus::Materialized + ); + if let (true, Some(columns)) = (is_materialized, req.schema.as_ref()) { let res: windmill_common::error::Result<()> = async { let mut tx = user_db.clone().begin(&authed).await?; windmill_common::materialization::record_asset_schema( From 356aff3170b36526036741266081148f07ae6dbb Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 13:41:55 +0000 Subject: [PATCH 3/6] fix: address Codex review (manual-mode schema gate + auth contract docs) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - gate output_schema extraction on the managed (`Some((Some(_), _))`) path so a `// materialize manual` run — whose result is the user's own query output — can't persist a caller-shaped `output_schema` into materialized_asset_schema (Codex P2). Verified e2e: a manual run returning a fabricated `output_schema:[{injected,EVIL}]` records the partition but writes no schema version, while the managed path still captures normally. - document the authorization contract on the new public `record_asset_schema` and `list_asset_schemas` helpers: they perform no access control (mirroring the materialized_partition siblings) and require callers to pass a workspace-authorized executor (Codex P1). Co-Authored-By: Claude Opus 4.8 --- backend/windmill-common/src/materialization.rs | 12 ++++++++++++ backend/windmill-worker/src/duckdb_executor.rs | 13 +++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/backend/windmill-common/src/materialization.rs b/backend/windmill-common/src/materialization.rs index c2ef18287783d..8d9e31899f834 100644 --- a/backend/windmill-common/src/materialization.rs +++ b/backend/windmill-common/src/materialization.rs @@ -166,6 +166,13 @@ pub struct AssetSchemaVersion { /// Record the captured output schema of a freshly-materialized asset. /// +/// **Authorization:** like the sibling `record_materialization`, this performs +/// no access control of its own — it writes the row for whatever `workspace_id` +/// it is given. Callers MUST pass a workspace-authorized executor and a +/// `workspace_id` the caller is allowed to write: an RLS-scoped `user_db` +/// transaction for API / agent-worker entry points, or the trusted worker DB +/// pool for the in-worker recorder. Do not expose it to an unauthenticated path. +/// /// Versioning across re-materializations: a new `version` row is inserted only /// when `columns` differs from the latest stored version; an unchanged /// re-materialize re-affirms the latest row in place (updates its @@ -249,6 +256,11 @@ pub async fn record_asset_schema( } /// All captured schema versions for one asset, newest version first. +/// +/// **Authorization:** performs no access control (mirrors +/// `list_materialized_partitions`); the caller must pass a workspace-authorized +/// executor (an RLS-scoped `user_db` transaction on the API read path) and a +/// `workspace_id` it is allowed to read. pub async fn list_asset_schemas<'e>( executor: impl PgExecutor<'e>, workspace_id: &str, diff --git a/backend/windmill-worker/src/duckdb_executor.rs b/backend/windmill-worker/src/duckdb_executor.rs index 1574e83ee42b6..e4b94acabf230 100644 --- a/backend/windmill-worker/src/duckdb_executor.rs +++ b/backend/windmill-worker/src/duckdb_executor.rs @@ -748,8 +748,17 @@ pub async fn do_duckdb( let tests = extract_data_tests(&result); // Captured output schema (gap #2a) — recorded only on the successful // path below, not on the failure paths (a failed run shouldn't - // advance the asset's recorded schema version). - let schema = extract_schema(&result); + // advance the asset's recorded schema version). Managed mode ONLY: + // in `// materialize manual` the result is the user's own query + // output (we generate no summary), so an `output_schema` field there + // is caller-shaped and must not be trusted — `materialize` is + // `Some((Some(_), _))` for managed, `Some((None, _))` for manual. + let is_managed = matches!(&materialize, Some((Some(_), _))); + let schema = if is_managed { + extract_schema(&result) + } else { + None + }; // Defense-in-depth: codegen embedded `n_data_tests` checks, so the // summary row must carry that many outcomes. Recovering fewer means // the `data_tests` column was dropped/reshaped before we read it — From b985650035fb2e4265322f12c7fc935714fd49f8 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 17:15:20 +0000 Subject: [PATCH 4/6] feat(frontend): schema-history tab on the ducklake asset node (#2a) Adds a "Schema" tab to DucklakeAssetPanel surfacing the captured output-schema versions persisted by the materialize run. Master-detail (mirrors the History tab): the version list (newest first, newest auto-selected) shows column count + snapshot + capture time; selecting a version renders its column/type table. Reads the GET /assets/asset_schemas endpoint via raw fetch, matching the sibling PartitionStatusGrid convention (these materialization endpoints are not in the generated client). Co-Authored-By: Claude Opus 4.8 --- .../AssetGraph/DucklakeAssetPanel.svelte | 56 +++---- .../AssetGraph/SchemaHistoryPanel.svelte | 142 ++++++++++++++++++ 2 files changed, 172 insertions(+), 26 deletions(-) create mode 100644 frontend/src/lib/components/assets/AssetGraph/SchemaHistoryPanel.svelte diff --git a/frontend/src/lib/components/assets/AssetGraph/DucklakeAssetPanel.svelte b/frontend/src/lib/components/assets/AssetGraph/DucklakeAssetPanel.svelte index 1e826b4060127..bd1e3aae26901 100644 --- a/frontend/src/lib/components/assets/AssetGraph/DucklakeAssetPanel.svelte +++ b/frontend/src/lib/components/assets/AssetGraph/DucklakeAssetPanel.svelte @@ -11,7 +11,7 @@ // can't be read), so a selectable version always exists in the table. import ToggleButtonGroup from '$lib/components/common/toggleButton-v2/ToggleButtonGroup.svelte' import ToggleButton from '$lib/components/common/toggleButton-v2/ToggleButton.svelte' - import { Table2, History } from 'lucide-svelte' + import { Table2, History, Columns3 } from 'lucide-svelte' import { Pane, Splitpanes } from 'svelte-splitpanes' import { resource } from 'runed' import { fetchDucklakeSnapshots } from '$lib/components/dbOps' @@ -19,6 +19,7 @@ import PartitionStatusGrid from './PartitionStatusGrid.svelte' import DucklakeSnapshotHistory from './DucklakeSnapshotHistory.svelte' import DucklakeVersionPreview from './DucklakeVersionPreview.svelte' + import SchemaHistoryPanel from './SchemaHistoryPanel.svelte' interface Props { // The materialized ducklake asset path (`/`). @@ -27,7 +28,7 @@ } let { path, workspace }: Props = $props() - let tab = $state<'partitions' | 'history'>('partitions') + let tab = $state<'partitions' | 'history' | 'schema'>('partitions') // The user's explicit snapshot pick (undefined until they click a row). let selectedVersion = $state(undefined) @@ -78,6 +79,7 @@ (tab = e.detail)}> {#snippet children({ item })} + {/snippet} @@ -86,31 +88,33 @@
{#if tab === 'partitions'} + {:else if tab === 'schema'} + {:else} -
- - - snapshots.refetch()} - selectedVersion={effectiveVersion} - onSelect={(v) => (selectedVersion = v)} - /> - - -
- + + + snapshots.refetch()} + selectedVersion={effectiveVersion} + onSelect={(v) => (selectedVersion = v)} /> -
-
-
-
- {/if} -
+ + +
+ +
+
+ + + {/if} + {/if} diff --git a/frontend/src/lib/components/assets/AssetGraph/SchemaHistoryPanel.svelte b/frontend/src/lib/components/assets/AssetGraph/SchemaHistoryPanel.svelte new file mode 100644 index 0000000000000..df990c19b3047 --- /dev/null +++ b/frontend/src/lib/components/assets/AssetGraph/SchemaHistoryPanel.svelte @@ -0,0 +1,142 @@ + + +
+
+ Captured schema +
+ +
+ {#if schemas.loading} +
+ Loading schema… +
+ {:else if schemas.error} +

Failed to load: {schemas.error.message}

+ {:else if !schemas.current?.length} +

+ No schema captured yet. The output schema is recorded automatically after a // materialize run. +

+ {:else} +
+ + +
+ {#each schemas.current as s, i (s.version)} + + {/each} +
+
+ +
+ {#if selected} +
+ + + + + + + + {#each selected.columns as col (col.name)} + + + + + {/each} + +
ColumnType
{col.name}{col.type}
+ {/if} + + + + + {/if} + + From 2938cc234151e34b6008944c755a20a054e606ee Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 17:38:05 +0000 Subject: [PATCH 5/6] feat: schema tab is strategy-aware (history vs fixed schema) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Only a whole-table `replace` producer (CREATE OR REPLACE) can change columns run-to-run; `append`/`merge`/partitioned writes INSERT into a fixed-schema table, so their schema is pinned at first materialize and the "history" framing is degenerate (always one version). - backend: surface the managed `materialize_strategy` (`replace`/`append`/ `merge`) on the asset-graph runnable node, alongside the existing `partition_kind` (same parse-from-annotation path). - frontend: the pipeline page derives `schemaCanEvolve` for the selected asset from its write-producer (`replace` && not partitioned) and threads it to the Schema tab. Evolvable → master-detail version history; fixed → a single current-schema table with a short "schema is fixed" note. Unknown defaults to evolvable so real history is never hidden. Co-Authored-By: Claude Opus 4.8 --- backend/windmill-api-assets/src/lib.rs | 18 +++++ .../AssetGraph/AssetGraphDetailsPane.svelte | 7 +- .../AssetGraph/DucklakeAssetPanel.svelte | 7 +- .../AssetGraph/SchemaHistoryPanel.svelte | 77 +++++++++++++------ .../lib/components/assets/AssetGraph/types.ts | 5 ++ .../(logged)/pipeline/[folder]/+page.svelte | 17 ++++ 6 files changed, 104 insertions(+), 27 deletions(-) diff --git a/backend/windmill-api-assets/src/lib.rs b/backend/windmill-api-assets/src/lib.rs index 77edf6b82f676..d25bb4db5fc6e 100644 --- a/backend/windmill-api-assets/src/lib.rs +++ b/backend/windmill-api-assets/src/lib.rs @@ -519,6 +519,13 @@ struct GraphRunnableNode { retry: Option, #[serde(skip_serializing_if = "Vec::is_empty", default)] data_tests: Vec, + // Managed `// materialize` write strategy (`replace` | `append` | `merge`), + // absent for non-materializing or `manual` scripts. Surfaced so the asset + // panel can tell whether the captured schema can evolve: only whole-table + // `replace` (CREATE OR REPLACE) can change columns run-to-run; `append` / + // `merge` / any partitioned write INSERTs into a fixed-schema table. + #[serde(skip_serializing_if = "Option::is_none", default)] + materialize_strategy: Option, } // The partition's kind word for the node badge (the full PartitionSpec carries @@ -923,6 +930,17 @@ async fn asset_graph( tag: ann.and_then(|a| a.tag.clone()), retry: ann.and_then(|a| a.retry.clone()), data_tests: ann.map(|a| a.data_tests.clone()).unwrap_or_default(), + materialize_strategy: ann.and_then(|a| a.materialize.as_ref()).and_then(|m| { + if m.manual { + None + } else if m.append { + Some("append".to_string()) + } else if m.unique_key.is_some() { + Some("merge".to_string()) + } else { + Some("replace".to_string()) + } + }), path, usage_kind, } diff --git a/frontend/src/lib/components/assets/AssetGraph/AssetGraphDetailsPane.svelte b/frontend/src/lib/components/assets/AssetGraph/AssetGraphDetailsPane.svelte index f2e60100975b3..b69db2e04ff9a 100644 --- a/frontend/src/lib/components/assets/AssetGraph/AssetGraphDetailsPane.svelte +++ b/frontend/src/lib/components/assets/AssetGraph/AssetGraphDetailsPane.svelte @@ -114,6 +114,10 @@ path: string unsaved?: boolean }> + // Whether the selected ducklake asset's schema can evolve (whole-table + // `replace` producer). Forwarded to the Schema tab: version history when + // true, a single fixed-schema view when false. Defaults to true (unknown). + schemaCanEvolve?: boolean // Bumped by the parent after dispatching a run so the runs panel // re-fetches the listing immediately (rather than waiting on its // background poll tick). @@ -214,6 +218,7 @@ onScriptRenamed, onScriptRemoved, selectionProducers = [], + schemaCanEvolve = true, runsRefreshKey, runsPendingJobId, onRunCompleted, @@ -994,7 +999,7 @@ {#key selection.path} - + {/key} {:else}
diff --git a/frontend/src/lib/components/assets/AssetGraph/DucklakeAssetPanel.svelte b/frontend/src/lib/components/assets/AssetGraph/DucklakeAssetPanel.svelte index bd1e3aae26901..5f7ad050e4b52 100644 --- a/frontend/src/lib/components/assets/AssetGraph/DucklakeAssetPanel.svelte +++ b/frontend/src/lib/components/assets/AssetGraph/DucklakeAssetPanel.svelte @@ -25,8 +25,11 @@ // The materialized ducklake asset path (`/`). path: string workspace: string + // Whether this asset's schema can evolve (whole-table `replace` producer); + // drives the Schema tab between version history and a single fixed schema. + schemaCanEvolve?: boolean } - let { path, workspace }: Props = $props() + let { path, workspace, schemaCanEvolve = true }: Props = $props() let tab = $state<'partitions' | 'history' | 'schema'>('partitions') // The user's explicit snapshot pick (undefined until they click a row). @@ -89,7 +92,7 @@ {#if tab === 'partitions'} {:else if tab === 'schema'} - + {:else}
diff --git a/frontend/src/lib/components/assets/AssetGraph/SchemaHistoryPanel.svelte b/frontend/src/lib/components/assets/AssetGraph/SchemaHistoryPanel.svelte index df990c19b3047..fbccccf860bad 100644 --- a/frontend/src/lib/components/assets/AssetGraph/SchemaHistoryPanel.svelte +++ b/frontend/src/lib/components/assets/AssetGraph/SchemaHistoryPanel.svelte @@ -1,22 +1,30 @@ +{#snippet columnsTable(cols: SchemaColumn[])} +
+ + + + + + + + {#each cols as col (col.name)} + + + + + {/each} + +
ColumnType
{col.name}{col.type}
+{/snippet} +
Captured schema @@ -80,6 +107,23 @@ class="font-mono">// materialize run.

+ {:else if !canEvolve} + +
+
+ + This asset's schema is fixed — an append / merge / partitioned materialize INSERTs into + a fixed-schema table, so the columns can't change run-to-run. +
+ {#if selected} + {@render columnsTable(selected.columns)} + {/if} +
{:else}
@@ -116,22 +160,7 @@
{#if selected} - - - - - - - - - {#each selected.columns as col (col.name)} - - - - - {/each} - -
ColumnType
{col.name}{col.type}
+ {@render columnsTable(selected.columns)} {/if}
diff --git a/frontend/src/lib/components/assets/AssetGraph/types.ts b/frontend/src/lib/components/assets/AssetGraph/types.ts index 4a5d16d693ea7..c942d289349e4 100644 --- a/frontend/src/lib/components/assets/AssetGraph/types.ts +++ b/frontend/src/lib/components/assets/AssetGraph/types.ts @@ -33,6 +33,11 @@ export interface AssetGraphRunnableNode { // asset. Surfaced as a count badge (with a per-test breakdown in the title) // so test coverage is visible on the node without opening the pane. data_tests?: DataTest[] + // Managed `// materialize` write strategy. Absent for non-materializing or + // `manual` scripts. Used (with `partition_kind`) to decide whether a + // produced asset's schema can evolve: only whole-table `replace` can, since + // `append`/`merge`/partitioned writes INSERT into a fixed-schema table. + materialize_strategy?: 'replace' | 'append' | 'merge' // Synthesized by the page from a local draft; the script doesn't exist // in the DB yet. Drives a dashed/lower-opacity rendering to mirror how // unsaved triggers are styled — visually distinct from persisted nodes. diff --git a/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte b/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte index e7f52ddbd484a..3f462164e0212 100644 --- a/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte @@ -1958,6 +1958,22 @@ .map((e) => ({ kind: e.runnable_kind, path: e.runnable_path, unsaved: e.unsaved })) }) + // Whether the selected ducklake asset's captured schema can *evolve* (drives + // the asset panel's Schema tab: version history vs. a single fixed schema). + // Only a whole-table `replace` producer (CREATE OR REPLACE) can change + // columns run-to-run; `append`/`merge`/partitioned writes INSERT into a + // fixed-schema table, so their schema is pinned at first materialize. Unknown + // (no producer node, e.g. a draft) defaults to evolvable so we never hide + // real history. + let schemaCanEvolve = $derived.by(() => { + const sel = selection + if (!sel || sel.kind !== 'asset' || sel.asset_kind !== 'ducklake') return true + const producerPaths = new Set(selectionProducers.map((p) => p.path)) + const producers = graphWithDraft.runnables.filter((r) => producerPaths.has(r.path)) + if (producers.length === 0) return true + return producers.some((r) => r.materialize_strategy === 'replace' && !r.partition_kind) + }) + // Downstream subscriber count for the currently-edited script. Drives // the Test button's cascade UX: when > 0, ScriptEditor renders a split // button exposing "just this step" (default, with `_wmill_skip_asset_dispatch`) @@ -2531,6 +2547,7 @@ onRunByPath={runByPathLegit} selection={activeDraft ? undefined : selection} selectionProducers={activeDraft ? [] : selectionProducers} + {schemaCanEvolve} {runsRefreshKey} {runsPendingJobId} {activeRunnable} From 6e5712894ca432da041ef401f0b6bac5bc317634 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 26 Jun 2026 17:46:17 +0000 Subject: [PATCH 6/6] fix: schemaCanEvolve fails open on unknown producer strategy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously a producer present but missing `materialize_strategy` (e.g. a draft-overlay runnable, synthesized without the field) fell through to canEvolve=false, hiding captured history behind the fixed-schema view — contradicting the "unknown defaults to evolvable" intent. Now the fixed view shows only when *every* producer is a known insert-style write (append/merge, or partitioned replace); any producer with unknown (missing) strategy is treated as evolvable, so real history is never hidden. Co-Authored-By: Claude Opus 4.8 --- .../(logged)/pipeline/[folder]/+page.svelte | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte b/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte index 3f462164e0212..3548ba60179b0 100644 --- a/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte @@ -1962,16 +1962,21 @@ // the asset panel's Schema tab: version history vs. a single fixed schema). // Only a whole-table `replace` producer (CREATE OR REPLACE) can change // columns run-to-run; `append`/`merge`/partitioned writes INSERT into a - // fixed-schema table, so their schema is pinned at first materialize. Unknown - // (no producer node, e.g. a draft) defaults to evolvable so we never hide - // real history. + // fixed-schema table, so their schema is pinned at first materialize. + // + // Fail open: show the fixed view only when we're *sure* — every producer is a + // known insert-style write. A producer with no `materialize_strategy` + // metadata (e.g. a draft-overlay runnable, which the graph synthesizes + // without it) is treated as unknown → evolvable, so captured history is never + // hidden behind a stale "fixed" verdict. let schemaCanEvolve = $derived.by(() => { const sel = selection if (!sel || sel.kind !== 'asset' || sel.asset_kind !== 'ducklake') return true const producerPaths = new Set(selectionProducers.map((p) => p.path)) const producers = graphWithDraft.runnables.filter((r) => producerPaths.has(r.path)) - if (producers.length === 0) return true - return producers.some((r) => r.materialize_strategy === 'replace' && !r.partition_kind) + const knownFixed = (r: (typeof producers)[number]) => + !!r.materialize_strategy && !(r.materialize_strategy === 'replace' && !r.partition_kind) + return producers.length === 0 || !producers.every(knownFixed) }) // Downstream subscriber count for the currently-edited script. Drives