diff --git a/backend/.sqlx/query-87ca1f2a34f54dade76f5a2cde5ecdac6806e1a306dca880943d97bb6d6a889d.json b/backend/.sqlx/query-87ca1f2a34f54dade76f5a2cde5ecdac6806e1a306dca880943d97bb6d6a889d.json new file mode 100644 index 0000000000000..ffce4491e374c --- /dev/null +++ b/backend/.sqlx/query-87ca1f2a34f54dade76f5a2cde5ecdac6806e1a306dca880943d97bb6d6a889d.json @@ -0,0 +1,67 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT ON (path) path AS \"path!\", content AS \"content!\",\n language AS \"language!: windmill_common::scripts::ScriptLang\"\n FROM script\n WHERE workspace_id = $1\n AND auto_kind = 'pipeline'\n AND archived = false\n AND deleted = false\n AND ($2::text IS NULL OR path LIKE $2)\n ORDER BY path, created_at DESC\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "path!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "content!", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "language!: windmill_common::scripts::ScriptLang", + "type_info": { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp", + "oracledb", + "nu", + "java", + "duckdb", + "ruby", + "rlang" + ] + } + } + } + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "87ca1f2a34f54dade76f5a2cde5ecdac6806e1a306dca880943d97bb6d6a889d" +} diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 9830bed379e0a..3f6b67f2396a3 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -14033,6 +14033,7 @@ dependencies = [ "tracing", "windmill-api-auth", "windmill-common", + "windmill-parser-sql-asset", ] [[package]] diff --git a/backend/parsers/windmill-parser-sql-asset/src/asset_parser.rs b/backend/parsers/windmill-parser-sql-asset/src/asset_parser.rs index 812bfa785d6d1..f7c578a274ebc 100644 --- a/backend/parsers/windmill-parser-sql-asset/src/asset_parser.rs +++ b/backend/parsers/windmill-parser-sql-asset/src/asset_parser.rs @@ -9,8 +9,9 @@ use sqlparser::{ parser::Parser, }; use windmill_parser::asset_parser::{ - asset_was_used, merge_assets, parse_asset_syntax, parse_pipeline_annotations, AssetKind, - AssetUsageAccessType, ParseAssetsOutput, ParseAssetsResult, + asset_was_used, merge_assets, merge_column_lineage, parse_asset_syntax, + parse_pipeline_annotations, AssetKind, AssetUsageAccessType, ColumnLineage, ColumnRef, + ParseAssetsOutput, ParseAssetsResult, }; use AssetUsageAccessType::*; @@ -33,7 +34,55 @@ pub fn parse_assets(input: &str) -> anyhow::Result { } } - let pipeline = parse_pipeline_annotations(input); + let mut pipeline = parse_pipeline_annotations(input); + // Scope inferred lineage to a single output asset so columns from an + // auxiliary CTAS aren't attributed to the materialized one (the flat list + // has no per-entry output on the wire). The `// materialize` target, when + // declared, IS the output: keep entries tagged with it plus untagged + // top-level-SELECT entries (which describe that target). Without a declared + // target, keep inference only when every tagged entry shares one output + // asset — otherwise it's ambiguous which asset the flat list describes, so + // drop it rather than show false dependencies. + let target = pipeline + .materialize + .as_ref() + .map(|m| (m.target_kind, m.target_path.clone())); + let inferred: Vec = match &target { + Some(t) => collector + .column_lineage + .into_iter() + .filter(|(out, _)| out.as_ref().map_or(true, |o| o == t)) + .map(|(_, cl)| cl) + .collect(), + None => { + let mut first: Option<&(AssetKind, String)> = None; + let mut ambiguous = false; + for (out, _) in &collector.column_lineage { + if let Some(o) = out { + match first { + None => first = Some(o), + Some(f) if f != o => { + ambiguous = true; + break; + } + _ => {} + } + } + } + if ambiguous { + Vec::new() + } else { + collector + .column_lineage + .into_iter() + .map(|(_, cl)| cl) + .collect() + } + } + }; + // Body-inferred column lineage, with `// column` annotations taking + // precedence per output column (explicit declaration overrides inference). + pipeline.column_lineage = merge_column_lineage(inferred, pipeline.column_lineage); Ok(ParseAssetsOutput::new( merge_assets(collector.assets), Vec::new(), @@ -54,6 +103,16 @@ struct AssetCollector { cte_name_stack: Vec>, // Locally created tables (not attached to an asset) local_table_names: HashSet, + // Inferred column-level lineage: one entry per output column of an + // output-producing query, mapping it to the upstream source columns its + // expression reads. Each is tagged with the *output asset* it belongs to — + // `Some((kind, path))` for a CTAS / CREATE VIEW into a real asset, `None` + // for a top-level managed-materialize SELECT (its output is the `// + // materialize` target, known only in `parse_assets`). `parse_assets` uses + // the tag to scope the flat list to a single output asset so columns from an + // auxiliary output don't get attributed to the materialized one. Best-effort: + // dynamic/raw SQL, INSERT…SELECT, and wildcards are left to annotations. + column_lineage: Vec<(Option<(AssetKind, String)>, ColumnLineage)>, } impl AssetCollector { @@ -65,15 +124,24 @@ impl AssetCollector { currently_used_asset: None, cte_name_stack: Vec::new(), local_table_names: HashSet::new(), + column_lineage: Vec::new(), } } - /// If the name resolves to an attached asset, record it. Otherwise, register it as a local - /// table/view so that subsequent references are not mistakenly attributed to the active asset. - fn track_table_definition(&mut self, name: &ObjectName) { - if let Some(asset) = self.get_associated_asset_from_obj_name(name, Some(W)) { - self.assets.push(asset); - } else if let Some(simple_name) = get_trivial_obj_name(name) { + /// Record a `CREATE TABLE`/`VIEW` target. A *temporary* table/view is always + /// local — even a one-part name under an active `USE dl`, which would + /// otherwise resolve to an asset (`ducklake://…/tmp`) and then leak as a + /// column source for later references. A non-temp name that resolves to an + /// attached asset is recorded as that asset; anything else is registered + /// local so subsequent references aren't attributed to the active asset. + fn track_table_definition(&mut self, name: &ObjectName, is_temporary: bool) { + if !is_temporary { + if let Some(asset) = self.get_associated_asset_from_obj_name(name, Some(W)) { + self.assets.push(asset); + return; + } + } + if let Some(simple_name) = get_trivial_obj_name(name) { self.local_table_names.insert(simple_name.to_lowercase()); } } @@ -280,6 +348,22 @@ impl AssetCollector { } } + // Infer the output-column lineage of a query that produces an asset, tagging + // each entry with its `output` asset. Called only for an output-producing + // query — a top-level managed-materialize SELECT (`output: None`, resolved + // to the `// materialize` target later) or a CTAS / CREATE VIEW into a real + // asset (`output: Some`). A CTAS into a local/temp staging table is never + // an output, so it's simply not passed here. + fn infer_query_output( + &mut self, + query: &sqlparser::ast::Query, + output: Option<(AssetKind, String)>, + ) { + if let Some(select) = query.body.as_select() { + self.infer_column_lineage(&select.projection, &select.from, output); + } + } + fn handle_table_with_joins( &mut self, table_with_joins: &sqlparser::ast::TableWithJoins, @@ -302,17 +386,57 @@ impl AssetCollector { } } - // Extract columns from SELECT items and create individual asset results for each column - // Only processes columns that reference known assets to avoid false positives - fn extract_column_assets( - &mut self, - projection: &[SelectItem], + // The alias-map entry (key → asset) for one FROM/JOIN table factor, or + // `None` if it isn't an asset-backed table. The key is its alias, else the + // bare table name; S3 table-functions and string-literal tables are only + // keyed when aliased (an unaliased one is ambiguous). Returns the asset with + // a single matched relation so the caller can attribute qualified columns. + fn table_alias_entry(&self, relation: &TableFactor) -> Option<(String, ParseAssetsResult)> { + let TableFactor::Table { name, alias, args, .. } = relation else { + return None; + }; + let has_args = args.as_ref().map_or(false, |a| !a.args.is_empty()); + if has_args { + let alias = alias.as_ref()?; + let asset = self.get_s3_asset_from_table_function(relation)?; + return Some((alias.name.value.clone(), asset)); + } + let asset = self + .get_associated_asset_from_obj_name(name, Some(R)) + .or_else(|| self.get_s3_asset_from_str_literal_table(relation))?; + if get_str_lit_from_obj_name(name).is_some() { + // String-literal S3 table: only unambiguous when aliased. + let alias = alias.as_ref()?; + return Some((alias.name.value.clone(), asset)); + } + let key = match alias { + Some(a) => a.name.value.clone(), + None => name + .0 + .last() + .and_then(|id| id.as_ident()) + .map(|id| id.value.clone()) + .unwrap_or_default(), + }; + Some((key, asset)) + } + + // Resolve a query's FROM clause into (single-table asset, alias→asset map). + // `single_table` is `Some` only for an unambiguous one-table FROM with no + // joins (so bare column refs can be attributed); `table_to_asset` keys by + // alias/table name for qualified refs and includes every JOINed table. + // Shared by `extract_column_assets` (read columns) and `infer_column_lineage` + // (output→input edges) so both resolve identically. + fn build_from_maps( + &self, from_tables: &[sqlparser::ast::TableWithJoins], + ) -> ( + Option, + BTreeMap, ) { - // Check if this is a single-table SELECT (to avoid ambiguity). - // For S3 table functions (read_parquet/read_csv/read_json), detect the asset even - // though args are present, since we know the file path from the string literal arg. - let single_table = if from_tables.len() == 1 { + // Single unambiguous table only when there's exactly one FROM entry AND + // it has no joins — otherwise a bare column could belong to any side. + let single_table = if from_tables.len() == 1 && from_tables[0].joins.is_empty() { let relation = &from_tables[0].relation; if let TableFactor::Table { name, args, .. } = relation { let has_args = args.as_ref().map_or(false, |a| !a.args.is_empty()); @@ -329,51 +453,30 @@ impl AssetCollector { None }; - // Build a map of table aliases/names to assets for multi-table queries. - // For S3 table functions, only aliased references are unambiguous - // (e.g. SELECT t.col1 FROM read_parquet('s3://...') AS t). + // Alias → asset for qualified column refs, across every FROM entry AND + // its JOINed tables (so `c.col` in `FROM a JOIN c` resolves). let mut table_to_asset: BTreeMap = BTreeMap::new(); for table_with_joins in from_tables { - if let TableFactor::Table { name, alias, args, .. } = &table_with_joins.relation { - let has_args = args.as_ref().map_or(false, |a| !a.args.is_empty()); - if has_args { - // For table functions, only add to the alias map when an alias is present - if let Some(alias) = alias { - if let Some(asset) = - self.get_s3_asset_from_table_function(&table_with_joins.relation) - { - table_to_asset.insert(alias.name.value.clone(), asset); - } - } - } else if let Some(asset) = self - .get_associated_asset_from_obj_name(name, Some(R)) - .or_else(|| { - self.get_s3_asset_from_str_literal_table(&table_with_joins.relation) - }) - { - // For string literal S3 tables (e.g. FROM 's3:///file.parquet'), only add to - // the alias map when an alias is present (to avoid false positives). - // For regular named tables, use alias or table name as key. - let is_str_literal = get_str_lit_from_obj_name(name).is_some(); - if is_str_literal { - if let Some(alias) = alias { - table_to_asset.insert(alias.name.value.clone(), asset); - } - } else { - let table_key = if let Some(alias) = alias { - alias.name.value.clone() - } else { - name.0 - .last() - .and_then(|id| id.as_ident()) - .map(|id| id.value.clone()) - .unwrap_or_default() - }; - table_to_asset.insert(table_key, asset); - } + if let Some((k, a)) = self.table_alias_entry(&table_with_joins.relation) { + table_to_asset.insert(k, a); + } + for join in &table_with_joins.joins { + if let Some((k, a)) = self.table_alias_entry(&join.relation) { + table_to_asset.insert(k, a); } } } + (single_table, table_to_asset) + } + + // Extract columns from SELECT items and create individual asset results for each column + // Only processes columns that reference known assets to avoid false positives + fn extract_column_assets( + &mut self, + projection: &[SelectItem], + from_tables: &[sqlparser::ast::TableWithJoins], + ) { + let (single_table, table_to_asset) = self.build_from_maps(from_tables); // Process each SELECT item for item in projection { @@ -446,6 +549,136 @@ impl AssetCollector { } } } + + // Infer column-level lineage for an output-producing query's projection: + // each *named* output column → the upstream source columns its expression + // reads. Covers passthroughs (`amount`) and computed columns + // (`amount + tax AS total`) alike. Skipped: wildcards and unaliased + // expressions (no stable output name), and inputs that don't resolve to a + // known asset. A column with no resolved inputs is dropped. + // + // Best-effort and intentionally flat: results from every output query in the + // script accumulate into one list (the graph hangs them off the materialize + // write-edge), with no per-output-table association. This is exact for the + // common single-output member (a managed-materialize SELECT, or one CTAS), + // but a multi-statement script that stages through a TEMP table reports the + // *intermediate* column names (the final SELECT reads the temp table, whose + // columns don't resolve to an asset, so they drop out). A `// column` + // annotation overrides any output column where inference is wrong or coarse. + fn infer_column_lineage( + &mut self, + projection: &[SelectItem], + from_tables: &[sqlparser::ast::TableWithJoins], + output: Option<(AssetKind, String)>, + ) { + let (single_table, table_to_asset) = self.build_from_maps(from_tables); + for item in projection { + let (out_col, expr) = match item { + SelectItem::ExprWithAlias { expr, alias } => (alias.value.clone(), expr), + SelectItem::UnnamedExpr(expr @ Expr::Identifier(id)) => (id.value.clone(), expr), + SelectItem::UnnamedExpr(expr @ Expr::CompoundIdentifier(parts)) => { + match parts.last() { + Some(last) => (last.value.clone(), expr), + None => continue, + } + } + _ => continue, + }; + let mut collector = ColumnIdentCollector { refs: Vec::new(), query_depth: 0 }; + let _ = expr.visit(&mut collector); + let mut inputs: Vec = Vec::new(); + for parts in &collector.refs { + if let Some(cr) = self.resolve_column_ref(parts, &single_table, &table_to_asset) { + if !inputs.contains(&cr) { + inputs.push(cr); + } + } + } + if !inputs.is_empty() { + self.column_lineage + .push((output.clone(), ColumnLineage { column: out_col, inputs })); + } + } + } + + // Resolve identifier `parts` (e.g. `["t","amount"]` or `["amount"]`) to the + // source asset column it reads, mirroring `extract_column_assets`' + // resolution: a bare ident needs an unambiguous single-table FROM; a + // qualified ident resolves its prefix via the alias map, or (≥3 parts) as a + // db/schema-qualified object name. + fn resolve_column_ref( + &self, + parts: &[String], + single_table: &Option, + table_to_asset: &BTreeMap, + ) -> Option { + let asset_to_ref = |asset: &ParseAssetsResult, col: &str| ColumnRef { + from_kind: asset.kind, + from_path: asset.path.clone(), + from_column: col.to_string(), + }; + match parts { + [col] => single_table.as_ref().map(|a| asset_to_ref(a, col)), + [.., col] => { + let prefix = parts.first()?; + if let Some(asset) = table_to_asset.get(prefix) { + Some(asset_to_ref(asset, col)) + } else if parts.len() >= 3 { + let obj_parts: Vec = parts[..parts.len() - 1] + .iter() + .map(|p| ObjectNamePart::Identifier(sqlparser::ast::Ident::new(p.clone()))) + .collect(); + let asset = + self.get_associated_asset_from_obj_name(&ObjectName(obj_parts), Some(R))?; + Some(asset_to_ref(&asset, col)) + } else { + None + } + } + [] => None, + } + } +} + +// Collects the identifier paths an expression reads, for column-lineage +// inference: `Expr::Identifier(a)` → `["a"]`, `Expr::CompoundIdentifier(t.a)` → +// `["t","a"]`. The derived `Visit` walk recurses through operators, functions, +// casts and CASE, so every leaf identifier of the outer expression is captured. +struct ColumnIdentCollector { + refs: Vec>, + // Depth of nested (sub)queries inside the expression. Identifiers are only + // captured at depth 0: a scalar/correlated subquery's columns belong to ITS + // own FROM, not the outer projection's, so descending would misattribute + // (e.g. `(SELECT x FROM other) AS c FROM orders` must NOT bind `c` to + // `orders.x`). Subquery-derived columns are simply left to annotations. + query_depth: usize, +} + +impl Visitor for ColumnIdentCollector { + type Break = (); + + fn pre_visit_query(&mut self, _query: &sqlparser::ast::Query) -> std::ops::ControlFlow<()> { + self.query_depth += 1; + std::ops::ControlFlow::Continue(()) + } + + fn post_visit_query(&mut self, _query: &sqlparser::ast::Query) -> std::ops::ControlFlow<()> { + self.query_depth = self.query_depth.saturating_sub(1); + std::ops::ControlFlow::Continue(()) + } + + fn pre_visit_expr(&mut self, expr: &Expr) -> std::ops::ControlFlow { + if self.query_depth == 0 { + match expr { + Expr::Identifier(id) => self.refs.push(vec![id.value.clone()]), + Expr::CompoundIdentifier(parts) => self + .refs + .push(parts.iter().map(|id| id.value.clone()).collect()), + _ => {} + } + } + std::ops::ControlFlow::Continue(()) + } } impl Visitor for AssetCollector { @@ -505,7 +738,11 @@ impl Visitor for AssetCollector { ) -> std::ops::ControlFlow { match statement { sqlparser::ast::Statement::Query(q) => { + // A top-level SELECT is the managed-materialize output, so its + // columns ARE the materialized asset's columns (output resolved + // to the `// materialize` target in `parse_assets`). self.handle_query_reads(q); + self.infer_query_output(q, None); } sqlparser::ast::Statement::Insert(insert) => { @@ -658,18 +895,32 @@ impl Visitor for AssetCollector { } sqlparser::ast::Statement::CreateTable(create_table) => { - self.track_table_definition(&create_table.name); + self.track_table_definition(&create_table.name, create_table.temporary); // `CREATE TABLE x AS SELECT … FROM y` reads y. The AS-query // isn't a `Statement::Query`, so its FROM tables are only - // caught here. + // caught here. Only infer output lineage when `x` is a real + // asset — a CTAS into a local/temp staging table is not the + // materialized output (its columns aren't the asset's). if let Some(query) = &create_table.query { self.handle_query_reads(query); + // Infer only when `x` is a real asset (its output columns + // ARE that asset's), tagged with it so `parse_assets` can + // scope lineage per output. A local/temp staging table is + // not an asset → not inferred. + if let Some(asset) = + self.get_associated_asset_from_obj_name(&create_table.name, Some(W)) + { + self.infer_query_output(query, Some((asset.kind, asset.path))); + } } } - sqlparser::ast::Statement::CreateView { name, query, .. } => { - self.track_table_definition(name); + sqlparser::ast::Statement::CreateView { name, query, temporary, .. } => { + self.track_table_definition(name, *temporary); self.handle_query_reads(query); + if let Some(asset) = self.get_associated_asset_from_obj_name(name, Some(W)) { + self.infer_query_output(query, Some((asset.kind, asset.path))); + } } // DROP TABLE/VIEW is a write to the dropped object — the @@ -687,7 +938,9 @@ impl Visitor for AssetCollector { | sqlparser::ast::ObjectType::MaterializedView ) { for name in names { - self.track_table_definition(name); + // DROP is a write to the named object; resolve it as an + // asset if it is one (not a temp-creation context). + self.track_table_definition(name, false); } } } @@ -1920,8 +2173,9 @@ mod ctas_read_tests { assets ); assert!( - assets.iter().any(|a| a.path == "main/exciting_809" - && a.access_type == Some(W)), + assets + .iter() + .any(|a| a.path == "main/exciting_809" && a.access_type == Some(W)), "expected write of main/exciting_809, got {:?}", assets ); @@ -1935,9 +2189,292 @@ mod ctas_read_tests { "#; let assets = parse_assets(input).unwrap().assets; assert!( - assets.iter().any(|a| a.path == "main/fx_rates" && a.access_type == Some(R)), + assets + .iter() + .any(|a| a.path == "main/fx_rates" && a.access_type == Some(R)), "expected read of main/fx_rates, got {:?}", assets ); } + + fn lineage(input: &str) -> Vec { + parse_assets(input).unwrap().column_lineage + } + + #[test] + fn test_infer_lineage_computed_and_passthrough() { + // CTAS with a computed column (amount + tax) and a passthrough (id). + let input = r#" + ATTACH 'ducklake://warehouse' AS dl; + CREATE TABLE dl.orders_daily AS + SELECT dl.orders.id, dl.orders.amount + dl.orders.tax AS order_total + FROM dl.orders; + "#; + let got = lineage(input); + assert_eq!( + got, + vec![ + ColumnLineage { + column: "id".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "id".to_string(), + }], + }, + ColumnLineage { + column: "order_total".to_string(), + inputs: vec![ + ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "amount".to_string(), + }, + ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "tax".to_string(), + }, + ], + }, + ] + ); + } + + #[test] + fn test_infer_lineage_bare_column_single_table() { + // Managed-materialize form: a plain top-level SELECT, bare columns + // attributed to the single FROM table. + let input = r#" + ATTACH 'ducklake://warehouse' AS dl; + USE dl; + SELECT amount AS revenue FROM orders; + "#; + let got = lineage(input); + assert_eq!( + got, + vec![ColumnLineage { + column: "revenue".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "amount".to_string(), + }], + }] + ); + } + + #[test] + fn test_infer_lineage_annotation_overrides() { + // The `// column` annotation for `order_total` wins; `id` stays inferred. + let input = r#" + -- column order_total <- datatable://prod/manual.grand_total + ATTACH 'ducklake://warehouse' AS dl; + CREATE TABLE dl.orders_daily AS + SELECT dl.orders.id, dl.orders.amount + dl.orders.tax AS order_total + FROM dl.orders; + "#; + let got = lineage(input); + // Annotation entry is authoritative and listed first. + assert_eq!(got[0].column, "order_total"); + assert_eq!(got[0].inputs[0].from_path, "prod/manual"); + assert_eq!(got[0].inputs[0].from_column, "grand_total"); + // Inferred `id` survives; inferred `order_total` dropped (no dup). + assert!(got.iter().any(|c| c.column == "id")); + assert_eq!(got.iter().filter(|c| c.column == "order_total").count(), 1); + } + + #[test] + fn test_infer_lineage_skips_local_staging_ctas() { + // A CTAS into a TEMP/local table is NOT the materialized output, so its + // columns must not be reported (they'd be anchored to the script's + // `// materialize` target as if they were the final asset's columns). + // The final SELECT reads the local staging table → unresolved → empty. + let input = r#" + ATTACH 'ducklake://warehouse' AS dl; + CREATE TEMP TABLE tmp AS SELECT dl.orders.amount AS amt FROM dl.orders; + SELECT amt AS total FROM tmp; + "#; + assert!( + lineage(input).is_empty(), + "staging columns must not be reported as final output; got {:?}", + lineage(input) + ); + } + + #[test] + fn test_infer_lineage_ctas_into_asset_still_inferred() { + // A CTAS whose target IS an asset is the output, so it's still inferred. + let input = r#" + ATTACH 'ducklake://warehouse' AS dl; + CREATE TABLE dl.orders_daily AS SELECT dl.orders.amount AS amt FROM dl.orders; + "#; + assert_eq!( + lineage(input), + vec![ColumnLineage { + column: "amt".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "amount".to_string(), + }], + }] + ); + } + + #[test] + fn test_infer_lineage_temp_table_under_use_is_local() { + // A one-part TEMP table name under an active `USE dl` must NOT resolve to + // an asset (`warehouse/tmp`); it's local, so the final SELECT reading it + // can't invent `warehouse/tmp.amt` as a column source for the output. + let input = r#" + -- materialize ducklake://warehouse/final + ATTACH 'ducklake://warehouse' AS dl; + USE dl; + CREATE TEMP TABLE tmp AS SELECT amount AS amt FROM orders; + SELECT amt AS total FROM tmp; + "#; + let got = lineage(input); + assert!( + got.is_empty(), + "temp staging under USE must not leak warehouse/tmp as a source; got {:?}", + got + ); + // And no phantom `warehouse/tmp` asset is recorded. + let assets = parse_assets(input).unwrap().assets; + assert!( + !assets.iter().any(|a| a.path == "warehouse/tmp"), + "temp table must not be recorded as an asset; got {:?}", + assets + ); + } + + #[test] + fn test_infer_lineage_scopes_to_materialize_target() { + // A script with a `// materialize` target plus an AUXILIARY CTAS into a + // different asset: only the materialized target's columns are reported; + // the auxiliary output's columns must not be attributed to it. + let input = r#" + -- materialize ducklake://warehouse/final + ATTACH 'ducklake://warehouse' AS dl; + CREATE TABLE dl.audit AS SELECT dl.orders.id AS aid FROM dl.orders; + SELECT dl.orders.amount AS total FROM dl.orders; + "#; + assert_eq!( + lineage(input), + vec![ColumnLineage { + column: "total".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "amount".to_string(), + }], + }], + "auxiliary `audit` columns must not appear on the materialized target" + ); + } + + #[test] + fn test_infer_lineage_drops_ambiguous_multi_output() { + // No `// materialize` target and two real CTAS outputs: which asset the + // flat lineage describes is ambiguous, so inference is dropped rather + // than attributed to an arbitrary one. + let input = r#" + ATTACH 'ducklake://warehouse' AS dl; + CREATE TABLE dl.a AS SELECT dl.orders.id AS x FROM dl.orders; + CREATE TABLE dl.b AS SELECT dl.orders.amount AS y FROM dl.orders; + "#; + assert!( + lineage(input).is_empty(), + "ambiguous multi-output must drop inference" + ); + } + + #[test] + fn test_infer_lineage_wildcard_yields_nothing() { + // `SELECT *` has no enumerable output columns → no inferred lineage. + let input = r#" + ATTACH 'ducklake://warehouse' AS dl; + CREATE TABLE dl.orders_daily AS SELECT * FROM dl.orders; + "#; + assert!(lineage(input).is_empty()); + } + + #[test] + fn test_infer_lineage_resolves_joined_inputs() { + // Columns from BOTH sides of an explicit JOIN must resolve, incl. a + // computed column mixing the two. A bare column is dropped (ambiguous + // across the join) rather than misattributed to the first table. + let input = r#" + ATTACH 'ducklake://warehouse' AS dl; + CREATE TABLE dl.orders_daily AS + SELECT o.id, c.region AS cust_region, o.amount + c.discount AS net + FROM dl.orders o + JOIN dl.customers c ON c.id = o.customer_id; + "#; + let got = lineage(input); + assert_eq!( + got, + vec![ + ColumnLineage { + column: "id".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "id".to_string(), + }], + }, + ColumnLineage { + column: "cust_region".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/customers".to_string(), + from_column: "region".to_string(), + }], + }, + ColumnLineage { + column: "net".to_string(), + inputs: vec![ + ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "amount".to_string(), + }, + ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/customers".to_string(), + from_column: "discount".to_string(), + }, + ], + }, + ] + ); + } + + #[test] + fn test_infer_lineage_does_not_descend_into_subqueries() { + // A scalar subquery's bare column (`amount`) belongs to the subquery's + // own FROM, NOT the outer `dl.orders` — it must not be attributed to the + // outer table. The subquery column is left to annotations; the + // passthrough `id` still resolves. + let input = r#" + ATTACH 'ducklake://warehouse' AS dl; + CREATE TABLE dl.orders_daily AS + SELECT dl.orders.id, (SELECT amount FROM dl.other LIMIT 1) AS c + FROM dl.orders; + "#; + let got = lineage(input); + assert_eq!( + got, + vec![ColumnLineage { + column: "id".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "id".to_string(), + }], + }], + "subquery column `c` must be dropped, not misattributed to orders" + ); + } } diff --git a/backend/parsers/windmill-parser/src/asset_parser.rs b/backend/parsers/windmill-parser/src/asset_parser.rs index 2103e9937bc68..2b7f6ecdbfcda 100644 --- a/backend/parsers/windmill-parser/src/asset_parser.rs +++ b/backend/parsers/windmill-parser/src/asset_parser.rs @@ -117,6 +117,11 @@ pub struct ParseAssetsOutput { // lines allowed). Drives the worker's post-materialize verifier probes. #[serde(skip_serializing_if = "Vec::is_empty", default)] pub data_tests: Vec, + // `// column <- .[, …]` — declared column-level lineage, + // one entry per output column. Accumulating. Pure metadata: drives the + // column-lineage graph view, executes nothing. + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub column_lineage: Vec, } #[derive(Serialize, Debug, PartialEq, Clone)] @@ -275,6 +280,39 @@ pub enum DataTest { Custom { path: String }, } +// `// column <- .[, …]` — declared column-level +// lineage: one output column of this script's produced asset and the upstream +// source columns it derives from. A sibling of `DataTest` in the extensible +// annotation family (`docs/pipelines-vs-dbt.md` §3): same parse shape — a head +// token (the output column) then a per-variant tail — but accumulating, one +// line per output column. Unlike `data_test` these are pure metadata: they +// drive the column-lineage graph view, never a runtime probe. +// +// dbt derives column lineage from SQL-AST parsing; Windmill is polyglot +// (Python/TS/Bash/SQL in one DAG), so a uniform AST is not available. The +// annotation is the explicit, language-agnostic declaration — the same +// "annotations are real comments parsed strictly" stance as the rest of the +// pipeline grammar. Body-inferred per-asset column *sets* (`columns` on +// `ParseAssetsResult`) complement it but cannot express column→column edges. +#[derive(Serialize, Debug, PartialEq, Clone)] +pub struct ColumnLineage { + // The produced asset's output column this line describes. + pub column: String, + // Upstream source columns it derives from (≥1; malformed refs dropped). + pub inputs: Vec, +} + +// One `.` upstream reference inside a `// column` line. The +// asset URI accepts the default-syntax shorthands (like `// materialize` / +// `// data_test relationships`); the column is the segment after the final +// `.` (so a schema-qualified `warehouse/main.orders.amount` keeps `amount`). +#[derive(Serialize, Debug, PartialEq, Clone)] +pub struct ColumnRef { + pub from_kind: AssetKind, + pub from_path: String, + pub from_column: String, +} + // `// trigger any` (default) vs `// trigger all`. `Any` = OR: any trigger // firing runs the script (current behaviour). `All` = AND: the script // runs only once every partition-bearing input has materialized at the @@ -307,6 +345,7 @@ pub struct PipelineAnnotations { pub retry: Option, pub materialize: Option, pub data_tests: Vec, + pub column_lineage: Vec, } impl ParseAssetsOutput { @@ -332,10 +371,33 @@ impl ParseAssetsOutput { retry: pipeline.retry, materialize: pipeline.materialize, data_tests: pipeline.data_tests, + column_lineage: pipeline.column_lineage, } } } +// Combine column lineage inferred from the body (SQL AST) with lineage declared +// via `// column` annotations. The annotation is the *override*: where both +// describe the same output column, the explicit declaration wins and the +// inferred entry is dropped. Inferred entries are also deduped by output column +// among themselves (first wins). Used by the language asset-parsers so a +// `// column` line can correct a mis-inferred edge without disabling inference +// for the rest of the columns. +pub fn merge_column_lineage( + inferred: Vec, + annotated: Vec, +) -> Vec { + let mut seen: std::collections::HashSet = + annotated.iter().map(|c| c.column.clone()).collect(); + let mut out = annotated; + for c in inferred { + if seen.insert(c.column.clone()) { + out.push(c); + } + } + out +} + #[derive(Debug, Clone, Serialize)] pub struct DelegateToGitRepoDetails { pub resource: String, @@ -680,6 +742,17 @@ pub fn parse_pipeline_annotations(code: &str) -> PipelineAnnotations { continue; } + // `// column <- .[, …]` — accumulating column lineage. + // A complete word, so it never swallows a body comment that happens to + // start with `column` followed by non-lineage prose (that has no `<-` + // and is dropped fail-safe). Checked before `on`/asset shorthands. + if let Some(after_kw) = consume_keyword(rest, "column") { + if let Some(spec) = parse_column_lineage_spec(after_kw.trim()) { + out.column_lineage.push(spec); + } + continue; + } + if let Some(after_kw) = consume_keyword(rest, "on") { let spec_text = after_kw.trim(); if spec_text.is_empty() { @@ -841,6 +914,38 @@ fn parse_relationships(s: &str) -> Option { Some(DataTest::Relationships { column, to_kind, to_path: to_path.to_string(), to_column }) } +// Parse a `// column <- [, …]` right-hand side. The head +// (before `<-`) is the output column; the tail is a comma-separated list of +// `.` upstream references. Mirrors `parse_accepted_values`' +// "drop empties, require ≥1" stance: individually malformed refs are dropped +// and the line is kept iff at least one ref parses; a missing `<-`, a non-ident +// output column, or zero valid refs drops the whole line (fail-safe). +fn parse_column_lineage_spec(s: &str) -> Option { + let (out_col, refs) = s.split_once("<-")?; + let column = single_ident(out_col)?; + let inputs: Vec = refs + .split(',') + .filter_map(|r| parse_column_ref(r.trim())) + .collect(); + if inputs.is_empty() { + return None; + } + Some(ColumnLineage { column, inputs }) +} + +// `.` — the referenced column is the segment after the final +// `.`; everything before it is the asset URI (default-syntax shorthands +// enabled, like `// materialize`). Same shape as `parse_relationships`' target. +fn parse_column_ref(s: &str) -> Option { + let (asset_uri, ref_col) = s.rsplit_once('.')?; + let from_column = single_ident(ref_col)?; + let (from_kind, from_path) = parse_asset_syntax(asset_uri.trim(), true)?; + if from_path.is_empty() { + return None; + } + Some(ColumnRef { from_kind, from_path: from_path.to_string(), from_column }) +} + // Parse a `// partitioned [opts]` right-hand side. Recognized kinds: // `daily`, `hourly`, `weekly`, `monthly` (with optional tz/format/start), // and `dynamic key=""` (plus optional format). @@ -1545,4 +1650,132 @@ mod pipeline_annotation_tests { vec![DataTest::Unique { column: "id".to_string() }] ); } + + #[test] + fn column_lineage_basic() { + let code = concat!( + "// column order_total <- ducklake://warehouse/orders.amount, ducklake://warehouse/orders.tax\n", + "// column user_name <- datatable://prod/users.name\n", + ); + let out = parse_pipeline_annotations(code); + assert_eq!( + out.column_lineage, + vec![ + ColumnLineage { + column: "order_total".to_string(), + inputs: vec![ + ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "amount".to_string(), + }, + ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/orders".to_string(), + from_column: "tax".to_string(), + }, + ], + }, + ColumnLineage { + column: "user_name".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::DataTable, + from_path: "prod/users".to_string(), + from_column: "name".to_string(), + }], + }, + ] + ); + } + + #[test] + fn column_lineage_schema_qualified_keeps_last_dot_as_column() { + // The column is the segment after the FINAL dot, so a schema-qualified + // ducklake table (`main.dim_products`) survives intact. + let out = parse_pipeline_annotations( + "// column sku <- ducklake://warehouse/main.dim_products.sku", + ); + assert_eq!( + out.column_lineage, + vec![ColumnLineage { + column: "sku".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "warehouse/main.dim_products".to_string(), + from_column: "sku".to_string(), + }], + }] + ); + } + + #[test] + fn column_lineage_drops_malformed_refs_keeps_valid() { + // `bad_no_dot` has no `.col` and is dropped; the line survives on its + // one valid ref. Mirrors accepted_values' drop-empties-keep-≥1 stance. + let out = parse_pipeline_annotations( + "// column total <- bad_no_dot, datatable://prod/orders.amount", + ); + assert_eq!( + out.column_lineage, + vec![ColumnLineage { + column: "total".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::DataTable, + from_path: "prod/orders".to_string(), + from_column: "amount".to_string(), + }], + }] + ); + } + + #[test] + fn merge_column_lineage_annotation_overrides_inferred() { + let inferred = vec![ + ColumnLineage { + column: "total".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "w/o".to_string(), + from_column: "amount".to_string(), + }], + }, + ColumnLineage { + column: "qty".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::Ducklake, + from_path: "w/o".to_string(), + from_column: "qty".to_string(), + }], + }, + ]; + // Annotation redefines `total` (wins) and leaves `qty` to inference. + let annotated = vec![ColumnLineage { + column: "total".to_string(), + inputs: vec![ColumnRef { + from_kind: AssetKind::DataTable, + from_path: "prod/x".to_string(), + from_column: "grand_total".to_string(), + }], + }]; + let merged = merge_column_lineage(inferred, annotated); + assert_eq!(merged.len(), 2); + // Annotation entry kept first and authoritative. + assert_eq!(merged[0].column, "total"); + assert_eq!(merged[0].inputs[0].from_column, "grand_total"); + // Inferred `qty` survives (no annotation for it); inferred `total` dropped. + assert_eq!(merged[1].column, "qty"); + } + + #[test] + fn column_lineage_malformed_lines_dropped_fail_safe() { + // No arrow, a multi-token output column, and a line whose every ref is + // malformed are all dropped entirely. + let out = parse_pipeline_annotations(concat!( + "// column no_arrow datatable://prod/x.y\n", // missing `<-` + "// column a b <- datatable://prod/x.y\n", // output not a single ident + "// column total <- bad_no_dot\n", // no valid ref + "// column\n", // bare keyword + )); + assert!(out.column_lineage.is_empty()); + } } diff --git a/backend/parsers/windmill-parser/tests/fixtures/pipeline_annotations.json b/backend/parsers/windmill-parser/tests/fixtures/pipeline_annotations.json index 304a8c3842b7b..4ecbc992bc61d 100644 --- a/backend/parsers/windmill-parser/tests/fixtures/pipeline_annotations.json +++ b/backend/parsers/windmill-parser/tests/fixtures/pipeline_annotations.json @@ -415,5 +415,96 @@ "retry": null, "data_tests": [{ "type": "unique", "column": "id" }] } + }, + { + "name": "column lineage maps output columns to upstream sources", + "code": "// column order_total <- ducklake://warehouse/orders.amount, ducklake://warehouse/orders.tax\n// column user_name <- datatable://prod/users.name\nSELECT 1;", + "expected": { + "in_pipeline": false, + "asset_triggers": [], + "native_triggers": [], + "partition": null, + "freshness": null, + "tag": null, + "retry": null, + "column_lineage": [ + { + "column": "order_total", + "inputs": [ + { + "from_kind": "ducklake", + "from_path": "warehouse/orders", + "from_column": "amount" + }, + { + "from_kind": "ducklake", + "from_path": "warehouse/orders", + "from_column": "tax" + } + ] + }, + { + "column": "user_name", + "inputs": [ + { "from_kind": "datatable", "from_path": "prod/users", "from_column": "name" } + ] + } + ] + } + }, + { + "name": "column lineage keeps duplicate input refs (dedup is a view concern)", + "code": "// column total <- ducklake://warehouse/orders.amount, ducklake://warehouse/orders.amount\nSELECT 1;", + "expected": { + "in_pipeline": false, + "asset_triggers": [], + "native_triggers": [], + "partition": null, + "freshness": null, + "tag": null, + "retry": null, + "column_lineage": [ + { + "column": "total", + "inputs": [ + { + "from_kind": "ducklake", + "from_path": "warehouse/orders", + "from_column": "amount" + }, + { + "from_kind": "ducklake", + "from_path": "warehouse/orders", + "from_column": "amount" + } + ] + } + ] + } + }, + { + "name": "column lineage keeps schema-qualified table, drops malformed refs", + "code": "// column sku <- ducklake://warehouse/main.dim_products.sku, bad_no_dot\n// column no_arrow datatable://prod/x.y\n// column total <- bad_no_dot\nSELECT 1;", + "expected": { + "in_pipeline": false, + "asset_triggers": [], + "native_triggers": [], + "partition": null, + "freshness": null, + "tag": null, + "retry": null, + "column_lineage": [ + { + "column": "sku", + "inputs": [ + { + "from_kind": "ducklake", + "from_path": "warehouse/main.dim_products", + "from_column": "sku" + } + ] + } + ] + } } ] diff --git a/backend/parsers/windmill-parser/tests/pipeline_annotations_parity.rs b/backend/parsers/windmill-parser/tests/pipeline_annotations_parity.rs index 094419300b2b5..883ddebcbc0b9 100644 --- a/backend/parsers/windmill-parser/tests/pipeline_annotations_parity.rs +++ b/backend/parsers/windmill-parser/tests/pipeline_annotations_parity.rs @@ -42,6 +42,11 @@ struct Expected { // compared against `serde_json::to_value(got.data_tests)`. Absent === []. #[serde(default)] data_tests: Vec, + // Snake_case `ColumnLineage` serde shape (e.g. {"column":"x","inputs": + // [{"from_kind":"datatable","from_path":"p","from_column":"c"}]}), compared + // against `to_value(got.column_lineage)`. Absent === []. + #[serde(default)] + column_lineage: Vec, } #[derive(Deserialize)] @@ -200,5 +205,13 @@ fn pipeline_annotation_fixtures_match() { serde_json::Value::Array(f.expected.data_tests.clone()), "{ctx}: data tests" ); + + let got_lineage = + serde_json::to_value(&got.column_lineage).expect("column_lineage serialize"); + assert_eq!( + got_lineage, + serde_json::Value::Array(f.expected.column_lineage.clone()), + "{ctx}: column lineage" + ); } } diff --git a/backend/windmill-api-assets/Cargo.toml b/backend/windmill-api-assets/Cargo.toml index c6ba973f80a2d..c6e7676b6bb27 100644 --- a/backend/windmill-api-assets/Cargo.toml +++ b/backend/windmill-api-assets/Cargo.toml @@ -11,6 +11,7 @@ path = "src/lib.rs" [dependencies] windmill-api-auth.workspace = true windmill-common = { workspace = true, default-features = false } +windmill-parser-sql-asset.workspace = true axum.workspace = true chrono.workspace = true serde.workspace = true diff --git a/backend/windmill-api-assets/src/lib.rs b/backend/windmill-api-assets/src/lib.rs index d25bb4db5fc6e..29122bd7934d0 100644 --- a/backend/windmill-api-assets/src/lib.rs +++ b/backend/windmill-api-assets/src/lib.rs @@ -519,6 +519,17 @@ struct GraphRunnableNode { retry: Option, #[serde(skip_serializing_if = "Vec::is_empty", default)] data_tests: Vec, + // `// column <- .` declared column-level lineage, surfaced + // so the canvas can draw the column-lineage view on deployed nodes (not + // only live drafts). Lockstep with TS `AssetGraphRunnableNode.column_lineage`. + #[serde(skip_serializing_if = "Vec::is_empty", default)] + column_lineage: Vec, + // `// materialize ` target — the asset this script's `column_lineage` + // describes. Lets the column-graph anchor lineage to the exact output asset + // instead of guessing a ducklake write-edge (a multi-output script writes + // several). Absent for scripts with no `// materialize` annotation. + #[serde(skip_serializing_if = "Option::is_none", default)] + materialize_target: Option, // Managed `// materialize` write strategy (`replace` | `append` | `merge`), // absent for non-materializing or `manual` scripts. Surfaced so the asset // panel can tell whether the captured schema can evolve: only whole-table @@ -528,6 +539,14 @@ struct GraphRunnableNode { materialize_strategy: Option, } +// The output asset a producer's column lineage belongs to (the `// materialize` +// target). Kept minimal — the column graph only needs (kind, path) to anchor. +#[derive(Serialize, Debug)] +struct MaterializeTargetNode { + kind: windmill_common::assets::AssetKind, + path: String, +} + // The partition's kind word for the node badge (the full PartitionSpec carries // tz/format/start, which the badge doesn't need). fn partition_kind_word(kind: &windmill_common::assets::PartitionKind) -> &'static str { @@ -738,7 +757,8 @@ async fn asset_graph( // defensive against transient overlaps). let pipeline_member_paths = sqlx::query!( r#" - SELECT DISTINCT ON (path) path AS "path!", content AS "content!" + SELECT DISTINCT ON (path) path AS "path!", content AS "content!", + language AS "language!: windmill_common::scripts::ScriptLang" FROM script WHERE workspace_id = $1 AND auto_kind = 'pipeline' @@ -790,6 +810,34 @@ async fn asset_graph( ) }) .collect(); + // Column-level lineage per member. The annotation-only lineage (already + // parsed above) is the baseline. For DuckDB scripts we additionally run the + // full SQL asset parser to infer output→input column edges from the AST; it + // merges them with the `// column` annotations (annotation wins). If the SQL + // can't be parsed (DuckDB accepts grammar `sqlparser` rejects), we fall back + // to the annotation-only baseline rather than dropping explicit annotations. + let column_lineage_by_path: std::collections::HashMap< + String, + Vec, + > = pipeline_member_paths + .iter() + .map(|r| { + let annotated = || { + annotations_by_path + .get(&r.path) + .map(|a| a.column_lineage.clone()) + .unwrap_or_default() + }; + let lineage = if r.language == windmill_common::scripts::ScriptLang::DuckDb { + windmill_parser_sql_asset::parse_assets(&r.content) + .map(|o| o.column_lineage) + .unwrap_or_else(|_| annotated()) + } else { + annotated() + }; + (r.path.clone(), lineage) + }) + .collect(); let pipeline_member_script_paths: std::collections::HashSet = pipeline_member_paths.into_iter().map(|r| r.path).collect(); let existing_script_paths: std::collections::HashSet = @@ -930,6 +978,19 @@ async fn asset_graph( tag: ann.and_then(|a| a.tag.clone()), retry: ann.and_then(|a| a.retry.clone()), data_tests: ann.map(|a| a.data_tests.clone()).unwrap_or_default(), + // Inferred (DuckDB AST) + annotation column lineage, gated to + // scripts like the badges above. + column_lineage: (usage_kind == AssetUsageKind::Script) + .then(|| column_lineage_by_path.get(&path)) + .flatten() + .cloned() + .unwrap_or_default(), + materialize_target: ann.and_then(|a| a.materialize.as_ref()).map(|m| { + MaterializeTargetNode { + kind: windmill_common::assets::asset_kind_from_parser(m.target_kind), + path: m.target_path.clone(), + } + }), materialize_strategy: ann.and_then(|a| a.materialize.as_ref()).and_then(|m| { if m.manual { None diff --git a/backend/windmill-common/src/assets.rs b/backend/windmill-common/src/assets.rs index e24b6806a29da..c186f591419e0 100644 --- a/backend/windmill-common/src/assets.rs +++ b/backend/windmill-common/src/assets.rs @@ -5,8 +5,8 @@ use sqlx::{PgExecutor, Postgres, Transaction}; use crate::{error, scripts::ScriptHash}; pub use windmill_parser::asset_parser::{ - parse_pipeline_annotations, DataTest, PartitionKind, PipelineAnnotations, RetrySpec, - TriggerSpec, PARTITION_TOKEN, + merge_column_lineage, parse_pipeline_annotations, ColumnLineage, ColumnRef, DataTest, + PartitionKind, PipelineAnnotations, RetrySpec, TriggerSpec, PARTITION_TOKEN, }; pub use windmill_types::assets::*; diff --git a/docs/pipelines-vs-dbt.md b/docs/pipelines-vs-dbt.md index 5dd93b5a37e70..4344ef2296fe5 100644 --- a/docs/pipelines-vs-dbt.md +++ b/docs/pipelines-vs-dbt.md @@ -48,7 +48,7 @@ Asset-centric, polyglot, annotation-driven, event-aware: |---|---|---| | Data tests | No | **Shipped** (`// data_test`) | | Incremental materializations | No, but pick a philosophy | TODO with design decision | -| Column lineage + docs site | No | Pure TODO | +| Column lineage | No | **Shipped** (`// column`); docs site still TODO | | Snapshots / SCD2 | No | New output kind | | Selective execution grammar | No | UI/CLI surface | | Schema contracts | No, but design metadata model | TODO with design work | @@ -84,10 +84,38 @@ See [Incremental deep-dive](#incremental-deep-dive) below. dbt: SQL-AST parsing for column-level deps; `dbt docs serve` produces a static lineage site with descriptions. -Today: graph is asset-level. `SqlQueryDetails` in the parser -(`backend/parsers/windmill-parser/src/asset_parser.rs:44`) already has a -column map — the scaffolding exists. No `// column` annotation, no docs -surface. Pure TODO; no abstraction stands in the way. +**Shipped**, inferred-first. For DuckDB scripts the column lineage is **derived +automatically from the SQL AST** — `windmill-parser-sql-asset` walks each +output-producing query's projection and maps every output column to the source +columns its expression reads (passthroughs *and* computed columns like +`amount + tax AS total`), resolving each input to its asset via the same +`ATTACH`/alias machinery the asset parser already uses. This is the dbt-style +AST lineage, and it needs no annotation. + +The `// column <- .[, …]` annotation is the +**override / escape hatch**, for the cases inference can't reach: polyglot +transforms (Python/TS/Bash — no SQL AST), dynamic SQL (`${sql.raw(...)}`, flagged +by `SqlQueryDetails.has_raw_interpolation`), or correcting a mis-inferred edge. +Inferred and annotated lineage are merged per output column with the annotation +winning (`merge_column_lineage`). The annotation is the second *extensible* +annotation family after `// data_test` — same head-then-tail parse shape (see +`ColumnLineage`/`ColumnRef` in `asset_parser.rs`) — and is pure metadata: it +drives the graph surface, never a runtime probe. + +Surfaced two ways in the asset graph: a count badge on the +producer→materialized-asset write-edge, and a **transitive column-lineage +trace** (`ColumnLineageTrace.svelte`, over the cross-script graph built by +`columnLineageGraph.ts`) in the asset details pane — select an asset to see its +columns and their full upstream/downstream lineage, click any column to +highlight its complete impact set across the pipeline (forward + backward). + +SQL-AST inference runs both **server-side** (the graph endpoint + deploy via +`parse_assets`, for *deployed* members) and **in the live editor** — the same +parser compiled to WASM (`windmill-parser-wasm-asset`) runs on the open draft's +buffer, and `resolveGraph` merges its `column_lineage` with the buffer's +`// column` annotations under the same annotation-wins precedence, so the draft +preview matches what deploys. The `dbt docs serve`-style static lineage *site* +is still TODO. ### 4. Snapshots / SCD2 diff --git a/frontend/package-lock.json b/frontend/package-lock.json index beb9b9fe12452..a8a75923e2cf7 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -79,7 +79,7 @@ "vscode-languageclient": "~9.0.1", "vscode-uri": "~3.1.0", "vscode-ws-jsonrpc": "~3.5.0", - "windmill-parser-wasm-asset": "1.728.1", + "windmill-parser-wasm-asset": "1.740.0", "windmill-parser-wasm-csharp": "1.510.1", "windmill-parser-wasm-go": "1.510.1", "windmill-parser-wasm-java": "1.510.1", @@ -878,7 +878,6 @@ "version": "1.10.0", "resolved": "https://registry.npmjs.org/@emnapi/core/-/core-1.10.0.tgz", "integrity": "sha512-yq6OkJ4p82CAfPl0u9mQebQHKPJkY7WrIuk205cTYnYe+k2Z8YBh11FrbRG/H6ihirqcacOgl2BIO8oyMQLeXw==", - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -890,7 +889,6 @@ "version": "1.10.0", "resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.10.0.tgz", "integrity": "sha512-ewvYlk86xUoGI0zQRNq/mC+16R1QeDlKQy21Ki3oSYXNgLb45GV1P6A0M+/s6nyCuNDqe5VpaY84BzXGwVbwFA==", - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -901,7 +899,6 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.2.1.tgz", "integrity": "sha512-uTII7OYF+/Mes/MrcIOYp5yOtSMLBWSIoLPpcgwipoiKbli6k322tcoFsxoIIxPDqW01SQGAgko4EzZi2BNv2w==", - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -1417,7 +1414,6 @@ "version": "1.1.4", "resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-1.1.4.tgz", "integrity": "sha512-3NQNNgA1YSlJb/kMH1ildASP9HW7/7kYnRI2szWJaofaS1hWmbGI4H+d3+22aGzXXN9IJ+n+GiFVcGipJP18ow==", - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -1566,7 +1562,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1583,7 +1578,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1600,7 +1594,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1617,7 +1610,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1634,7 +1626,6 @@ "cpu": [ "arm" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1651,7 +1642,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1668,7 +1658,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1685,7 +1674,6 @@ "cpu": [ "ppc64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1702,7 +1690,6 @@ "cpu": [ "s390x" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1719,7 +1706,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1736,7 +1722,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1753,7 +1738,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1770,7 +1754,6 @@ "cpu": [ "wasm32" ], - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -1789,7 +1772,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1806,7 +1788,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2112,7 +2093,6 @@ "version": "0.10.2", "resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.2.tgz", "integrity": "sha512-RoBvJ2X0wuKlWFIjrwffGw1IqZHKQqzIchKaadZZfnNpsAYp2mM0h36JtPCjNDAHGgYez/15uMBpfGwchhiMgg==", - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -7348,7 +7328,7 @@ "version": "1.21.7", "resolved": "https://registry.npmjs.org/jiti/-/jiti-1.21.7.tgz", "integrity": "sha512-/imKNG4EbWNrVjoNC/1H5/9GFy+tqjGBHCaSsN+P2RnPqjsLmv6UD3Ej+Kj8nBWaRAwyk7kK5ZUc+OEatnTR3A==", - "dev": true, + "devOptional": true, "license": "MIT", "bin": { "jiti": "bin/jiti.js" @@ -7883,7 +7863,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -7904,7 +7883,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -7925,7 +7903,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -7946,7 +7923,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -7967,7 +7943,6 @@ "cpu": [ "arm" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -7988,7 +7963,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -8009,7 +7983,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -8030,7 +8003,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -8051,7 +8023,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -8072,7 +8043,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -8093,7 +8063,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -12781,21 +12750,6 @@ } } }, - "node_modules/svelte-check/node_modules/picomatch": { - "version": "4.0.4", - "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.4.tgz", - "integrity": "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==", - "dev": true, - "license": "MIT", - "optional": true, - "peer": true, - "engines": { - "node": ">=12" - }, - "funding": { - "url": "https://github.com/sponsors/jonschlinkert" - } - }, "node_modules/svelte-eslint-parser": { "version": "0.43.0", "resolved": "https://registry.npmjs.org/svelte-eslint-parser/-/svelte-eslint-parser-0.43.0.tgz", @@ -13535,7 +13489,7 @@ "version": "5.9.3", "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", - "dev": true, + "devOptional": true, "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", @@ -14321,9 +14275,9 @@ } }, "node_modules/windmill-parser-wasm-asset": { - "version": "1.728.1", - "resolved": "https://registry.npmjs.org/windmill-parser-wasm-asset/-/windmill-parser-wasm-asset-1.728.1.tgz", - "integrity": "sha512-73cyU6XM3gYEjFBx3qOKnv+VV1t70eAr6OiT+x0QobjFVNmqZFEdA7ayMYYVCnnix8OZxcsTNEF1hT61w1XiKw==" + "version": "1.740.0", + "resolved": "https://registry.npmjs.org/windmill-parser-wasm-asset/-/windmill-parser-wasm-asset-1.740.0.tgz", + "integrity": "sha512-Dgn5sQ93vJpqTQkv9iAy25+bqH2bUnIMdCfERb2Tia0Ql9T6iHbQhi4yzH9FbGW4Drv9GP46Qyetphqvp8vFOw==" }, "node_modules/windmill-parser-wasm-csharp": { "version": "1.510.1", diff --git a/frontend/package.json b/frontend/package.json index db92ba42f29d5..7d44ccbd7f536 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -154,7 +154,7 @@ "vscode-languageclient": "~9.0.1", "vscode-uri": "~3.1.0", "vscode-ws-jsonrpc": "~3.5.0", - "windmill-parser-wasm-asset": "1.728.1", + "windmill-parser-wasm-asset": "1.740.0", "windmill-parser-wasm-csharp": "1.510.1", "windmill-parser-wasm-go": "1.510.1", "windmill-parser-wasm-java": "1.510.1", diff --git a/frontend/src/lib/components/ScriptEditor.svelte b/frontend/src/lib/components/ScriptEditor.svelte index 54be27f787e43..f1944c73cccd6 100644 --- a/frontend/src/lib/components/ScriptEditor.svelte +++ b/frontend/src/lib/components/ScriptEditor.svelte @@ -104,6 +104,7 @@ import AssetsDropdownButton from './assets/AssetsDropdownButton.svelte' import { canHavePreprocessor } from '$lib/script_helpers' import { assetEq, type AssetWithAltAccessType } from './assets/lib' + import type { ColumnLineage } from './assets/AssetGraph/parsePipelineAnnotations' import { editor as meditor } from 'monaco-editor' import type { ReviewChangesOpts } from './copilot/chat/monaco-adapter' import GitRepoViewer from './GitRepoViewer.svelte' @@ -171,6 +172,11 @@ lastDeployedCode?: string | undefined disableAi?: boolean assets?: AssetWithAltAccessType[] + // Body-inferred column lineage (DuckDB SQL AST), surfaced alongside + // `assets` so the pipeline editor can render inferred column lineage on + // the live graph. Empty/undefined for non-DuckDB or when the parser + // build predates the inference. + inferredColumnLineage?: ColumnLineage[] modules?: { [key: string]: ScriptModule } | null editorBarRight?: import('svelte').Snippet enablePreprocessorSnippet?: boolean @@ -229,6 +235,7 @@ lastDeployedCode = undefined, disableAi = false, assets = $bindable(), + inferredColumnLineage = $bindable(), modules = $bindable(undefined), editorBarRight, enablePreprocessorSnippet = false, @@ -585,7 +592,13 @@ watch( () => inferAssetsRes.current, () => { - if (!inferAssetsRes.current || inferAssetsRes.current?.status === 'error') return + if (!inferAssetsRes.current || inferAssetsRes.current?.status === 'error') { + // Clear stale lineage on parse error / unset, so a script switch + // whose new body fails to parse can't leave the previous script's + // inferred column lineage bound to the new path. + if (inferredColumnLineage !== undefined) inferredColumnLineage = undefined + return + } let newAssets = inferAssetsRes.current.assets as AssetWithAltAccessType[] for (const asset of newAssets) { const old = assets?.find((a) => assetEq(a, asset)) @@ -593,6 +606,11 @@ } const normalizedAssets = newAssets.length > 0 ? newAssets : undefined if (!deepEqual(assets, normalizedAssets)) assets = normalizedAssets + + const newLineage = inferAssetsRes.current.column_lineage + const normalizedLineage = newLineage && newLineage.length > 0 ? newLineage : undefined + if (!deepEqual(inferredColumnLineage, normalizedLineage)) + inferredColumnLineage = normalizedLineage } ) diff --git a/frontend/src/lib/components/assets/AssetGraph/AssetGraphCanvas.svelte b/frontend/src/lib/components/assets/AssetGraph/AssetGraphCanvas.svelte index 81d5c58cd58e9..c13e9abc10337 100644 --- a/frontend/src/lib/components/assets/AssetGraph/AssetGraphCanvas.svelte +++ b/frontend/src/lib/components/assets/AssetGraph/AssetGraphCanvas.svelte @@ -219,6 +219,9 @@ // Producer's `// data_test` checks, on the write-edge to the // materialized asset — rendered as a flask badge on the link. data_tests?: NonNullable + // Producer's `// column` declared lineage, on the same write-edge — + // rendered as a columns badge on the link. + column_lineage?: NonNullable } // Graph-id of the script the user just launched (zero-latency hint), @@ -340,6 +343,28 @@ producerTests.set(`${r.usage_kind}:${r.path}`, r.data_tests) } } + // Producer → its `// column` declared lineage, keyed by runnable id, so + // the write-edge to the materialized asset can carry the columns badge — + // the edge *is* the transformation, and the lineage describes its output. + const producerColumnLineage = new Map< + string, + NonNullable + >() + // The `// materialize` target the lineage describes, so the badge lands + // only on that write-edge (a multi-output script writes several ducklake + // tables) — mirrors the anchor logic in `buildColumnGraph`. + const producerMaterializeTarget = new Map< + string, + NonNullable + >() + for (const r of g.runnables) { + if (r.column_lineage && r.column_lineage.length > 0) { + producerColumnLineage.set(`${r.usage_kind}:${r.path}`, r.column_lineage) + } + if (r.materialize_target) { + producerMaterializeTarget.set(`${r.usage_kind}:${r.path}`, r.materialize_target) + } + } for (const r of g.runnables) { const rid = `${r.usage_kind}:${r.path}` // Optimistic badge: the moment a run is launched from this view @@ -413,13 +438,23 @@ // write-edge carries the badge / custom-test nodes — a producer's // other (e.g. S3/datatable) outputs must not show them. const edgeTests = e.asset_kind === 'ducklake' ? producerTests.get(runnableId) : undefined + // Column lineage describes one materialized output, so the badge + // lands only on that asset's write-edge: the declared `// materialize` + // target when known (a multi-output script writes several ducklake + // tables), else the ducklake write-edge as the unambiguous fallback. + const matTarget = producerMaterializeTarget.get(runnableId) + const isOutputEdge = matTarget + ? e.asset_kind === matTarget.kind && e.asset_path === matTarget.path + : e.asset_kind === 'ducklake' + const edgeColumnLineage = isOutputEdge ? producerColumnLineage.get(runnableId) : undefined edges.push({ id: `prod:${runnableId}->${assetId}`, source: runnableId, target: assetId, kind: 'lineage-write', unsaved: e.unsaved, - data_tests: edgeTests + data_tests: edgeTests, + column_lineage: edgeColumnLineage }) // Each custom (`// data_test `) test → its own node // below the asset it validates, with a dashed "tests" edge. @@ -898,7 +933,10 @@ // producer's last-run status (the script fails if any test // fails) tints it green/red; neutral until it has run. data_tests: e.data_tests, - testsRunStatus: e.data_tests?.length ? runStates?.get(e.source)?.status : undefined + testsRunStatus: e.data_tests?.length ? runStates?.get(e.source)?.status : undefined, + // Column-lineage badge on the same write-edge (the link is the + // transformation whose output columns the lineage describes). + column_lineage: e.column_lineage }, animated, label, diff --git a/frontend/src/lib/components/assets/AssetGraph/AssetGraphDetailsPane.svelte b/frontend/src/lib/components/assets/AssetGraph/AssetGraphDetailsPane.svelte index b69db2e04ff9a..46ebaaf624035 100644 --- a/frontend/src/lib/components/assets/AssetGraph/AssetGraphDetailsPane.svelte +++ b/frontend/src/lib/components/assets/AssetGraph/AssetGraphDetailsPane.svelte @@ -25,7 +25,13 @@ import type { Schema } from '$lib/common' import type { AssetGraphSelection, PipelineMode } from './types' import PipelineScriptView from './PipelineScriptView.svelte' - import { parsePipelineAnnotations, type PipelineAnnotations } from './parsePipelineAnnotations' + import { + parsePipelineAnnotations, + type ColumnLineage, + type PipelineAnnotations + } from './parsePipelineAnnotations' + import ColumnLineageTrace from './ColumnLineageTrace.svelte' + import { assetColumnNodes, type ColumnLineageGraph } from './columnLineageGraph' import SummaryPathDisplay from '$lib/components/SummaryPathDisplay.svelte' import S3FilePreview from '$lib/components/S3FilePreview.svelte' import DataTablePreview from './DataTablePreview.svelte' @@ -72,7 +78,11 @@ // edges + synthesize asset nodes for drafts whose body has been // edited past the seeded template. Fires on every keystroke that // changes the inferred set. - onAssetsChange?: (scriptPath: string | undefined, assets: AssetWithAltAccessType[]) => void + onAssetsChange?: ( + scriptPath: string | undefined, + assets: AssetWithAltAccessType[], + columnLineage?: ColumnLineage[] + ) => void // Emits the live editor buffer on every keystroke so the parent can // autosave the in-flight content WITHOUT waiting for the pane teardown // (`onDraftPersist`). `onDraftPersist` stays the authoritative commit on @@ -114,6 +124,10 @@ path: string unsaved?: boolean }> + // Pipeline-wide column-lineage graph (built by the parent page from the + // resolved graph). Drives the transitive column-lineage trace shown for a + // selected materialized asset. + selectionColumnGraph?: ColumnLineageGraph // Whether the selected ducklake asset's schema can evolve (whole-table // `replace` producer). Forwarded to the Schema tab: version history when // true, a single fixed-schema view when false. Defaults to true (unknown). @@ -218,6 +232,7 @@ onScriptRenamed, onScriptRemoved, selectionProducers = [], + selectionColumnGraph, schemaCanEvolve = true, runsRefreshKey, runsPendingJobId, @@ -360,6 +375,10 @@ // edges as the user edits the body (e.g. renaming a CREATE TABLE // target updates the output asset node in real time). let liveBodyAssets = $state(undefined) + // Body-inferred column lineage (DuckDB SQL AST), bound out of ScriptEditor + // alongside `liveBodyAssets` and forwarded so the live graph can show + // inferred column lineage on the edited script before it deploys. + let liveColumnLineage = $state(undefined) // Bumped when the runs panel reports a watched job has reached a // terminal state. Drives S3FilePreview's refreshKey so the preview @@ -541,7 +560,8 @@ inPipeline: false, triggerAssets: [], nativeTriggers: [], - dataTests: [] + dataTests: [], + columnLineage: [] } ) $effect(() => { @@ -554,7 +574,7 @@ }) $effect(() => { if (readOnly) return - onAssetsChange?.(script?.path, liveBodyAssets ?? []) + onAssetsChange?.(script?.path, liveBodyAssets ?? [], liveColumnLineage) }) $effect(() => { if (readOnly) return @@ -999,7 +1019,21 @@ {#key selection.path} - +
+ {#if selectionColumnGraph && assetColumnNodes(selectionColumnGraph, selection.asset_kind, selection.path).length > 0} +
+ +
+ {/if} +
+ +
+
{/key} {:else}
@@ -1109,6 +1143,7 @@ bind:code={script.content} bind:schema={script.schema} bind:assets={liveBodyAssets} + bind:inferredColumnLineage={liveColumnLineage} {onTestStateChange} {args} /> diff --git a/frontend/src/lib/components/assets/AssetGraph/AssetGraphEdge.svelte b/frontend/src/lib/components/assets/AssetGraph/AssetGraphEdge.svelte index 65607cc5a65a7..c4bef2dd1f0bb 100644 --- a/frontend/src/lib/components/assets/AssetGraph/AssetGraphEdge.svelte +++ b/frontend/src/lib/components/assets/AssetGraph/AssetGraphEdge.svelte @@ -1,8 +1,8 @@ + +
+
+ + Column lineage + {#if targetLabel} + + {targetLabel} + {/if} + + {#if selected} + + {:else if component.size > 1} + click a column to trace + {/if} +
+ + {#if component.size === 0} + No column lineage for this asset. + {:else} +
+ + {#each edges as e (`${e.from}->${e.to}`)} + {@const hot = traced !== undefined && traced.has(e.from) && traced.has(e.to)} + {@const cold = traced !== undefined && !hot} + + {/each} + + + {#each [...component] as id (id)} + {@const n = graph.nodes.get(id)} + {@const p = pos.get(id)} + {#if n && p} + {@const isSeed = seedSet.has(id)} + + {/if} + {/each} +
+ {/if} +
diff --git a/frontend/src/lib/components/assets/AssetGraph/columnLineageGraph.test.ts b/frontend/src/lib/components/assets/AssetGraph/columnLineageGraph.test.ts new file mode 100644 index 0000000000000..6842dcfd86245 --- /dev/null +++ b/frontend/src/lib/components/assets/AssetGraph/columnLineageGraph.test.ts @@ -0,0 +1,162 @@ +import { describe, expect, it } from 'vitest' +import type { AssetGraphResponse } from './types' +import { + buildColumnGraph, + colNodeId, + traceColumn, + connectedComponent, + assetColumnNodes, + computeDepths +} from './columnLineageGraph' + +// Two scripts chained through an intermediate ducklake table: +// s1: orders.amount -> staging.amt +// s2: staging.amt -> daily.total +// s2: customers.name -> daily.cust (a second source into the sink) +function chainGraph(): AssetGraphResponse { + return { + assets: [], + triggers: [], + runnables: [ + { + path: 's1', + usage_kind: 'script', + column_lineage: [ + { + column: 'amt', + inputs: [{ from_kind: 'ducklake', from_path: 'wh/orders', from_column: 'amount' }] + } + ] + }, + { + path: 's2', + usage_kind: 'script', + column_lineage: [ + { + column: 'total', + inputs: [{ from_kind: 'ducklake', from_path: 'wh/staging', from_column: 'amt' }] + }, + { + column: 'cust', + inputs: [{ from_kind: 'ducklake', from_path: 'wh/customers', from_column: 'name' }] + } + ] + } + ], + edges: [ + { + runnable_path: 's1', + runnable_kind: 'script', + asset_kind: 'ducklake', + asset_path: 'wh/staging', + access_type: 'w' + }, + { + runnable_path: 's2', + runnable_kind: 'script', + asset_kind: 'ducklake', + asset_path: 'wh/daily', + access_type: 'w' + } + ] + } +} + +const ORDERS_AMOUNT = colNodeId('ducklake', 'wh/orders', 'amount') +const STAGING_AMT = colNodeId('ducklake', 'wh/staging', 'amt') +const DAILY_TOTAL = colNodeId('ducklake', 'wh/daily', 'total') +const DAILY_CUST = colNodeId('ducklake', 'wh/daily', 'cust') +const CUSTOMERS_NAME = colNodeId('ducklake', 'wh/customers', 'name') + +describe('buildColumnGraph', () => { + it('stitches per-script lineage into a transitive graph via shared columns', () => { + const g = buildColumnGraph(chainGraph()) + // orders.amount feeds staging.amt feeds daily.total + expect(g.up.get(STAGING_AMT)).toEqual(new Set([ORDERS_AMOUNT])) + expect(g.up.get(DAILY_TOTAL)).toEqual(new Set([STAGING_AMT])) + expect(g.down.get(ORDERS_AMOUNT)).toEqual(new Set([STAGING_AMT])) + expect(g.down.get(STAGING_AMT)).toEqual(new Set([DAILY_TOTAL])) + }) + + it('anchors to the // materialize target, not a guessed write-edge', () => { + const graph = chainGraph() + // s1 declares its materialize target and also has unordered extra + // ducklake writes; the lineage must anchor to the declared target. + const s1 = graph.runnables.find((r) => r.path === 's1')! + s1.materialize_target = { kind: 'ducklake', path: 'wh/staging' } + graph.edges.unshift({ + runnable_path: 's1', + runnable_kind: 'script', + asset_kind: 'ducklake', + asset_path: 'wh/other', + access_type: 'w' + }) + const g = buildColumnGraph(graph) + expect(g.nodes.has(STAGING_AMT)).toBe(true) // anchored to the declared target + expect(g.nodes.has(colNodeId('ducklake', 'wh/other', 'amt'))).toBe(false) + }) + + it('falls back to a ducklake write-edge when there is no materialize target', () => { + // chainGraph's runnables carry no materialize_target, so s1's lineage is + // anchored via its (single) ducklake write-edge. + const g = buildColumnGraph(chainGraph()) + expect(g.nodes.has(STAGING_AMT)).toBe(true) + }) + + it('node ids are collision-proof across `#` / `:` in paths and columns', () => { + // A delimiter-concatenated id would merge these; the JSON-encoded id must not. + expect(colNodeId('ducklake', 'a#b', 'c')).not.toBe(colNodeId('ducklake', 'a', 'b#c')) + expect(colNodeId('ducklake', 'a:b', 'c')).not.toBe(colNodeId('ducklake', 'a', 'b:c')) + }) + + it('skips producers with no ducklake output asset (columns unanchorable)', () => { + const graph = chainGraph() + graph.edges = graph.edges.filter((e) => e.runnable_path !== 's1') // s1 loses its output edge + const g = buildColumnGraph(graph) + // staging.amt is no longer produced as a node by s1... + expect(g.up.has(STAGING_AMT)).toBe(false) + // ...but s2 still anchors daily.total ← staging.amt (staging.amt as a source). + expect(g.up.get(DAILY_TOTAL)).toEqual(new Set([STAGING_AMT])) + }) +}) + +describe('traceColumn', () => { + it('returns the full upstream + downstream impact set of a source column', () => { + const g = buildColumnGraph(chainGraph()) + // from the root source, the whole chain downstream is impacted + expect(traceColumn(ORDERS_AMOUNT, g)).toEqual( + new Set([ORDERS_AMOUNT, STAGING_AMT, DAILY_TOTAL]) + ) + }) + + it('traces backward from a sink to every contributing source', () => { + const g = buildColumnGraph(chainGraph()) + expect(traceColumn(DAILY_TOTAL, g)).toEqual(new Set([DAILY_TOTAL, STAGING_AMT, ORDERS_AMOUNT])) + // the sibling output `cust` and its source are NOT in total's trace + expect(traceColumn(DAILY_TOTAL, g).has(DAILY_CUST)).toBe(false) + expect(traceColumn(DAILY_TOTAL, g).has(CUSTOMERS_NAME)).toBe(false) + }) + + it('traces an intermediate column both directions', () => { + const g = buildColumnGraph(chainGraph()) + expect(traceColumn(STAGING_AMT, g)).toEqual(new Set([STAGING_AMT, ORDERS_AMOUNT, DAILY_TOTAL])) + }) +}) + +describe('connectedComponent + depths', () => { + it('collects the neighborhood of an asset and lays it out by hop depth', () => { + const g = buildColumnGraph(chainGraph()) + const seeds = assetColumnNodes(g, 'ducklake', 'wh/daily') // the sink asset + expect(new Set(seeds)).toEqual(new Set([DAILY_TOTAL, DAILY_CUST])) + const comp = connectedComponent(seeds, g) + expect(comp).toEqual( + new Set([DAILY_TOTAL, DAILY_CUST, STAGING_AMT, ORDERS_AMOUNT, CUSTOMERS_NAME]) + ) + const depths = computeDepths(comp, g) + expect(depths.get(ORDERS_AMOUNT)).toBe(0) + expect(depths.get(STAGING_AMT)).toBe(1) + expect(depths.get(DAILY_TOTAL)).toBe(2) + expect(depths.get(CUSTOMERS_NAME)).toBe(0) + expect(depths.get(DAILY_CUST)).toBe(1) + }) +}) diff --git a/frontend/src/lib/components/assets/AssetGraph/columnLineageGraph.ts b/frontend/src/lib/components/assets/AssetGraph/columnLineageGraph.ts new file mode 100644 index 0000000000000..50fa1c4e7bc5f --- /dev/null +++ b/frontend/src/lib/components/assets/AssetGraph/columnLineageGraph.ts @@ -0,0 +1,169 @@ +import type { AssetKind } from '$lib/gen' +import type { AssetGraphResponse } from './types' + +// A node in the column-level lineage graph: one column of one asset. +export type ColumnNode = { kind: AssetKind; path: string; column: string } +export type ColumnNodeId = string + +// Collision-proof node id — JSON-encoded tuple, so a `#`/`:` inside a path or +// (quoted) column name can't merge two distinct columns into one node. +export function colNodeId(kind: AssetKind, path: string, column: string): ColumnNodeId { + return JSON.stringify([kind, path, column]) +} + +// The pipeline-wide column-lineage graph, stitched across every producer. Each +// producer's `column_lineage` contributes single-hop edges (its output column ← +// its source columns); shared (asset,column) nodes chain those hops into the +// full transitive graph (`orders.amount → staging.amt → daily.total`). +export type ColumnLineageGraph = { + nodes: Map + // outputColumn → the source columns it derives from (walk upstream). + up: Map> + // sourceColumn → the output columns derived from it (walk downstream). + down: Map> +} + +// Build the column graph from a resolved asset graph. A producer's +// `column_lineage` describes the columns of the asset it materializes; that +// output asset is the ducklake target it writes (v1 materialize target), found +// from its write-edge. Producers without a known ducklake output are skipped +// (their columns can't be anchored to an asset node). +export function buildColumnGraph(graph: AssetGraphResponse): ColumnLineageGraph { + const nodes = new Map() + const up = new Map>() + const down = new Map>() + + const addNode = (n: ColumnNode): ColumnNodeId => { + const id = colNodeId(n.kind, n.path, n.column) + if (!nodes.has(id)) nodes.set(id, n) + return id + } + const addEdge = (src: ColumnNodeId, out: ColumnNodeId) => { + if (src === out) return + ;(up.get(out) ?? up.set(out, new Set()).get(out)!).add(src) + ;(down.get(src) ?? down.set(src, new Set()).get(src)!).add(out) + } + + // The output asset a runnable's `column_lineage` describes. The declared + // `// materialize` target is authoritative (a multi-output script writes + // several ducklake tables, and the deployed write-edges are unordered, so + // picking "a" write-edge can anchor to the wrong asset). Fall back to a + // ducklake write-edge only for producers with no materialize annotation + // (e.g. a literal single-output CTAS). + const outputAsset = new Map() + for (const r of graph.runnables ?? []) { + if (r.materialize_target) { + outputAsset.set(`${r.usage_kind}:${r.path}`, r.materialize_target) + } + } + for (const e of graph.edges ?? []) { + const access = e.access_type ?? 'r' + const key = `${e.runnable_kind}:${e.runnable_path}` + if ( + (access === 'w' || access === 'rw') && + e.asset_kind === 'ducklake' && + !outputAsset.has(key) + ) { + outputAsset.set(key, { kind: e.asset_kind, path: e.asset_path }) + } + } + + for (const r of graph.runnables ?? []) { + const lineage = r.column_lineage + if (!lineage || lineage.length === 0) continue + const out = outputAsset.get(`${r.usage_kind}:${r.path}`) + if (!out) continue + for (const cl of lineage) { + const outId = addNode({ kind: out.kind, path: out.path, column: cl.column }) + for (const inp of cl.inputs) { + const srcId = addNode({ + kind: inp.from_kind, + path: inp.from_path, + column: inp.from_column + }) + addEdge(srcId, outId) + } + } + } + + return { nodes, up, down } +} + +// Every node reachable from `start` by following `adj` (transitive closure, +// excluding `start` itself). Iterative to avoid deep-recursion limits. +function reach(start: ColumnNodeId, adj: Map>): Set { + const seen = new Set() + const stack = [start] + while (stack.length) { + const n = stack.pop()! + for (const m of adj.get(n) ?? []) { + if (!seen.has(m)) { + seen.add(m) + stack.push(m) + } + } + } + return seen +} + +// The full transitive trace of a column: itself + all upstream ancestors + all +// downstream descendants. This is the impact set — "everything that feeds, or +// is fed by, this column". +export function traceColumn(id: ColumnNodeId, g: ColumnLineageGraph): Set { + const out = new Set([id]) + for (const a of reach(id, g.up)) out.add(a) + for (const d of reach(id, g.down)) out.add(d) + return out +} + +// The connected neighborhood of a set of seed columns (an asset's columns): +// the seeds plus everything upstream and downstream of any of them. This is the +// subgraph the trace view renders around a selected asset. +export function connectedComponent( + seeds: ColumnNodeId[], + g: ColumnLineageGraph +): Set { + const out = new Set() + for (const s of seeds) { + if (!g.nodes.has(s)) continue + out.add(s) + for (const a of reach(s, g.up)) out.add(a) + for (const d of reach(s, g.down)) out.add(d) + } + return out +} + +// All column-node ids belonging to one asset (its seed set for a trace). +export function assetColumnNodes( + g: ColumnLineageGraph, + kind: AssetKind, + path: string +): ColumnNodeId[] { + const ids: ColumnNodeId[] = [] + for (const [id, n] of g.nodes) if (n.kind === kind && n.path === path) ids.push(id) + return ids +} + +// Longest-path depth of each node within `ids`, sources at depth 0 and depth +// increasing downstream — so a left→right layout reads upstream→downstream. +// Cycle-guarded (lineage is a DAG, but be defensive). +export function computeDepths( + ids: Set, + g: ColumnLineageGraph +): Map { + const depth = new Map() + const visiting = new Set() + const d = (id: ColumnNodeId): number => { + const memo = depth.get(id) + if (memo !== undefined) return memo + if (visiting.has(id)) return 0 + visiting.add(id) + let m = 0 + for (const u of g.up.get(id) ?? []) if (ids.has(u)) m = Math.max(m, d(u) + 1) + visiting.delete(id) + depth.set(id, m) + return m + } + for (const id of ids) d(id) + return depth +} diff --git a/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.parity.test.ts b/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.parity.test.ts index 008b903ad99f9..3eb1e4e57420a 100644 --- a/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.parity.test.ts +++ b/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.parity.test.ts @@ -19,7 +19,8 @@ const ASSERTED_TS_FIELDS: Record = { tag: true, retry: true, materialize: true, - dataTests: true + dataTests: true, + columnLineage: true } // Parser-parity guard: this TS parser (drives the live graph preview) and @@ -71,6 +72,9 @@ type Fixture = { // corpus drives both sides. The TS parser emits this shape verbatim // (snake_case fields), so the comparison is 1:1. Absent === []. data_tests?: Array> + // Snake_case `ColumnLineage` serde shape — TS parser emits it verbatim, + // so the comparison is 1:1. Absent === []. + column_lineage?: Array> } } @@ -161,6 +165,8 @@ describe('parsePipelineAnnotations matches the shared Rust fixture corpus', () = } expect(got.dataTests, 'data tests').toEqual(f.expected.data_tests ?? []) + + expect(got.columnLineage, 'column lineage').toEqual(f.expected.column_lineage ?? []) }) } }) diff --git a/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.test.ts b/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.test.ts index 15b12b959db53..5f4b063e3fcd6 100644 --- a/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.test.ts +++ b/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.test.ts @@ -1,5 +1,9 @@ import { describe, expect, it } from 'vitest' -import { parsePipelineAnnotations } from './parsePipelineAnnotations' +import { + mergeColumnLineage, + parsePipelineAnnotations, + type ColumnLineage +} from './parsePipelineAnnotations' // Unit-test the TS mirror of the backend `parse_pipeline_annotations`. The // two implementations MUST stay behaviorally identical — these tests are @@ -131,3 +135,29 @@ describe('parsePipelineAnnotations: data_upload', () => { expect(out.nativeTriggers).toEqual([]) }) }) + +describe('mergeColumnLineage', () => { + const ref = (path: string, col: string): ColumnLineage['inputs'][number] => ({ + from_kind: 'ducklake', + from_path: path, + from_column: col + }) + + it('annotation wins per output column; inferred fills the rest (mirrors Rust)', () => { + const inferred: ColumnLineage[] = [ + { column: 'total', inputs: [ref('w/o', 'amount')] }, + { column: 'qty', inputs: [ref('w/o', 'qty')] } + ] + const annotated: ColumnLineage[] = [{ column: 'total', inputs: [ref('w/manual', 'grand')] }] + const merged = mergeColumnLineage(inferred, annotated) + expect(merged).toEqual([ + { column: 'total', inputs: [ref('w/manual', 'grand')] }, // annotation, first + authoritative + { column: 'qty', inputs: [ref('w/o', 'qty')] } // inferred, not overridden + ]) + }) + + it('returns annotations unchanged when there is no inferred lineage', () => { + const annotated: ColumnLineage[] = [{ column: 'a', inputs: [ref('w/o', 'a')] }] + expect(mergeColumnLineage([], annotated)).toEqual(annotated) + }) +}) diff --git a/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.ts b/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.ts index a788acdbe085c..ccbc746b04a40 100644 --- a/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.ts +++ b/frontend/src/lib/components/assets/AssetGraph/parsePipelineAnnotations.ts @@ -113,6 +113,44 @@ export type DataTest = } | { type: 'custom'; path: string } +// `// column <- .[, …]` — declared column-level +// lineage: one output column and the upstream source columns it derives from. +// See backend `ColumnLineage`. A sibling of `DataTest` in the extensible +// annotation family — same parse shape, accumulating (one line per output +// column) — but pure metadata: drives the column-lineage graph view, runs no +// probe. Field names are snake_case to match the Rust `ColumnLineage` serde +// output verbatim, so the live-draft parse and the backend graph endpoint +// (deployed nodes) produce wire-identical shapes. +export type ColumnRef = { + from_kind: AssetKind + from_path: string + from_column: string +} +export type ColumnLineage = { + column: string + inputs: ColumnRef[] +} + +// Combine body-inferred column lineage with `// column` annotations, the +// annotation winning per output column. Mirrors the Rust `merge_column_lineage` +// (`asset_parser.rs`) so the live-draft preview matches what deploys: the +// backend already merges inferred + annotated server-side, and the live graph +// must apply the same precedence to the WASM-inferred lineage. +export function mergeColumnLineage( + inferred: ColumnLineage[], + annotated: ColumnLineage[] +): ColumnLineage[] { + const seen = new Set(annotated.map((c) => c.column)) + const out = [...annotated] + for (const c of inferred) { + if (!seen.has(c.column)) { + seen.add(c.column) + out.push(c) + } + } + return out +} + export type PipelineAnnotations = { inPipeline: boolean triggerAssets: PipelineTriggerAsset[] @@ -125,6 +163,8 @@ export type PipelineAnnotations = { materialize?: MaterializeSpec // `// data_test …` — accumulating data-quality checks (multiple lines). dataTests: DataTest[] + // `// column <- .[, …]` — accumulating column lineage. + columnLineage: ColumnLineage[] } // Tokenize a `key=value [key="quoted value"] ...` option string. Bare @@ -261,6 +301,37 @@ function parseRelationships(s: string): DataTest | undefined { } } +// `.` upstream reference. The column is the segment after the +// final `.`; the rest is the asset URI (default-syntax shorthands enabled). +// Mirrors Rust `parse_column_ref`. +function parseColumnRef(s: string): ColumnRef | undefined { + const dot = s.lastIndexOf('.') + if (dot < 0) return undefined + const fromColumn = singleIdent(s.slice(dot + 1)) + if (!fromColumn) return undefined + const asset = parseAssetSyntaxDefault(s.slice(0, dot).trim()) + if (!asset || asset.path === '') return undefined + return { from_kind: asset.kind, from_path: asset.path, from_column: fromColumn } +} + +// ` <- [, …]`. Individually malformed refs are dropped; the +// line is kept iff ≥1 ref parses (mirrors `parseAcceptedValues`). A missing +// `<-`, a non-ident output column, or zero valid refs drops the line. +// Mirrors Rust `parse_column_lineage_spec`. +function parseColumnLineageSpec(s: string): ColumnLineage | undefined { + const arrow = s.indexOf('<-') + if (arrow < 0) return undefined + const column = singleIdent(s.slice(0, arrow)) + if (!column) return undefined + const inputs = s + .slice(arrow + 2) + .split(',') + .map((r) => parseColumnRef(r.trim())) + .filter((r): r is ColumnRef => r !== undefined) + if (inputs.length === 0) return undefined + return { column, inputs } +} + // Parse a `// data_test …` right-hand side into one `DataTest`. The // leading token selects the variant; anything not a built-in keyword is the // `custom` escape hatch (a single script-path token). Returns `undefined` for @@ -398,7 +469,8 @@ export function parsePipelineAnnotations(code: string): PipelineAnnotations { inPipeline: false, triggerAssets: [], nativeTriggers: [], - dataTests: [] + dataTests: [], + columnLineage: [] } for (const rawLine of code.split('\n')) { @@ -475,6 +547,15 @@ export function parsePipelineAnnotations(code: string): PipelineAnnotations { continue } + // `column` is a complete word; a body comment that merely starts with + // `column` has no `<-` and is dropped fail-safe. Accumulates. + const afterColumn = consumeKeyword(inner, 'column') + if (afterColumn !== undefined) { + const spec = parseColumnLineageSpec(afterColumn.trim()) + if (spec) out.columnLineage.push(spec) + continue + } + const afterOn = consumeKeyword(inner, 'on') if (afterOn !== undefined) { const specText = afterOn.trim() diff --git a/frontend/src/lib/components/assets/AssetGraph/resolveGraph.ts b/frontend/src/lib/components/assets/AssetGraph/resolveGraph.ts index 0521051375f67..b7c82822fae4a 100644 --- a/frontend/src/lib/components/assets/AssetGraph/resolveGraph.ts +++ b/frontend/src/lib/components/assets/AssetGraph/resolveGraph.ts @@ -1,5 +1,10 @@ import type { AssetGraphResponse, NativeTriggerKind } from './types' -import { parsePipelineAnnotations, type PipelineAnnotations } from './parsePipelineAnnotations' +import { + mergeColumnLineage, + parsePipelineAnnotations, + type ColumnLineage, + type PipelineAnnotations +} from './parsePipelineAnnotations' import { extractWrites, extractReads, @@ -20,7 +25,12 @@ export type ResolveGraphInput = { /** In-flight drafts keyed by script path. */ drafts: Map /** Body assets inferred for the currently-open script (live keystrokes). */ - liveBodyAssets: { scriptPath: string | undefined; assets: AssetWithAltAccessType[] } + liveBodyAssets: { + scriptPath: string | undefined + assets: AssetWithAltAccessType[] + /** Body-inferred column lineage (DuckDB SQL AST) for the open script. */ + columnLineage?: ColumnLineage[] + } /** Pipeline annotations parsed from the currently-open buffer. */ liveAnnotations: { scriptPath: string | undefined; annotations: PipelineAnnotations } /** Sticky session caches of inferred body writes/reads per script path. */ @@ -222,6 +232,19 @@ function seedDraftOverlays(acc: Accumulator, input: ResolveGraphInput) { for (const [path, d] of drafts) { const parsed = parsePipelineAnnotations(d.script.content) + // For the open script, fold in the WASM-inferred column lineage (DuckDB + // SQL AST) under the same annotation-wins precedence the backend applies + // on deploy, so the live preview matches what deploys. Only the open + // script carries live inference (`liveBodyAssets`); other drafts stay + // annotation-only until they deploy (the backend infers then). + const inferredCL = + path === liveBodyAssets.scriptPath ? (liveBodyAssets.columnLineage ?? []) : [] + const mergedCL = mergeColumnLineage(inferredCL, parsed.columnLineage) + // The `// materialize` target this draft's column lineage describes, so + // the column graph anchors to it rather than guessing a write-edge. + const materializeTarget = parsed.materialize + ? { kind: parsed.materialize.targetKind, path: parsed.materialize.targetPath } + : undefined // A draft can coexist with a base entry — during save the refetch // lands before drafts cleanup, and a user re-editing a deployed // script also produces both. In that case we mutate the existing @@ -240,15 +263,20 @@ function seedDraftOverlays(acc: Accumulator, input: ResolveGraphInput) { tag: parsed.tag, retry: parsed.retry, data_tests: parsed.dataTests.length > 0 ? parsed.dataTests : undefined, + column_lineage: mergedCL.length > 0 ? mergedCL : undefined, + materialize_target: materializeTarget, unsaved: true }) } else { // Refresh annotation-derived badges from the live parse too, so - // adding/removing `// data_test` lines on an already-deployed script - // updates the badge immediately (not only after redeploy/refetch). + // adding/removing `// data_test` / `// column` lines on an + // already-deployed script updates the badge immediately (not only + // after redeploy/refetch). runnables[baseIdx] = { ...runnables[baseIdx], data_tests: parsed.dataTests.length > 0 ? parsed.dataTests : undefined, + column_lineage: mergedCL.length > 0 ? mergedCL : undefined, + materialize_target: materializeTarget, unsaved: true } } diff --git a/frontend/src/lib/components/assets/AssetGraph/types.ts b/frontend/src/lib/components/assets/AssetGraph/types.ts index c942d289349e4..9276f500afd0c 100644 --- a/frontend/src/lib/components/assets/AssetGraph/types.ts +++ b/frontend/src/lib/components/assets/AssetGraph/types.ts @@ -1,5 +1,5 @@ import type { AssetKind } from '$lib/gen' -import type { DataTest } from './parsePipelineAnnotations' +import type { ColumnLineage, DataTest } from './parsePipelineAnnotations' export type GraphUsageKind = 'script' | 'flow' @@ -33,6 +33,14 @@ export interface AssetGraphRunnableNode { // asset. Surfaced as a count badge (with a per-test breakdown in the title) // so test coverage is visible on the node without opening the pane. data_tests?: DataTest[] + // `// column <- .` declared column-level lineage for this + // script's materialized output. Surfaced as a count badge on the write-edge + // and as a column-to-column diagram in the asset details pane. + column_lineage?: ColumnLineage[] + // `// materialize ` target — the asset `column_lineage` describes. + // Lets the column graph anchor lineage to the exact output instead of + // guessing a ducklake write-edge (a multi-output script writes several). + materialize_target?: { kind: AssetKind; path: string } // Managed `// materialize` write strategy. Absent for non-materializing or // `manual` scripts. Used (with `partition_kind`) to decide whether a // produced asset's schema can evolve: only whole-table `replace` can, since diff --git a/frontend/src/lib/infer.ts b/frontend/src/lib/infer.ts index 7c8b83a64f0ea..a70dd7c7457a8 100644 --- a/frontend/src/lib/infer.ts +++ b/frontend/src/lib/infer.ts @@ -68,6 +68,7 @@ import wasmUrlWac from 'windmill-parser-wasm-wac/windmill_parser_wasm_bg.wasm?ur import { workspaceStore } from './stores.js' import { argSigToJsonSchemaType } from 'windmill-utils-internal' import { type AssetWithAccessType } from './components/assets/lib.js' +import { type ColumnLineage } from './components/assets/AssetGraph/parsePipelineAnnotations' const loadSchemaLastRun = writable< | [ @@ -169,6 +170,10 @@ type InferAssetsResult = assets: AssetWithAccessType[] sql_queries?: InferAssetsSqlQueryDetails[] columns?: Record + // Body-inferred column lineage (DuckDB SQL AST). Present once the + // `windmill-parser-wasm-asset` package is rebuilt with the inference; + // the spread below already forwards it from the parser output. + column_lineage?: ColumnLineage[] } | { status: 'error' diff --git a/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte b/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte index 3548ba60179b0..3056245e51e01 100644 --- a/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/pipeline/[folder]/+page.svelte @@ -33,8 +33,13 @@ import PipelineModeToggle from '$lib/components/assets/AssetGraph/PipelineModeToggle.svelte' import { parsePipelineAnnotations, + type ColumnLineage, type PipelineAnnotations } from '$lib/components/assets/AssetGraph/parsePipelineAnnotations' + import { + buildColumnGraph, + type ColumnLineageGraph + } from '$lib/components/assets/AssetGraph/columnLineageGraph' import { resolveGraph } from '$lib/components/assets/AssetGraph/resolveGraph' import { computeDownstreamClosure, @@ -472,7 +477,8 @@ inPipeline: false, triggerAssets: [], nativeTriggers: [], - dataTests: [] + dataTests: [], + columnLineage: [] } }) @@ -485,6 +491,7 @@ let liveBodyAssets = $state<{ scriptPath: string | undefined assets: AssetWithAltAccessType[] + columnLineage?: ColumnLineage[] }>({ scriptPath: undefined, assets: [] }) // The open draft's live editor buffer, emitted by the pane on every @@ -502,7 +509,13 @@ const EMPTY_LIVE_ASSETS = { scriptPath: undefined, assets: [] } const EMPTY_LIVE_ANNOTATIONS = { scriptPath: undefined, - annotations: { inPipeline: false, triggerAssets: [], nativeTriggers: [], dataTests: [] } + annotations: { + inPipeline: false, + triggerAssets: [], + nativeTriggers: [], + dataTests: [], + columnLineage: [] + } } // Reset every live editor overlay (annotations / body assets / content) @@ -1190,14 +1203,18 @@ ) { liveAnnotations = { scriptPath, annotations } } - function handleAssetsChange(scriptPath: string | undefined, assets: AssetWithAltAccessType[]) { + function handleAssetsChange( + scriptPath: string | undefined, + assets: AssetWithAltAccessType[], + columnLineage?: ColumnLineage[] + ) { // Single update site for the live overlay. `inferredWritesByPath` // / `inferredReadsByPath` are now derived from `liveBodyAssets` // (for the open script) + `inferredAssetsByPath` (prefetched // snapshot for every other script), so we don't have to write // into those caches here — the derive picks up our update on the // next reactive tick. - liveBodyAssets = { scriptPath, assets } + liveBodyAssets = { scriptPath, assets, columnLineage } } function handleContentChange(scriptPath: string | undefined, content: string) { liveContent = { scriptPath, content } @@ -1958,6 +1975,27 @@ .map((e) => ({ kind: e.runnable_kind, path: e.runnable_path, unsaved: e.unsaved })) }) + // Empty graph reused when the trace isn't shown (no ducklake-asset selection, + // or a draft is actively edited) so the pane blanks out like the other + // selection overlays and `buildColumnGraph` doesn't run. + const EMPTY_COLUMN_GRAPH: ColumnLineageGraph = { + nodes: new Map(), + up: new Map(), + down: new Map() + } + // Pipeline-wide column-lineage graph, stitched across every producer's + // (inferred + annotated) `column_lineage` and the asset write-edges. Drives + // the transitive column trace in the details pane. Built from `displayGraph` + // — the exact graph the canvas renders — so the trace matches it: draft + // overlays in edit / show-drafts, deployed-only in plain View. Gated to a + // ducklake-asset selection so it isn't rebuilt on every editor keystroke when + // the trace UI isn't even shown. + let columnGraph = $derived( + selection?.kind === 'asset' && selection.asset_kind === 'ducklake' + ? buildColumnGraph(displayGraph) + : EMPTY_COLUMN_GRAPH + ) + // Whether the selected ducklake asset's captured schema can *evolve* (drives // the asset panel's Schema tab: version history vs. a single fixed schema). // Only a whole-table `replace` producer (CREATE OR REPLACE) can change @@ -2552,6 +2590,7 @@ onRunByPath={runByPathLegit} selection={activeDraft ? undefined : selection} selectionProducers={activeDraft ? [] : selectionProducers} + selectionColumnGraph={activeDraft ? EMPTY_COLUMN_GRAPH : columnGraph} {schemaCanEvolve} {runsRefreshKey} {runsPendingJobId}