From 8afd60f64e78c68f306235ab53090f3e12bba356 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 5 Jun 2026 14:21:57 +0800 Subject: [PATCH 1/3] feat(optimizer): enhance scalar subquery handling with ProjectionRewriteState - Added ProjectionRewriteState for improved management - Updated projection branch to utilize slot-owned rewrite state - Improved alias ownership derivation based on states - Preserved the join order, compensation rewrite, and output aliases --- .../optimizer/src/scalar_subquery_to_join.rs | 46 +++++++++++++------ .../sqllogictest/test_files/subquery.slt | 46 +++++++++++++++++++ 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 44011a125ba96..e4f0040234477 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -169,25 +169,38 @@ impl OptimizerRule for ScalarSubqueryToJoin { } let mut all_subqueries = vec![]; - let mut alias_to_index: HashMap = HashMap::new(); - let mut rewrite_exprs: Vec = - Vec::with_capacity(projection.expr.len()); - for (idx, expr) in projection.expr.iter().enumerate() { - let (subqueries, rewrite_expr) = self.extract_subquery_exprs( + let mut rewrite_states = Vec::with_capacity(projection.expr.len()); + for expr in &projection.expr { + let (subqueries, rewritten_expr) = self.extract_subquery_exprs( expr, config.alias_generator(), physical_uncorrelated, )?; - for (_, alias) in &subqueries { - alias_to_index.insert(alias.clone(), idx); - } + let subquery_aliases = + subqueries.iter().map(|(_, alias)| alias.clone()).collect(); all_subqueries.extend(subqueries); - rewrite_exprs.push(rewrite_expr); + rewrite_states.push(ProjectionRewriteState { + rewritten_expr, + subquery_aliases, + }); } assert_or_internal_err!( !all_subqueries.is_empty(), "Expected subqueries not found in projection" ); + + let alias_to_state_index: HashMap = rewrite_states + .iter() + .enumerate() + .flat_map(|(idx, state)| { + state + .subquery_aliases + .iter() + .cloned() + .map(move |alias| (alias, idx)) + }) + .collect(); + // iterate through all subqueries in predicate, turning each into a left join let mut cur_input = projection.input.as_ref().clone(); for (subquery, alias) in all_subqueries { @@ -196,9 +209,10 @@ impl OptimizerRule for ScalarSubqueryToJoin { { cur_input = optimized_subquery; if !compensation_exprs.is_empty() - && let Some(&idx) = alias_to_index.get(&alias) + && let Some(&idx) = alias_to_state_index.get(&alias) { - let new_expr = rewrite_exprs[idx] + let new_expr = rewrite_states[idx] + .rewritten_expr .clone() .transform_up(|expr| { if let Some(compensation_expr) = expr @@ -211,7 +225,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { } }) .data()?; - rewrite_exprs[idx] = new_expr; + rewrite_states[idx].rewritten_expr = new_expr; } } else { // if we can't handle all of the subqueries then bail for now @@ -220,8 +234,9 @@ impl OptimizerRule for ScalarSubqueryToJoin { } let mut proj_exprs = vec![]; - for (expr, new_expr) in projection.expr.iter().zip(rewrite_exprs) { + for (expr, state) in projection.expr.iter().zip(rewrite_states) { let old_expr_name = expr.schema_name().to_string(); + let new_expr = state.rewritten_expr; let new_expr_name = new_expr.schema_name().to_string(); if new_expr_name != old_expr_name { proj_exprs.push(new_expr.alias(old_expr_name)) @@ -266,6 +281,11 @@ fn contains_scalar_subquery_to_rewrite(expr: &Expr, physical_uncorrelated: bool) .expect("Inner is always Ok") } +struct ProjectionRewriteState { + rewritten_expr: Expr, + subquery_aliases: Vec, +} + struct ExtractScalarSubQuery<'a> { sub_query_info: Vec<(Subquery, String)>, alias_gen: &'a Arc, diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 3d6f8027454c7..3b7e0fc152695 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -867,6 +867,52 @@ FROM t1 33 4 5 44 1 2 +#correlated_scalar_subquery_multiple_projection_slots +# Distinct projection slots must each own their scalar subquery rewrite. +# COUNT gets empty-input compensation; SUM preserves NULL on no match. +query III rowsort +SELECT + t1_id, + (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) AS cnt, + (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) AS total +FROM t1 +---- +11 1 3 +22 0 1 +33 3 NULL +44 0 3 + +#correlated_scalar_subquery_multiple_subqueries_one_projection_slot +# Multiple COUNT subqueries in a single projection expression must all be +# compensated before the expression is evaluated. +query II rowsort +SELECT + t1_id, + (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) + + (SELECT count(*) FROM t2 WHERE t2.t2_id = t1.t1_id) AS combined +FROM t1 +---- +11 2 +22 1 +33 3 +44 1 + +#correlated_scalar_subquery_mixed_repeated_and_non_count_projection_slots +# Repeated COUNT slots must each be compensated while the SUM slot keeps NULL +# semantics for unmatched outer rows. +query IIII rowsort +SELECT + t1_id, + (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) + 1 AS a, + (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) + 2 AS b, + (SELECT sum(t2_int) FROM t2 WHERE t2.t2_int = t1.t1_int) AS c +FROM t1 +---- +11 2 3 1 +22 1 2 NULL +33 4 5 9 +44 1 2 NULL + #correlated_scalar_subquery_count_agg2 query TT explain SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1 From ea4e06a12c9d4d15f3da39eda94c3cfa76fdeb96 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 5 Jun 2026 14:28:41 +0800 Subject: [PATCH 2/3] feat: refactor expressions and improve projection handling - Added private `apply_compensation_exprs` helper. - Reused helper in filter and projection branches. - Built alias ownership during extraction. - Renamed `all_subqueries` to `subqueries_to_join`. - Fixed stale projection comment. - Converted final projection expression build to use `map(...).collect()`. --- .../optimizer/src/scalar_subquery_to_join.rs | 114 +++++++++--------- 1 file changed, 58 insertions(+), 56 deletions(-) diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index e4f0040234477..9af120fd49751 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -126,18 +126,10 @@ impl OptimizerRule for ScalarSubqueryToJoin { build_join(&subquery, &cur_input, &alias)? { if !compensation_exprs.is_empty() { - rewrite_expr = rewrite_expr - .transform_up(|expr| { - if let Some(compensation_expr) = expr - .try_as_col() - .and_then(|col| compensation_exprs.get(col)) - { - Ok(Transformed::yes(compensation_expr.clone())) - } else { - Ok(Transformed::no(expr)) - } - }) - .data()?; + rewrite_expr = apply_compensation_exprs( + rewrite_expr, + &compensation_exprs, + )?; } cur_input = optimized_subquery; } else { @@ -168,42 +160,41 @@ impl OptimizerRule for ScalarSubqueryToJoin { return Ok(Transformed::no(LogicalPlan::Projection(projection))); } - let mut all_subqueries = vec![]; + let mut subqueries_to_join = vec![]; + let mut alias_to_state_index = HashMap::new(); let mut rewrite_states = Vec::with_capacity(projection.expr.len()); for expr in &projection.expr { + let state_idx = rewrite_states.len(); let (subqueries, rewritten_expr) = self.extract_subquery_exprs( expr, config.alias_generator(), physical_uncorrelated, )?; - let subquery_aliases = - subqueries.iter().map(|(_, alias)| alias.clone()).collect(); - all_subqueries.extend(subqueries); + let subquery_aliases = subqueries + .iter() + .map(|(_, alias)| alias.clone()) + .collect::>(); + subqueries_to_join.extend(subqueries); rewrite_states.push(ProjectionRewriteState { rewritten_expr, subquery_aliases, }); + alias_to_state_index.extend( + rewrite_states[state_idx] + .subquery_aliases + .iter() + .cloned() + .map(|alias| (alias, state_idx)), + ); } assert_or_internal_err!( - !all_subqueries.is_empty(), + !subqueries_to_join.is_empty(), "Expected subqueries not found in projection" ); - let alias_to_state_index: HashMap = rewrite_states - .iter() - .enumerate() - .flat_map(|(idx, state)| { - state - .subquery_aliases - .iter() - .cloned() - .map(move |alias| (alias, idx)) - }) - .collect(); - - // iterate through all subqueries in predicate, turning each into a left join + // Iterate through projection subqueries, turning each into a left join. let mut cur_input = projection.input.as_ref().clone(); - for (subquery, alias) in all_subqueries { + for (subquery, alias) in subqueries_to_join { if let Some((optimized_subquery, compensation_exprs)) = build_join(&subquery, &cur_input, &alias)? { @@ -211,20 +202,10 @@ impl OptimizerRule for ScalarSubqueryToJoin { if !compensation_exprs.is_empty() && let Some(&idx) = alias_to_state_index.get(&alias) { - let new_expr = rewrite_states[idx] - .rewritten_expr - .clone() - .transform_up(|expr| { - if let Some(compensation_expr) = expr - .try_as_col() - .and_then(|col| compensation_exprs.get(col)) - { - Ok(Transformed::yes(compensation_expr.clone())) - } else { - Ok(Transformed::no(expr)) - } - }) - .data()?; + let new_expr = apply_compensation_exprs( + rewrite_states[idx].rewritten_expr.clone(), + &compensation_exprs, + )?; rewrite_states[idx].rewritten_expr = new_expr; } } else { @@ -233,17 +214,21 @@ impl OptimizerRule for ScalarSubqueryToJoin { } } - let mut proj_exprs = vec![]; - for (expr, state) in projection.expr.iter().zip(rewrite_states) { - let old_expr_name = expr.schema_name().to_string(); - let new_expr = state.rewritten_expr; - let new_expr_name = new_expr.schema_name().to_string(); - if new_expr_name != old_expr_name { - proj_exprs.push(new_expr.alias(old_expr_name)) - } else { - proj_exprs.push(new_expr); - } - } + let proj_exprs = projection + .expr + .iter() + .zip(rewrite_states) + .map(|(expr, state)| { + let old_expr_name = expr.schema_name().to_string(); + let new_expr = state.rewritten_expr; + let new_expr_name = new_expr.schema_name().to_string(); + if new_expr_name != old_expr_name { + new_expr.alias(old_expr_name) + } else { + new_expr + } + }) + .collect::>(); let new_plan = LogicalPlanBuilder::from(cur_input) .project(proj_exprs)? .build()?; @@ -286,6 +271,23 @@ struct ProjectionRewriteState { subquery_aliases: Vec, } +fn apply_compensation_exprs( + expr: Expr, + compensation_exprs: &HashMap, +) -> Result { + expr.transform_up(|expr| { + if let Some(compensation_expr) = expr + .try_as_col() + .and_then(|col| compensation_exprs.get(col)) + { + Ok(Transformed::yes(compensation_expr.clone())) + } else { + Ok(Transformed::no(expr)) + } + }) + .data() +} + struct ExtractScalarSubQuery<'a> { sub_query_info: Vec<(Subquery, String)>, alias_gen: &'a Arc, From 5f9351d190390c7ff4d06149c9507c23f05d0ed7 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 5 Jun 2026 14:32:45 +0800 Subject: [PATCH 3/3] feat: improve ProjectionRewriteState handling in scalar subquery optimization - Built ProjectionRewriteState as a local variable - Enhanced reading of aliases from state - Implemented pushing state after map extension - Avoided immediate indexing into rewrite_states to improve performance --- datafusion/optimizer/src/scalar_subquery_to_join.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 9af120fd49751..26fb7dc555f75 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -175,17 +175,18 @@ impl OptimizerRule for ScalarSubqueryToJoin { .map(|(_, alias)| alias.clone()) .collect::>(); subqueries_to_join.extend(subqueries); - rewrite_states.push(ProjectionRewriteState { + let state = ProjectionRewriteState { rewritten_expr, subquery_aliases, - }); + }; alias_to_state_index.extend( - rewrite_states[state_idx] + state .subquery_aliases .iter() .cloned() .map(|alias| (alias, state_idx)), ); + rewrite_states.push(state); } assert_or_internal_err!( !subqueries_to_join.is_empty(),