Merged
1 change: 1 addition & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS materialized_asset_schema;
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- Captured output schema of a managed `// materialize` asset (gap #2a).
-- After a managed materialize commits, the worker DESCRIBEs the written table
-- and records its column list here as asset-level metadata. Schema is a
-- property of the asset/table, not of a partition slice, so it lives in its own
-- table keyed by (workspace, asset_kind, asset_path) rather than as a column on
-- materialized_partition (which would duplicate the identical schema across
-- every partition row). This is the producer-side capture that #2b (save-time
-- consumer-ref contract enforcement) reads back.
--
-- Versioning across re-materializations: a new `version` row is inserted only
-- when the captured column set differs from the latest stored version for the
-- asset; an unchanged re-materialize re-affirms the latest row in place. So the
-- table is a compact schema-evolution history and MAX(version) is the current
-- contract.
CREATE TABLE IF NOT EXISTS materialized_asset_schema (
workspace_id VARCHAR(50) NOT NULL REFERENCES workspace(id) ON DELETE CASCADE ON UPDATE CASCADE,
asset_kind ASSET_KIND NOT NULL,
asset_path VARCHAR(255) NOT NULL,
-- Monotonic per (workspace, asset_kind, asset_path), starting at 1; only
-- bumped when the schema actually changes.
version BIGINT NOT NULL,
-- The captured columns, ordered as the table presents them:
-- [{"name": "...", "type": "..."}, ...].
columns JSONB NOT NULL,
-- DuckLake snapshot the schema was captured from (NULL for non-ducklake /
-- substrates without snapshots).
snapshot_id BIGINT,
job_id UUID,
captured_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (workspace_id, asset_kind, asset_path, version)
);

-- Default privileges (migration 20250205131523) only apply to objects created
-- by the role that set them, so grant explicitly — the API reads/writes this
-- table as the invoking role (same fix as script_trigger in 20260619112847).
GRANT ALL ON materialized_asset_schema TO windmill_user;
GRANT ALL ON materialized_asset_schema TO windmill_admin;
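
The versioning rule described in the migration comment (insert a new `version` only when the captured column set differs, otherwise re-affirm the latest row) can be sketched in Rust as below. This is an illustration, not the PR's worker code: `Column`, `VersionAction`, and `next_version_action` are hypothetical names, and the sketch assumes the comparison is order-sensitive, which is consistent with the PR's concern that a reordered column list would spuriously bump the version.

```rust
/// One captured column, mirroring the `{"name": ..., "type": ...}` entries
/// stored in `materialized_asset_schema.columns`. (Hypothetical type.)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Column {
    pub name: String,
    pub ty: String,
}

/// Hypothetical outcome of comparing a fresh capture with the latest stored row.
#[derive(Debug, PartialEq, Eq)]
pub enum VersionAction {
    /// Schema changed, or nothing is stored yet: insert a row with this version.
    Insert(i64),
    /// Schema unchanged: re-affirm the existing row with this version in place.
    Reaffirm(i64),
}

/// Decide what to do with a freshly captured column list.
/// `latest` is the stored row with MAX(version) for the asset, if any.
/// The slice comparison is order-sensitive (an assumption of this sketch).
pub fn next_version_action(
    latest: Option<(i64, &[Column])>,
    captured: &[Column],
) -> VersionAction {
    match latest {
        // First capture for this asset: versions start at 1.
        None => VersionAction::Insert(1),
        // Same columns in the same order: keep the current version.
        Some((version, cols)) if cols == captured => VersionAction::Reaffirm(version),
        // Anything else counts as a schema change and bumps the version.
        Some((version, _)) => VersionAction::Insert(version + 1),
    }
}
```

Under this rule the table grows only on real schema changes, so `MAX(version)` stays the current contract, as the migration comment states.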
61 changes: 60 additions & 1 deletion backend/parsers/windmill-parser/src/sql_materialize.rs
@@ -551,10 +551,41 @@ pub fn materialize_result_sql(
String::new(),
)
};
// Capture the materialized output schema (gap #2a) in the same summary row —
// no extra round-trip. `DESCRIBE SELECT * FROM <target>` yields one row per
// column (`column_name`, `column_type`); fold them into a list-of-struct the
// worker reads back and persists as asset metadata. The write just
// committed, so the latest snapshot (no `AT (VERSION)` needed) is exactly the
// slice recorded in `snapshot_id`.
//
// Two correctness details:
// - `_wm_ord` (a `row_number()` over the DESCRIBE) is captured so the
// list-of-struct is ordered *explicitly* (`list(... ORDER BY _wm_ord)`).
// DESCRIBE returns columns in physical order; without the explicit ORDER
// the `list()` aggregate could reorder them and spuriously bump the schema
// version on a re-materialize.
// - For a `// partitioned` asset the physical table carries the synthetic
// `_wm_partition` column; it must be filtered out so the recorded schema is
// the producer's logical output, not Windmill's storage detail (this is the
// grain #2b contract enforcement reads back).
let partition_filter = if partitioned {
format!(
" WHERE column_name <> '{}'",
partition_col.replace('\'', "''")
)
} else {
String::new()
};
let schema_capture = format!(
"(SELECT list({{'name': column_name, 'type': column_type}} ORDER BY _wm_ord) \
FROM (SELECT column_name, column_type, row_number() OVER () AS _wm_ord \
FROM (DESCRIBE SELECT * FROM {target_qualified}){partition_filter})) AS output_schema"
);
Comment on lines +579 to +583

Contributor

P1 — for partitioned assets, the captured output_schema includes the synthetic _wm_partition column.

This DESCRIBEs the target table ({target_qualified}), not the user's output SELECT. For a // partitioned materialize, the managed table is bootstrapped with the extra partition column:

CREATE TABLE IF NOT EXISTS {t} AS
  SELECT *, CAST(NULL AS VARCHAR) AS _wm_partition FROM ({sel}) WHERE false;
ALTER TABLE {t} SET PARTITIONED BY (_wm_partition);

So DESCRIBE SELECT * FROM {target_qualified} returns the producer's columns plus {"name":"_wm_partition","type":"VARCHAR"}, and every partitioned asset's recorded schema carries that Windmill-internal column. Because this is exactly the contract that #2b enforcement will read back (MAX(version) is the current contract), a consumer that declares only the real producer columns would fail the check on the leaked _wm_partition column.

Two options: DESCRIBE the user's output (DESCRIBE SELECT * FROM ({user_select})) instead of the physical table, or filter the partition column out of the captured list. The unpartitioned path is unaffected. Worth confirming this is intentional before it sets the grain #2b depends on.
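
The second option (filtering the partition column out of the captured list) can be sketched as a small Rust helper. This is a minimal illustration under stated assumptions: `schema_capture_sql` and its `Option<&str>` parameter are hypothetical (the function in this PR takes a `partitioned` flag and a `partition_col` string), and `target_qualified` is assumed to be an already-validated table name because it is interpolated verbatim.

```rust
/// Build the `output_schema` projection for a materialized target.
/// Hypothetical helper: pass `Some(col)` for a partitioned asset so the
/// synthetic partition column is excluded from the captured schema.
pub fn schema_capture_sql(target_qualified: &str, partition_col: Option<&str>) -> String {
    // Double any single quotes so the column name is a safe SQL string literal.
    let partition_filter = match partition_col {
        Some(col) => format!(" WHERE column_name <> '{}'", col.replace('\'', "''")),
        None => String::new(),
    };
    // `row_number()` is taken over the DESCRIBE output and used as an explicit
    // sort key, so the aggregated list keeps the table's column order.
    format!(
        "(SELECT list({{'name': column_name, 'type': column_type}} ORDER BY _wm_ord) \
         FROM (SELECT column_name, column_type, row_number() OVER () AS _wm_ord \
         FROM (DESCRIBE SELECT * FROM {target_qualified}){partition_filter})) AS output_schema"
    )
}
```

Calling `schema_capture_sql("_wm_target.orders_daily", Some("_wm_partition"))` yields a subquery whose innermost `FROM` ends in `WHERE column_name <> '_wm_partition'`, while passing `None` leaves the filter out entirely.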


let base_cols = format!(
"'ducklake://{asset_path}' AS materialized, \
{partition_sel}{count_expr} AS rows, \
- (SELECT max(snapshot_id) FROM ducklake_snapshots('{TARGET_ALIAS}')) AS snapshot_id"
+ (SELECT max(snapshot_id) FROM ducklake_snapshots('{TARGET_ALIAS}')) AS snapshot_id, \
+ {schema_capture}"
);
if checks.is_empty() {
return format!("SELECT {base_cols};");
@@ -1366,5 +1397,33 @@ mod tests {
);
assert!(plain.starts_with("SELECT 'ducklake://analytics/orders' AS materialized"));
assert!(!plain.contains("data_tests"));
// Schema capture (gap #2a) is in every summary, tests or not. Unpartitioned
// → explicit ordering, no partition-column filter.
for s in [&sql, &plain] {
assert!(s.contains(
"(SELECT list({'name': column_name, 'type': column_type} ORDER BY _wm_ord) \
FROM (SELECT column_name, column_type, row_number() OVER () AS _wm_ord \
FROM (DESCRIBE SELECT * FROM _wm_target.orders))) AS output_schema"
));
assert!(!s.contains("WHERE column_name <>"));
}
}

#[test]
fn materialize_result_sql_schema_excludes_partition_column() {
// Partitioned → the synthetic `_wm_partition` column is filtered out so
// the captured schema is the producer's logical output only.
let sql = materialize_result_sql(
"_wm_target.orders_daily",
"analytics/orders_daily",
"_wm_partition",
"'2026-06-19'",
true,
&[],
);
assert!(sql.contains(
"FROM (DESCRIBE SELECT * FROM _wm_target.orders_daily) \
WHERE column_name <> '_wm_partition')) AS output_schema"
));
}
}
1 change: 1 addition & 0 deletions backend/windmill-api-assets/Cargo.toml
@@ -16,3 +16,4 @@ chrono.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlx.workspace = true
tracing.workspace = true