Conversation


@tharvey5 tharvey5 commented Jun 25, 2025

What is the purpose of the change

Currently, Apache Flink does not support storage partition join, which can lead to unnecessary data shuffles in batch mode. This PR implements a basic version of this feature.

Brief change log

This pull request introduces a new optimizer configuration option table.optimizer.storage-partition-join-enabled and query planner changes that detect when both sides of a join are partitioned by the join keys in a compatible way, allowing the planner to apply a storage partition join strategy. This avoids unnecessary shuffles by leveraging the source's partitioning.
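
For illustration, a minimal sketch of opting in to the new behavior from the Table API (the option key comes from this PR; everything else is standard Flink Table API):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableStoragePartitionJoin {
    public static void main(String[] args) {
        // Storage partition join is only applicable in batch mode.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Opt in to the optimizer behavior introduced by this PR.
        tEnv.getConfig().set("table.optimizer.storage-partition-join-enabled", "true");
    }
}
```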

Key changes include:

  • Addition of the SupportsPartitioning interface for table sources to expose partitioning information.
  • Implementation of KeyGroupedPartitioning to represent partitioning schemes.
  • Integration of partitioning awareness in the batch physical sort-merge join rule to conditionally use the storage partition join when enabled and applicable.
  • [for Testing] Serialization and deserialization utilities for partitioning metadata.
  • [for Testing] Extension of the test values table factory to support partitioning.
  • Comprehensive unit and integration tests verifying the new join strategy and its configuration.

This enhancement is currently applicable only in batch mode and requires the source tables to be partitioned by the join keys.
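
As a rough sketch of the contract described in the bullets above: the method names outputPartitioning and applyPartitionedRead appear in this PR and its Iceberg counterpart, but the exact signatures and fields below are assumptions, not the merged API.

```
import java.util.List;

// Sketch only; the real interfaces live in this PR's planner changes.
interface Partitioning {}

// Hypothetical shape of the source ability added by this PR.
interface SupportsPartitioning {
    // Reports how the source's data is laid out, e.g. a
    // KeyGroupedPartitioning over the partition keys.
    Partitioning outputPartitioning();

    // Called by the planner to tell the source that a storage
    // partition join will be used, so the partitioned layout
    // must be preserved when reading.
    void applyPartitionedRead();
}

// Hypothetical shape of a partitioning description keyed by columns.
class KeyGroupedPartitioning implements Partitioning {
    private final List<String> keys;   // partition key columns
    private final int numPartitions;   // e.g. bucket count

    KeyGroupedPartitioning(List<String> keys, int numPartitions) {
        this.keys = keys;
        this.numPartitions = numPartitions;
    }

    public List<String> keys() { return keys; }
    public int numPartitions() { return numPartitions; }
}
```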

Verifying this change

  • Added a test utility for serialization and deserialization of partitioning metadata, so we can create a test table with a KeyGroupedPartitioning.
  • Added integration tests (TestStoragePartitionJoin) that verify the optimizer plan changes when the storage partition join is enabled or disabled.
  • Verified that existing tests pass and that the new join strategy is correctly applied only when the configuration is enabled and partitioning is compatible.
  • Manual verification of execution plans to confirm the absence of unnecessary shuffles when storage partition join is enabled.
  • Verified that the unit test results in the table-planner module are the same as before the change:
    [ERROR] Tests run: 8671, Failures: 4, Errors: 0, Skipped: 1
    The same 4 failures were also present before the change.
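
A minimal sketch of the kind of plan check behind these verifications (the table names and query are hypothetical; "Exchange" is the operator Flink batch plans use for shuffles):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SpjPlanCheckSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // ... register two tables partitioned by the join key here ...

        tEnv.getConfig().set("table.optimizer.storage-partition-join-enabled", "true");
        String plan = tEnv.explainSql(
                "SELECT * FROM partitioned_a a JOIN partitioned_b b ON a.k = b.k");

        // With compatible source partitioning, no hash Exchange should be
        // required between the two scans and the join.
        System.out.println(plan.contains("Exchange") ? "shuffle present" : "no shuffle");
    }
}
```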

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

qqibrow and others added 2 commits June 25, 2025 13:18
…ng support (apache#208)

Co-authored-by: Jeyhun Karimov <[email protected]>

Enables storage partitioned joins to work end-to-end with a
DynamicTableSource, such as an Iceberg table that supports partitioning.

- Calls `applyPartitionedRead` from the Flink planner to notify any table
source that implements `SupportsPartitioning` that SPJ can be used
- Improves the `isPartitionedBy` check, which can now account for filter
pushdown optimizations when determining whether SPJ can be applied

**The following queries DO utilize SPJ:**
```
select count(*) from iceberg.db.simple_bucketed as table1 join iceberg.db.simple_bucketed_2 as table2 on table1.user_id = table2.user_id2
```

```
select count(*) from iceberg.db.user_id_two_partition_cols t1 join iceberg.db.user_id_two_partition_cols_2 t2 on t1.dt = t2.dt and t1.user_id = t2.user_id
```

```
select count(*) from iceberg.db.user_id_two_partition_cols t1 join iceberg.db.user_id_two_partition_cols_2 t2 on t1.dt = t2.dt and t1.user_id = t2.user_id where t1.dt = '2025-05-01' and t2.dt = '2025-05-01'
```

**The following query does NOT apply the SPJ optimization** (presumably because the two tables use incompatible bucket counts):

```
select count(*) from iceberg.db.simple_bucketed as table1 join iceberg.db.simple_bucketed_32 as table2 on table1.user_id = table2.user_id2
```

All results came back as expected when compared with Spark/Presto.

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

flinkbot commented Jun 25, 2025

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

tharvey5 added a commit to tharvey5/iceberg that referenced this pull request Jun 25, 2025
Currently, Apache Flink does not support storage partition join, which
can lead to unnecessary data shuffles in batch mode. We have already
implemented the query planner changes in Flink in apache/flink#26715.

This feature **IS ONLY APPLIED via a config**
(`table.optimizer.storage-partition-join-enabled=true`); otherwise there is
no impact on current jobs. This PR only supports batch execution mode.

This PR consists of relevant changes for the Flink Iceberg Source.
Please note that **these changes are ONLY included for FLIP27 (new
Source API)**.

This PR adds the following support:
- Enhances `IcebergTableSource` to implement the `SupportsPartitioning`
interface defined on the [flink
side](apache/flink#26715), which enables Iceberg
to report partitioning metadata to the Flink query planner. This is done
via `outputPartitioning()` returning a `KeyGroupedPartitioning` with the
table's partition scheme. It supports various transform types, including
bucket, identity, month, day, and year.
- Improves `IcebergSource` to support storage partitioned joins
- Enhances `FlinkSplitPlanner` with a method to group ScanTasks by
grouping key (partition values), which ensures that all records within
the same partition end up being processed by the same subtask
- Adds partition-aware split assignment capabilities, including a new
`PartitionAwareSplitAssignerFactory` and `PartitionAwareSplitAssigner`,
which ensure that splits with the same partition are assigned to the same
subtask via deterministic assignment (see the sketch after this list)
- Includes a new `SpecTransformToFlinkTransform` to map the various
transform expressions used to represent the partitions to the Flink
system
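
A minimal sketch of the deterministic-assignment idea, under stated assumptions (the key type and helper names here are hypothetical, not the PR's actual classes):

```
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the PR's split/assigner classes.
final class PartitionAwareAssignmentSketch {

    // Groups splits by their partition grouping key, mirroring the
    // FlinkSplitPlanner enhancement described above.
    static <S> Map<String, List<S>> groupByKey(List<S> splits,
                                               Function<S, String> keyOf) {
        return splits.stream().collect(Collectors.groupingBy(keyOf));
    }

    // Deterministically maps a grouping key to a subtask, so every split
    // sharing that key lands on the same subtask across runs.
    static int subtaskFor(String groupingKey, int parallelism) {
        // floorMod keeps the index non-negative for negative hash codes.
        return Math.floorMod(groupingKey.hashCode(), parallelism);
    }
}
```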

* Added unit tests in `TestPartitionAwareSplitAssigner` to verify that
splits are deterministically assigned to the correct subtasks
* Added unit tests in `TestFlinkSplitPlanner` to test the improved
functionality for getting batch splits based on `ScanGroup`
* Added unit test `TestStoragePartitionedJoin` to verify that the correct
partitioning metadata is reported
@tharvey5 tharvey5 changed the title Storage partition join 1.18 [Improvement] Storage Partition Join Jun 25, 2025
@davidradl
Contributor

Hi @tharvey5 ,
I know this is a draft, so this may already be in hand.

  • I notice the change is against Flink 1.18. New changes should be made against master and then backported; currently the backports go to v2, 1.20, and 1.19.
  • See our process at https://flink.apache.org/how-to-contribute/contribute-code/, which covers getting consensus on the dev list and raising a Jira. The Jira number should be in the PR title for all changes apart from hotfixes.
