Bug: EnforceDistribution is not idempotent despite documentation claim #21174

@zhuqi-lucas

Describe the bug

The EnforceDistribution optimizer rule documents itself as idempotent:

"is idempotent; i.e. it can be applied multiple times, each time producing the same result."

However, running EnforceDistribution twice on the same plan can produce materially different results. This is problematic for use cases that run EnforceDistribution multiple times (e.g., before and after custom optimizer rules that introduce new distribution-changing operators).

Root causes

The fundamental issue is the "strip-and-rebuild" design: remove_dist_changing_operators strips all distribution-changing operators (CoalescePartitionsExec, SortPreservingMergeExec, RepartitionExec), then the rule re-inserts them as needed. However, several pieces of state are lost or changed during this cycle:

1. data flag changes between passes

DistributionContext.data tracks whether a subtree contains distribution-changing operators. After the first pass strips them, the second pass sees data=false, which changes the control flow in ensure_distribution:

// L1407-1428
let streaming_benefit = if child.data {  // false on 2nd pass
    preserving_order_enables_streaming(&plan, &child.plan)?
} else {
    false
};

if (!ordering_satisfied || !order_preserving_variants_desirable)
    && !streaming_benefit
    && child.data  // false on 2nd pass → skip replace_order_preserving_variants
{
    child = replace_order_preserving_variants(child)?;
}
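
To make the dependence on child.data easier to see, the guard above can be restated as a pure function of its boolean inputs. This is only a sketch: the function name should_replace_variants and the enables_streaming parameter (standing in for the result of preserving_order_enables_streaming) are hypothetical and are not part of DataFusion.

```rust
/// Restates the quoted guard as a pure function of four booleans.
/// `enables_streaming` is a hypothetical stand-in for the result of
/// `preserving_order_enables_streaming`; as in the excerpt, it is only
/// consulted when `child_data` is true.
fn should_replace_variants(
    ordering_satisfied: bool,
    order_preserving_variants_desirable: bool,
    enables_streaming: bool,
    child_data: bool,
) -> bool {
    let streaming_benefit = if child_data { enables_streaming } else { false };
    (!ordering_satisfied || !order_preserving_variants_desirable)
        && !streaming_benefit
        && child_data
}
```

With the other three inputs held fixed, flipping child_data from true to false is enough to turn the result from true to false, which is the change in control flow described above.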

2. replace_order_preserving_variants makes irreversible transformations

  • SortPreservingMergeExec → CoalescePartitionsExec (L1086-1091)
  • RepartitionExec(preserve_order=true) → regular RepartitionExec (L1092-1101)

On the second pass, these original operators are gone, so the function makes different decisions.

3. Hash partitioning decisions depend on current partition count

// L896-901
n_target > input.plan.output_partitioning().partition_count()

After the first pass adds/removes RepartitionExec, the partition count changes, leading to different decisions on the second pass.
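
As a toy illustration of why this comparison is sensitive to earlier passes, the sketch below isolates the check and feeds it the partition count observed before and after a repartition. Both function names are hypothetical, and the model covers only this one comparison, not the rest of the rule.

```rust
/// Toy stand-in for the quoted comparison: repartition only when the
/// target exceeds the input's current partition count.
fn wants_hash_repartition(n_target: usize, current_partitions: usize) -> bool {
    n_target > current_partitions
}

/// Toy "pass": the partition count the parent observes once the
/// decision above has been applied.
fn partitions_after_pass(n_target: usize, current_partitions: usize) -> usize {
    if wants_hash_repartition(n_target, current_partitions) {
        n_target
    } else {
        current_partitions
    }
}
```

For a target of 8 and an input with 2 partitions, the check is true on the first evaluation and false once the count has become 8, so the same comparison can take a different branch depending on what an earlier pass left behind.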

4. fetch values lost during strip-and-rebuild (fixed in #21170)

remove_dist_changing_operators strips operators that carry fetch (limit push-down). Without preserving the fetch, the plan changes between passes. This specific case is addressed in #21170.

To Reproduce

Pass 1 input:
  AggregateExec (requires SinglePartition, maintains_input_order)
    SortPreservingMergeExec
      RepartitionExec(preserve_order=true, Hash)
        DataSourceExec (sorted, 2 partitions)

Pass 1 output:
  AggregateExec
    SortPreservingMergeExec       ← data=true in child context
      RepartitionExec(preserve_order=true, Hash)
        DataSourceExec

Pass 2 input = Pass 1 output, but now:
  - remove_dist_changing_operators strips SPM and RepartitionExec
  - data=false for the stripped subtree
  - streaming_benefit evaluation differs
  - replace_order_preserving_variants may or may not be called
  → Different plan structure
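
The stripping step in pass 2 can be pictured with a small self-contained model. This is a sketch under stated assumptions: the Plan enum and strip_dist_changing are hypothetical simplifications and do not reproduce DataFusion's remove_dist_changing_operators or its plan types.

```rust
/// Hypothetical, heavily simplified plan tree (not DataFusion's types).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    SortPreservingMerge(Box<Plan>),
    CoalescePartitions(Box<Plan>),
    Repartition { preserve_order: bool, input: Box<Plan> },
    DataSource { partitions: usize },
}

/// Peels distribution-changing operators off the top of `plan` and
/// returns the remaining subtree plus the number of operators removed.
fn strip_dist_changing(plan: Plan) -> (Plan, usize) {
    let mut current = plan;
    let mut removed = 0;
    loop {
        current = match current {
            Plan::SortPreservingMerge(input) | Plan::CoalescePartitions(input) => {
                removed += 1;
                *input
            }
            // The preserve_order flag is discarded together with the node.
            Plan::Repartition { input, .. } => {
                removed += 1;
                *input
            }
            other => return (other, removed),
        };
    }
}
```

Applied to the SortPreservingMergeExec subtree from the example, the model returns the bare two-partition source: both the operators and the preserve_order=true setting are gone, so any later decision has to be rebuilt from the stripped subtree alone.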

Expected behavior

EnforceDistribution should be truly idempotent: enforce(enforce(plan)) == enforce(plan) for all valid input plans. Either:

  1. Fix the rule to be idempotent, or
  2. Update the documentation to accurately reflect that it is NOT idempotent.
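
The property enforce(enforce(plan)) == enforce(plan) can be written as a small generic check. The sketch below is not tied to DataFusion: P is any comparable stand-in for a plan (for example, a rendered plan string, which is an assumption here), and the two toy rules exist only to exercise the helper.

```rust
/// Returns true if applying `rule` twice yields the same result as
/// applying it once. `P` is a stand-in for a plan representation.
fn is_idempotent<P, F>(rule: F, plan: &P) -> bool
where
    P: PartialEq,
    F: Fn(&P) -> P,
{
    let once = rule(plan);
    let twice = rule(&once);
    once == twice
}

/// Toy rule that is idempotent: clamps a partition count to at most 8.
fn clamp_partitions(n: &usize) -> usize {
    (*n).min(8)
}

/// Toy rule that is not idempotent: doubles the count on every call.
fn double_partitions(n: &usize) -> usize {
    *n * 2
}
```

A regression test for option 1 could call a helper of this shape with the real rule and a set of representative plans, including ones that already contain distribution-changing operators.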
