Description
Describe the bug
The EnforceDistribution optimizer rule documents itself as idempotent:
"is idempotent; i.e. it can be applied multiple times, each time producing the same result."
However, running EnforceDistribution twice on the same plan can produce materially different results. This is problematic for use cases that run EnforceDistribution multiple times (e.g., before and after custom optimizer rules that introduce new distribution-changing operators).
Root causes
The fundamental issue is the "strip-and-rebuild" design: remove_dist_changing_operators strips all distribution-changing operators (CoalescePartitionsExec, SortPreservingMergeExec, RepartitionExec), then the rule re-inserts them as needed. However, several pieces of state are lost or changed during this cycle:
1. data flag changes between passes
DistributionContext.data tracks whether a subtree contains distribution-changing operators. After the first pass strips them, the second pass sees data=false, which changes the control flow in ensure_distribution:
    // L1407-1428
    let streaming_benefit = if child.data { // false on 2nd pass
        preserving_order_enables_streaming(&plan, &child.plan)?
    } else {
        false
    };
    if (!ordering_satisfied || !order_preserving_variants_desirable)
        && !streaming_benefit
        && child.data // false on 2nd pass → skip replace_order_preserving_variants
    {
        child = replace_order_preserving_variants(child)?;
    }
2. replace_order_preserving_variants makes irreversible transformations
- SortPreservingMergeExec → CoalescePartitionsExec (L1086-1091)
- RepartitionExec(preserve_order=true) → regular RepartitionExec (L1092-1101)
On the second pass, these original operators are gone, so the function makes different decisions.
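As a rough illustration of why these rewrites cannot be undone, here is a toy model. `Op` and `replace_variant` are hypothetical stand-ins, not DataFusion types; they only mirror the two mappings listed above. The point is that two distinct inputs collapse to the same output, so a later pass cannot tell which operator it started from.

```rust
/// Hypothetical stand-in for the physical operators discussed above.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    SortPreservingMerge,
    CoalescePartitions,
    Repartition { preserve_order: bool },
}

/// Toy version of the two rewrites: each order-preserving operator is
/// mapped to its plain counterpart; everything else passes through.
fn replace_variant(op: Op) -> Op {
    match op {
        Op::SortPreservingMerge => Op::CoalescePartitions,
        Op::Repartition { preserve_order: true } => Op::Repartition {
            preserve_order: false,
        },
        other => other,
    }
}
```

Because `replace_variant(Op::SortPreservingMerge)` and `replace_variant(Op::CoalescePartitions)` are equal, the mapping is many-to-one and has no inverse.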
3. Hash partitioning decisions depend on current partition count
    // L896-901
    n_target > input.plan.output_partitioning().partition_count()
After the first pass adds/removes RepartitionExec, the partition count changes, leading to different decisions on the second pass.
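A minimal sketch of this dependency, using hypothetical helper names (`should_hash_repartition`, `apply_pass`) that do not exist in DataFusion. It shows only that the predicate's answer depends on the state left behind by the previous pass, not how the real rule behaves end to end.

```rust
/// Hypothetical mirror of the comparison quoted above: repartition only
/// when the target partition count exceeds the current one.
fn should_hash_repartition(n_target: usize, current_partitions: usize) -> bool {
    n_target > current_partitions
}

/// Toy "pass": returns the partition count after the decision is applied.
/// Assumes a repartition always produces exactly `n_target` partitions.
fn apply_pass(n_target: usize, current_partitions: usize) -> usize {
    if should_hash_repartition(n_target, current_partitions) {
        n_target
    } else {
        current_partitions
    }
}
```

With `n_target = 8` and a 2-partition input, the predicate is true on the first pass; evaluated again against the 8 partitions that pass produced, it is false.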
4. fetch values lost during strip-and-rebuild (fixed in #21170)
remove_dist_changing_operators strips operators that carry fetch (limit push-down). Without preserving the fetch, the plan changes between passes. This specific case is addressed in #21170.
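To make the fetch loss concrete, here is a toy model. `Node`, `strip_lossy`, and `strip_keeping_fetch` are hypothetical and are not the approach taken in #21170; the sketch only contrasts a strip step that forgets the fetch with one that hands it back to the caller.

```rust
/// Hypothetical plan node: a source, or a distribution-changing operator
/// that may carry a fetch (limit).
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Source,
    Coalesce { fetch: Option<usize>, input: Box<Node> },
}

/// Strips every Coalesce node and silently drops its fetch.
fn strip_lossy(node: Node) -> Node {
    match node {
        Node::Coalesce { input, .. } => strip_lossy(*input),
        Node::Source => Node::Source,
    }
}

/// Strips every Coalesce node but returns the tightest fetch seen,
/// so the caller could re-apply it when rebuilding.
fn strip_keeping_fetch(node: Node) -> (Node, Option<usize>) {
    match node {
        Node::Coalesce { fetch, input } => {
            let (inner, inner_fetch) = strip_keeping_fetch(*input);
            // Nested limits combine to the smaller of the two.
            let merged = match (fetch, inner_fetch) {
                (Some(a), Some(b)) => Some(a.min(b)),
                (a, b) => a.or(b),
            };
            (inner, merged)
        }
        Node::Source => (Node::Source, None),
    }
}
```

In this model, a `Coalesce` with `fetch: Some(10)` over a `Source` strips to a bare `Source` either way, but only the second function still knows about the limit.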
To Reproduce
Pass 1 input:
AggregateExec (requires SinglePartition, maintains_input_order)
  SortPreservingMergeExec
    RepartitionExec(preserve_order=true, Hash)
      DataSourceExec (sorted, 2 partitions)
Pass 1 output:
AggregateExec
  SortPreservingMergeExec ← data=true in child context
    RepartitionExec(preserve_order=true, Hash)
      DataSourceExec
Pass 2 input = Pass 1 output, but now:
- remove_dist_changing_operators strips SPM and RepartitionExec
- data=false for the stripped subtree
- streaming_benefit evaluation differs
- replace_order_preserving_variants may or may not be called
→ Different plan structure
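The pass-2 divergence can be reduced to a pure function of four booleans. The sketch below mirrors the guard quoted from ensure_distribution; the function name and the `streaming_benefit_if_checked` parameter are hypothetical (the latter stands in for the result of `preserving_order_enables_streaming`).

```rust
/// Hypothetical mirror of the guard in ensure_distribution: returns true
/// when replace_order_preserving_variants would be called on the child.
fn should_replace_variants(
    ordering_satisfied: bool,
    order_preserving_variants_desirable: bool,
    streaming_benefit_if_checked: bool,
    child_data: bool,
) -> bool {
    // The streaming check is only consulted when child_data is true.
    let streaming_benefit = child_data && streaming_benefit_if_checked;
    (!ordering_satisfied || !order_preserving_variants_desirable)
        && !streaming_benefit
        && child_data
}
```

Holding the other three inputs fixed and flipping only `child_data` from true to false changes the result, which is the situation the second pass ends up in.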
Expected behavior
EnforceDistribution should be truly idempotent: enforce(enforce(plan)) == enforce(plan) for all valid input plans. Either:
- Fix the rule to be idempotent, or
- Update the documentation to accurately reflect that it is NOT idempotent.
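If the rule is fixed, a regression test could assert the property directly. Below is a minimal, generic sketch: `check_idempotent` is a hypothetical helper, `rule` stands in for one application of the optimizer pass, and `T` for whatever comparable plan representation the test uses.

```rust
use std::fmt::Debug;

/// Applies `rule` once, then again to its own output, and reports
/// whether the second application changed anything.
fn check_idempotent<T, F>(rule: F, input: T) -> Result<(), String>
where
    T: Clone + PartialEq + Debug,
    F: Fn(T) -> T,
{
    let once = rule(input);
    let twice = rule(once.clone());
    if once == twice {
        Ok(())
    } else {
        Err(format!(
            "not idempotent:\n once: {:?}\ntwice: {:?}",
            once, twice
        ))
    }
}
```

For example, `check_idempotent(|x: i32| x.abs(), -3)` returns `Ok(())`, while `check_idempotent(|x: i32| x + 1, 0)` returns an error describing both results.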
Additional context
- Related fix for fetch loss: fix: EnforceDistribution optimizer preserves fetch (LIMIT) from distribution-changing operators #21170
- The data flag dependency is the most pervasive source of non-idempotency and likely the hardest to fix.
- Use cases that run EnforceDistribution multiple times (e.g., custom optimizer rules that insert distribution-changing operators between passes) are affected by this.