Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
75cd4d7
fix: preserve subquery structure when unparsing SubqueryAlias over Ag…
yonatan-sevenai Mar 22, 2026
2f8f667
Merge branch 'main' into main
yonatan-sevenai Mar 27, 2026
0d223f9
fix: preserve subquery structure when unparsing SubqueryAlias over Ag…
yonatan-sevenai Mar 27, 2026
42f7f64
Fixes in PR
yonatan-sevenai Apr 4, 2026
ae2fdcf
Merge branch 'main' into main
yonatan-sevenai Apr 4, 2026
9a31259
test: add tests for aggregate over subquery unparsing
yonatan-sevenai Apr 4, 2026
66629b7
test: add no-outer-rename aggregate unparse test
yonatan-sevenai Apr 4, 2026
da4306c
fix: fold Limit/Sort into outer SELECT when Projection claims Aggrega…
yonatan-sevenai Apr 4, 2026
3e36fbe
Merge branch 'main' into bug/unparse_over_aggregate
yonatan-sevenai Apr 4, 2026
e202ffd
Merge remote-tracking branch 'datafusion/main' into HEAD
yonatan-sevenai Apr 22, 2026
5e25008
fix(unparser): fold stacked Limit/Sort between Projection and Aggregate
yonatan-sevenai Apr 24, 2026
65b6897
Merge remote-tracking branch 'datafusion/main' into bug/unparse_over_…
yonatan-sevenai Apr 24, 2026
8c41d88
Merge branch 'main' into bug/unparse_over_aggregate
yonatan-sevenai Apr 24, 2026
2808173
Merge remote-tracking branch 'datafusion/main' into bug/unparse_over_…
yonatan-sevenai Apr 30, 2026
5048c09
Fix multiple limit sort levels
yonatan-sevenai Apr 30, 2026
40b6b85
Merge branch 'main' into bug/unparse_over_aggregate
yonatan-sevenai Apr 30, 2026
eda9498
Merge remote-tracking branch 'datafusion/main' into bug/unparse_over_…
yonatan-sevenai May 15, 2026
230d6ac
Only fold Sort/Limit chains that map cleanly onto a single SELECT
yonatan-sevenai May 16, 2026
c57db1f
Split Sort arm into "with fetch" and "without fetch" cases
yonatan-sevenai May 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 267 additions & 15 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ use datafusion_common::{
Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err,
internal_datafusion_err, internal_err, not_impl_err,
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion},
utils::combine_limit,
};
use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX};
use datafusion_expr::{
Aggregate, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
UserDefinedLogicalNode, Window, expr::Alias,
Aggregate, BinaryExpr, Distinct, Expr, FetchType, JoinConstraint, JoinType,
LogicalPlan, LogicalPlanBuilder, Operator, Projection, SkipType, Sort, SortExpr,
TableScan, Unnest, UserDefinedLogicalNode, Window, expr::Alias,
};
use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
use std::{sync::Arc, vec};
Expand Down Expand Up @@ -226,15 +227,29 @@ impl Unparser<'_> {
Ok(SetExpr::Select(Box::new(select_builder.build()?)))
}

/// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
/// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
/// and Window nodes and matching column expressions to the appropriate agg or window expressions.
/// Reconstructs a SELECT SQL statement from a logical plan by
/// unprojecting column expressions found in a [Projection] node. This
/// requires scanning the plan tree for relevant Aggregate and Window
/// nodes and matching column expressions to the appropriate agg or
/// window expressions.
///
/// `fully_absorbed` reports whether the Projection arm was able to
/// absorb every `Sort`/`Limit` node between this Projection and the
/// Aggregate/Window into the current SELECT. When `false`, the
/// Aggregate/Window will end up in a derived subquery, so we fall
/// back to passthrough column references that resolve against that
/// subquery's output instead of unprojecting onto the original
/// aggregate expressions.
///
/// Returns `true` if an Aggregate node was found and claimed for this
/// SELECT.
fn reconstruct_select_statement(
&self,
plan: &LogicalPlan,
p: &Projection,
select: &mut SelectBuilder,
) -> Result<()> {
fully_absorbed: bool,
) -> Result<bool> {
let mut exprs = p.expr.clone();

// If an Unnest node is found within the select, find and unproject the unnest column
Expand Down Expand Up @@ -277,10 +292,24 @@ impl Unparser<'_> {
.collect::<Result<Vec<_>>>()?;
}

match (
find_agg_node_within_select(plan, true),
find_window_nodes_within_select(plan, None, true),
) {
// When some Sort/Limit nodes between this Projection and the
// Aggregate/Window couldn't be absorbed into the current SELECT,
// the Aggregate/Window will live inside a derived subquery. In
// that case we use the passthrough projection path — column refs
// resolve against the derived subquery's output columns instead
// of being unprojected onto the original aggregate/window
// expressions.
let agg = if fully_absorbed {
find_agg_node_within_select(plan, true)
} else {
None
};
let window = if fully_absorbed {
find_window_nodes_within_select(plan, None, true)
} else {
None
};
match (agg, window) {
(Some(agg), window) => {
let window_option = window.as_deref();
let items = exprs
Expand All @@ -299,6 +328,7 @@ impl Unparser<'_> {
.collect::<Result<Vec<_>>>()?,
vec![],
));
Ok(true)
}
(None, Some(window)) => {
let items = exprs
Expand All @@ -310,6 +340,7 @@ impl Unparser<'_> {
.collect::<Result<Vec<_>>>()?;

select.projection(items);
Ok(false)
}
_ => {
let items = exprs
Expand All @@ -328,9 +359,9 @@ impl Unparser<'_> {
})
.collect::<Result<Vec<_>>>()?;
select.projection(items);
Ok(false)
}
}
Ok(())
}

fn derive(
Expand Down Expand Up @@ -438,7 +469,7 @@ impl Unparser<'_> {
}));

if !select.already_projected() {
self.reconstruct_select_statement(plan, p, select)?;
self.reconstruct_select_statement(plan, p, select, true)?;
}

if matches!(
Expand Down Expand Up @@ -680,8 +711,229 @@ impl Unparser<'_> {
if self.dialect.unnest_as_lateral_flatten() {
Self::collect_flatten_aliases(p.input.as_ref(), select);
}
self.reconstruct_select_statement(plan, p, select)?;
self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
// Walk down through consecutive Sort/Limit nodes, greedily
// absorbing what can be folded into the SELECT we're
// building around the Aggregate. A single SQL SELECT can
// carry at most one `ORDER BY` (applied before `LIMIT`),
// so the safe shape between us and the Aggregate is
// `Limit* Sort?` (outer→inner). We stop at the first node
// that would violate this; that node becomes the
// subquery boundary, and recursion (seeing
// `already_projected = true`) wraps it in a derived
// relation. If we walk all the way to a non-Sort/non-Limit
// terminator, the entire chain folds into one SELECT.
//
// Stacked Sorts with nothing between them collapse to the
// outermost — the same simplification `EnforceSorting`
// applies on the physical side — but only when no Limit
// has been absorbed since the previous Sort, since the
// inner Sort would otherwise be determining which rows
// the Limit keeps.
//
// The fold is collected here without touching `query`
// (apart from non-literal direct Limits, which don't
// depend on projection form). Once we know whether every
// Sort/Limit was absorbed we can pick the right
// projection form and emit `ORDER BY` with or without
// unprojection.
let mut cur = p.input.as_ref();
let mut absorbed_sort: Option<&Sort> = None;
let mut combined_skip: usize = 0;
let mut combined_fetch: Option<usize> = None;
let mut have_combined_limit = false;
let mut have_direct_limit = false;
let mut have_order_by = false;
loop {
match cur {
LogicalPlan::Limit(limit) => {
if have_order_by {
// Limit-below-Sort: `ORDER BY … LIMIT N`
// would apply the sort first, but the
// logical plan applies the Limit first.
break;
}
let skip_lit = limit.get_skip_type()?;
let fetch_lit = limit.get_fetch_type()?;
match (skip_lit, fetch_lit) {
(SkipType::Literal(s), FetchType::Literal(f)) => {
if have_direct_limit {
break;
}
if have_combined_limit {
// outer = already-accumulated;
// inner = this Limit. Same merge
// rule as the optimizer.
let (cs, cf) = combine_limit(
combined_skip,
combined_fetch,
s,
f,
);
combined_skip = cs;
combined_fetch = cf;
} else {
combined_skip = s;
combined_fetch = f;
have_combined_limit = true;
}
}
_ => {
if have_combined_limit || have_direct_limit {
// Cannot safely merge a
// non-literal Limit with a prior
// one; let recursion handle it.
break;
}
let Some(query_ref) = query.as_mut() else {
return internal_err!(
"Limit operator only valid in a statement context."
);
};
if let Some(fetch) = &limit.fetch {
query_ref.limit(Some(self.expr_to_sql(fetch)?));
}
if let Some(skip) = &limit.skip {
query_ref.offset(Some(ast::Offset {
rows: ast::OffsetRows::None,
value: self.expr_to_sql(skip)?,
}));
}
have_direct_limit = true;
}
}
cur = limit.input.as_ref();
}
LogicalPlan::Sort(sort) if sort.fetch.is_some() => {
// `Sort { fetch }` is logically
// `Limit(fetch) -> Sort`. Try to absorb the
// virtual Limit first; only if that succeeds
// do we absorb the Sort. Otherwise we'd
// silently drop the fetch.
let fetch = sort.fetch.expect("guarded above");
if have_order_by {
// The virtual Limit would sit below an
// already-absorbed outer Sort.
break;
}
if have_direct_limit {
// Cannot combine a literal fetch with a
// non-literal direct Limit; let the
// derived subquery preserve both.
break;
}
if have_combined_limit {
let (cs, cf) = combine_limit(
combined_skip,
combined_fetch,
0,
Some(fetch),
);
combined_skip = cs;
combined_fetch = cf;
} else {
combined_skip = 0;
combined_fetch = Some(fetch);
have_combined_limit = true;
}
// Now the Sort itself. We know
// `!have_order_by` from the check above.
absorbed_sort = Some(sort);
have_order_by = true;
cur = sort.input.as_ref();
}
LogicalPlan::Sort(sort) => {
// Sort without `fetch`.
if have_order_by {
// Outer Sort already absorbed; the inner
// Sort is reordered by it and is
// conventionally dropped, matching
// `EnforceSorting` on the physical side.
cur = sort.input.as_ref();
continue;
}
absorbed_sort = Some(sort);
have_order_by = true;
cur = sort.input.as_ref();
}
_ => break,
}
}

// `fully_absorbed` is the bottom-up algorithm's "walked
// all the way to the terminator without stopping": the
// Aggregate/Window will live in the same SELECT as this
// Projection, so we can unproject sort exprs and let
// `reconstruct_select_statement` claim it.
let fully_absorbed =
!matches!(cur, LogicalPlan::Limit(_) | LogicalPlan::Sort(_));
let found_agg =
self.reconstruct_select_statement(plan, p, select, fully_absorbed)?;

// Whether to bother emitting the absorbed clauses: only
// if there's an Aggregate either claimed in this SELECT
// or about to live in a derived subquery below us. If
// there's nothing aggregate-like to fold over, fall
// through and let the normal recursion handle the
// Projection's input.
let agg_below =
!fully_absorbed && find_agg_node_within_select(plan, true).is_some();
if !(found_agg || agg_below) {
return self.select_to_sql_recursively(
p.input.as_ref(),
query,
select,
relation,
);
}

if let Some(sort) = absorbed_sort {
let Some(query_ref) = query.as_mut() else {
return internal_err!(
"Sort operator only valid in a statement context."
);
};
let sort_exprs: Vec<SortExpr> = if fully_absorbed {
let agg =
find_agg_node_within_select(plan, select.already_projected());
sort.expr
.iter()
.map(|sort_expr| {
unproject_sort_expr(
sort_expr.clone(),
agg,
sort.input.as_ref(),
)
})
.collect::<Result<Vec<_>>>()?
} else {
sort.expr.clone()
};
query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
}
if have_combined_limit {
let Some(query_ref) = query.as_mut() else {
return internal_err!(
"Limit operator only valid in a statement context."
);
};
if let Some(fetch) = combined_fetch {
query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
fetch.to_string(),
false,
))));
}
if combined_skip > 0 {
query_ref.offset(Some(ast::Offset {
rows: ast::OffsetRows::None,
value: ast::Expr::value(ast::Value::Number(
combined_skip.to_string(),
false,
)),
}));
}
}

self.select_to_sql_recursively(cur, query, select, relation)
}
LogicalPlan::Filter(filter) => {
let window = find_window_nodes_within_select(
Expand Down
Loading
Loading