
[fix](be) Backport agg shuffle and runtime filter fixes to 4.1 #64156

Closed
BiteTheDDDDt wants to merge 3 commits into apache:branch-4.1 from BiteTheDDDDt:codex/pick-63529-63766-64102-branch-4.1



@BiteTheDDDDt
Contributor

What problem does this PR solve?

Issue Number: N/A

Related PR: #63529, #63766, #64102

Problem Summary:

Backport the following merged fixes to branch-4.1: #63529, #63766, and #64102.

The BE aggregation backports are applicable to branch-4.1 because the branch still has the affected aggregation/local-exchange distribution paths and was missing these fixes. The runtime filter fix is also absent from branch-4.1 and was adapted to the branch-4.1 RuntimeFilterPushDownVisitor structure.

Release note

None

Check List (For Author)

  • Test:
    • Unit Test: build-support/clang-format.sh
    • Unit Test: build-support/check-format.sh
    • Unit Test: ./run-be-ut.sh --run --filter=AggOperatorRequiredDistributionTest.*:StreamingAggOperatorTest.require_hash_shuffle_after_non_hash_local_exchange:DistinctStreamingAggOperatorTest.require_hash_shuffle_after_non_hash_local_exchange:AggOperatorGroupByLimitOptTestWithGroupBy.test_2_phase_without_order_by
    • Unit Test: ./run-fe-ut.sh --run org.apache.doris.nereids.postprocess.RuntimeFilterTest#testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin,org.apache.doris.nereids.postprocess.RuntimeFilterTest#testPushDownNullPropagatingRuntimeFilterThroughOuterJoin
  • Behavior changed: No
  • Does this need documentation: No

Problem Summary: PR apache#62438 added `enable_local_exchange_before_agg` to
allow skipping local exchange before non-finalizing aggregation. That
optimization used `!_needs_finalize` as the condition, but
non-finalizing aggregation includes both first-phase update/serialize
aggregation and merge/serialize aggregation.

When `experimental_use_serial_exchange` is enabled and
`enable_local_exchange_before_agg` is disabled, a serial exchange source
can be followed by the default `PASSTHROUGH` local exchange before a
non-finalizing merge aggregation. For DISTINCT aggregation, the merge
aggregation is the stage that deduplicates distinct keys after hash
exchange. `PASSTHROUGH` only restores local parallelism and does not
preserve key distribution, so the same distinct key can be processed by
several local tasks, each of which deduplicates only its own input, and
the partial results summed later can be incorrect.
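
To make the failure mode concrete, here is a minimal Python simulation (not Doris code). It assumes each local task deduplicates only the keys routed to it and that the per-task distinct counts are summed afterwards. `passthrough` is a round-robin stand-in for any exchange that ignores the key; all names are hypothetical.

```python
def partial_distinct_count(rows, num_tasks, route):
    """Route each key to a task, dedup per task, then sum the per-task counts."""
    tasks = [set() for _ in range(num_tasks)]
    for index, key in enumerate(rows):
        tasks[route(index, key, num_tasks)].add(key)
    return sum(len(task) for task in tasks)


def passthrough(index, key, num_tasks):
    # Stand-in for a non-hash exchange: placement ignores the key.
    return index % num_tasks


def hash_shuffle(index, key, num_tasks):
    # Stand-in for a hash exchange on integer keys: equal keys share a task.
    return key % num_tasks


rows = [1, 1, 2, 2, 3, 3]  # three distinct keys, each appearing twice
passthrough_total = partial_distinct_count(rows, 2, passthrough)
hash_total = partial_distinct_count(rows, 2, hash_shuffle)
print(passthrough_total, hash_total)
```

With the key-agnostic routing, both tasks see every key and the summed count is double the true number of distinct keys; with the hash routing, each key lands in exactly one task and the sum is correct.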

This PR keeps the knob behavior for aggregation stages that can safely
skip the local hash exchange, but excludes merge aggregation with a
serial child from the skip path. That case falls through to the existing
`HASH_SHUFFLE` / `BUCKET_HASH_SHUFFLE` distribution requirement. The PR
also computes the merge flag during `AggSinkOperatorX::init()` because
local exchange planning runs before `prepare()`.
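
The ordering constraint in the last sentence can be sketched as follows. This is a simplified Python model, not the Doris class: `AggSinkSketch`, `compute_in_init`, and `merge_flag_seen_by_planner` are hypothetical names, and the only assumption taken from the text is that distribution planning reads the merge flag after `init()` and before `prepare()`.

```python
class AggSinkSketch:
    """Toy model of an operator whose merge flag is read during planning."""

    def __init__(self, evaluator_is_merge, compute_in_init):
        self._evaluator_is_merge = list(evaluator_is_merge)
        self._compute_in_init = compute_in_init
        self.is_merge = False

    def _compute_merge_flag(self):
        # The flag is true if any aggregate evaluator is a merge evaluator.
        for flag in self._evaluator_is_merge:
            self.is_merge |= flag

    def init(self):
        if self._compute_in_init:
            self._compute_merge_flag()

    def prepare(self):
        self._compute_merge_flag()


def merge_flag_seen_by_planner(sink):
    """Planning happens between init() and prepare(); return what it observes."""
    sink.init()
    seen = sink.is_merge
    sink.prepare()
    return seen


early = merge_flag_seen_by_planner(AggSinkSketch([False, True], compute_in_init=True))
late = merge_flag_seen_by_planner(AggSinkSketch([False, True], compute_in_init=False))
print(early, late)
```

If the flag is only computed in `prepare()`, the planner sees the default `False` and would treat a merge aggregation as safe to skip.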

Fix occasional incorrect DISTINCT aggregate results when serial exchange
is enabled.

(cherry picked from commit b95038e)
…3766)

Related PR: apache#63529, apache#62438

Problem Summary: `enable_local_exchange_before_agg=false` allows
first-phase aggregation to skip the local hash exchange before agg for
performance. This is only correct when the input still preserves local
key distribution.

After apache#62438, nested loop join and other operators may introduce
non-hash local exchanges such as `ADAPTIVE_PASSTHROUGH`. Those exchanges
can split rows with the same group/distinct key across local pipeline
tasks. If agg still skips the hash local exchange, partial aggregation
states for the same key are built in different tasks and later
`COUNT(DISTINCT ...)` can over-count. The reproduced query in
`output/ddl.txt` returned wrong counts such as `18/20` instead of `10`.

This PR preserves correctness while keeping the knob usable:

- Aggregation operators now skip local exchange with
`enable_local_exchange_before_agg=false` only when the child preserves
local key distribution.
- The shared child-distribution check is reused by `AggSinkOperatorX`,
`StreamingAggOperatorX`, and `DistinctStreamingAggOperatorX`.
- `Pipeline::need_to_local_exchange()` also handles the case where the
current pipeline source is a non-hash `LocalExchangeSource` but the
downstream target requires hash distribution, so hash-like distribution
state inherited by the pipeline cannot incorrectly suppress the required
local exchange.
- Regression coverage is added for the nested-loop-join + distinct
aggregation wrong-result case, and unit tests cover the agg distribution
decisions.
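
The decision described in the list above, combined with the serial-child rule from the previous commit, can be sketched roughly as below. This is an illustrative Python model under stated assumptions, not the Doris implementation: the function names and parameters are hypothetical, and only the conditions named in the commit messages are modeled.

```python
HASH_EXCHANGES = {"HASH_SHUFFLE", "BUCKET_HASH_SHUFFLE"}


def child_breaks_key_distribution(child_local_exchange):
    """child_local_exchange is None when the child is not a local exchange source."""
    return child_local_exchange is not None and child_local_exchange not in HASH_EXCHANGES


def can_skip_hash_exchange(knob_enabled, is_merge, child_is_serial, child_local_exchange):
    """Return True when the local hash exchange before agg may be skipped."""
    if knob_enabled:
        # enable_local_exchange_before_agg=true keeps the exchange.
        return False
    if is_merge and child_is_serial:
        # Merge aggregation after a serial child must see hash-distributed keys.
        return False
    # Otherwise skipping is safe only if the child kept equal keys together.
    return not child_breaks_key_distribution(child_local_exchange)


print(can_skip_hash_exchange(False, False, False, None))
print(can_skip_hash_exchange(False, False, False, "ADAPTIVE_PASSTHROUGH"))
```

Any case that returns `False` corresponds to falling through to the existing `HASH_SHUFFLE` / `BUCKET_HASH_SHUFFLE` requirement.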

(cherry picked from commit d232caa)
…pache#64102)

Related to: apache#57425

Runtime filters from a parent inner join could be pushed through an
outer join into the null-generating child even when the probe expression
was not null-propagating for that child.

The problem can be reproduced with this SQL shape:

```sql
create table rf_outer_join_nullable_a (pk int)
duplicate key(pk)
distributed by hash(pk) buckets 1
properties("replication_num" = "1");

create table rf_outer_join_nullable_b (pk int)
duplicate key(pk)
distributed by hash(pk) buckets 1
properties("replication_num" = "1");

create table rf_outer_join_nullable_c (pk int)
duplicate key(pk)
distributed by hash(pk) buckets 1
properties("replication_num" = "1");

insert into rf_outer_join_nullable_a values (1);
insert into rf_outer_join_nullable_b values (1);
insert into rf_outer_join_nullable_c values (0);

set disable_join_reorder = true;

select coalesce(b.pk, 0) as k, count(*) as cnt
from rf_outer_join_nullable_a a
left join rf_outer_join_nullable_b b on a.pk = b.pk
inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk
group by 1
order by 1;
```

The correct result is empty. `a.pk = 1` matches `b.pk = 1` in the left
outer join, then the parent inner join evaluates `coalesce(1, 0) = 0`,
which is false.

The wrong plan generated a runtime filter from the parent inner join,
effectively `c.pk -> coalesce(b.pk, 0)`, and pushed it through the lower
`LEFT OUTER JOIN` into the right side scan of `b`. If `b.pk = 1` is
pre-filtered before the left outer join, the join emits a NULL-extended
row for `b`; then `coalesce(NULL, 0) = 0` becomes true and incorrectly
returns `(0, 1)`.

Therefore the runtime filter `c.pk -> coalesce(b.pk, 0)` must not be
planned on the null-generating side of the lower outer join. This PR
blocks runtime filter pushdown through an outer join's null-generating
child unless the probe expression preserves NULL semantics for slots
from that child. Normal pushdown through preserved sides and
null-propagating expressions is kept unchanged.
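
The reasoning above can be replayed with a small Python simulation of the repro query. It is a sketch, not planner code: `run_query`, `coalesce_zero`, and `identity` are hypothetical helpers, SQL NULL is modeled as `None`, and the runtime filter is modeled as a pre-filter on the scan of `b` using the keys of `c`.

```python
def coalesce_zero(value):
    # Models coalesce(b.pk, 0): turns NULL into a non-NULL value.
    return 0 if value is None else value


def identity(value):
    # Models the plain slot b.pk: NULL stays NULL (null-propagating).
    return value


def run_query(a, b, c, probe, push_filter_into_b):
    """a LEFT JOIN b ON a.pk = b.pk INNER JOIN c ON probe(b.pk) = c.pk, grouped by probe."""
    if push_filter_into_b:
        c_keys = set(c)
        b = [pk for pk in b if probe(pk) in c_keys]
    joined = []
    for a_pk in a:
        matches = [b_pk for b_pk in b if b_pk == a_pk] or [None]  # NULL-extend
        joined.extend((a_pk, b_pk) for b_pk in matches)
    counts = {}
    for _, b_pk in joined:
        key = probe(b_pk)
        for c_pk in c:
            if key is not None and key == c_pk:  # NULL never equals anything
                counts[key] = counts.get(key, 0) + 1
    return sorted(counts.items())


correct = run_query([1], [1], [0], coalesce_zero, push_filter_into_b=False)
wrong = run_query([1], [1], [0], coalesce_zero, push_filter_into_b=True)
safe = run_query([1], [1], [0], identity, push_filter_into_b=True)
print(correct, wrong, safe)
```

With `coalesce_zero` the pushed-down filter changes the result from empty to `(0, 1)`, while with the null-propagating `identity` probe the result stays empty whether or not the filter is pushed down.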

The bug became observable after apache#57425 changed the target lookup for
expression runtime filters from `ctx.probeExpr` to `ctx.probeSlot`.
Before that change, an expression such as `coalesce(b.pk, 0)` could not
resolve the target relation in this path and the unsafe pushdown was not
generated.

Release note: None

- Test
    - [x] Regression test
    - [x] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
        - [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason

Added regression case with `disable_join_reorder`, `qt_shape`, and empty
result verification:

`regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy`

Unit test:

`./run-fe-ut.sh --run org.apache.doris.nereids.postprocess.RuntimeFilterTest#testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin,org.apache.doris.nereids.postprocess.RuntimeFilterTest#testPushDownNullPropagatingRuntimeFilterThroughOuterJoin`

The SQL regression case was not run locally against the available 9333
cluster because that cluster was the unpatched repro cluster.

- Behavior changed:
    - [ ] No.
    - [x] Yes. Runtime filters are no longer pushed through an outer join
      into its null-generating child when the probe expression can convert
      NULL to a non-NULL value.

- Does this need documentation?
    - [x] No.
    - [ ] Yes.

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label

(cherry picked from commit 691189a)
Copilot AI review requested due to automatic review settings June 5, 2026 11:49
@BiteTheDDDDt BiteTheDDDDt requested a review from yiguolei as a code owner June 5, 2026 11:49
@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@BiteTheDDDDt
Contributor Author

run buildall

Contributor

Copilot AI left a comment


Pull request overview

Backports correctness fixes to the 4.1 branch for (1) aggregation/local-exchange distribution handling (to avoid wrong DISTINCT/group-by results when local key distribution is broken) and (2) runtime filter pushdown across outer joins (to avoid filtering the null-generating side when probe expressions are not NULL-propagating).

Changes:

  • BE: Refine aggregation operators’ required distribution logic to preserve hash shuffle when upstream local exchanges/serial sources can break local key distribution, plus pipeline handling for non-hash local exchange sources.
  • FE: Block runtime filter pushdown into the null-generating side of outer joins unless the probe expression is NULL-propagating for that side.
  • Tests: Add/extend BE unit tests, FE unit tests, and regression suites + expected outputs to cover the above scenarios.

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| `regression-test/suites/query_p0/join/test_agg_after_nested_loop_join_local_exchange.groovy` | New regression to catch wrong DISTINCT agg results after NLJ-introduced non-hash local exchange. |
| `regression-test/data/query_p0/join/test_agg_after_nested_loop_join_local_exchange.out` | Expected output for the new NLJ + agg regression. |
| `regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy` | Extends existing suite with serial-exchange distinct aggregation coverage and config toggles. |
| `regression-test/data/nereids_syntax_p0/agg_4_phase.out` | Expected output update for the added serial-exchange distinct agg query. |
| `regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy` | New regression to ensure unsafe RF pushdown into outer-join nullable side is prevented. |
| `regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out` | Expected shape/result output for the new runtime filter regression. |
| `fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java` | Adds unit tests asserting RF behavior for NULL-propagating vs non-propagating probe expressions. |
| `fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java` | Implements the outer-join nullable-side RF pushdown guard based on NULL propagation. |
| `be/src/runtime/runtime_state.h` | Adds an accessor for `enable_local_exchange_before_agg`. |
| `be/src/exec/pipeline/pipeline.cpp` | Ensures downstream hash requirements aren’t suppressed after a non-hash `LocalExchangeSource`. |
| `be/src/exec/operator/operator.h` | Adds helper APIs to detect when a child breaks local key distribution. |
| `be/src/exec/operator/operator.cpp` | Implements the child local-key-distribution break detection logic. |
| `be/src/exec/operator/streaming_aggregation_operator.h` | Uses new child-distribution check to decide when skipping local exchange is safe. |
| `be/src/exec/operator/distinct_streaming_aggregation_operator.h` | Same distribution-safety logic for distinct streaming aggregation. |
| `be/src/exec/operator/aggregation_sink_operator.h` | Same distribution-safety logic for aggregation sink operator required distribution. |
| `be/src/exec/operator/aggregation_sink_operator.cpp` | Computes `_is_merge` earlier (during init) to support distribution planning before prepare. |
| `be/src/exec/exchange/local_exchange_source_operator.h` | Exposes local exchange type via required distribution + accessor to aid distribution checks. |
| `be/test/exec/operator/streaming_agg_operator_test.cpp` | Adds unit test covering hash-shuffle requirement after non-hash local exchange. |
| `be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp` | Adds unit test covering hash-shuffle requirement after non-hash local exchange. |
| `be/test/exec/operator/agg_operator_test.cpp` | Adds unit tests for agg sink required distribution + pipeline local exchange decision. |
| `be/test/exec/operator/agg_operator_group_by_limit_opt_test.cpp` | Updates mock agg sink evaluator init to compute `_is_merge` (test correctness). |


Comment on lines +589 to +592

    bool enable_local_exchange_before_agg() const {
        return _query_options.__isset.enable_local_exchange_before_agg &&
               _query_options.enable_local_exchange_before_agg;
    }

Comment on lines 752 to 756

                tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy,
                tnode.agg_node.grouping_exprs.empty(), false, &evaluator));
        _aggregate_evaluators.push_back(evaluator);
        _is_merge |= evaluator->is_merge();
    }

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 33.33% (9/27) 🎉
Increment coverage report
Complete coverage report

@BiteTheDDDDt
Contributor Author

Replaced by #64162, which backports only #64102 to branch-4.1. #63529/#63766 are not needed for branch-4.1.
