diff --git a/flink/v1.18/flink/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsPartitioning.java b/flink/v1.18/flink/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsPartitioning.java
new file mode 100644
index 000000000000..2dd17ab02ebd
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsPartitioning.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.partitioning.Partitioning;
+
+/**
+ * Enables {@link ScanTableSource} to discover source partitions and inform the optimizer
+ * accordingly.
+ *
+ *
+ * <p>Partitions split the data stored in an external system into smaller portions that are
+ * identified by one or more string-based partition keys.
+ *
+ *
+ * <p>For example, data can be partitioned by region and within a region partitioned by month. The
+ * order of the partition keys (in the example: first by region then by month) is defined by the
+ * catalog table. A list of partitions could be:
+ *
+ * <pre>
+ * List(
+ * ['region'='europe', 'month'='2020-01'],
+ * ['region'='europe', 'month'='2020-02'],
+ * ['region'='asia', 'month'='2020-01'],
+ * ['region'='asia', 'month'='2020-02']
+ * )
+ * </pre>
+ *
+ * <p>In the above case (data is partitioned by region and month) the optimizer might use this
+ * pre-partitioned data source to eliminate a possible shuffle operation. For example, for the
+ * query {@code SELECT region, month, AVG(age) FROM MyTable GROUP BY region, month} the optimizer
+ * can take advantage of the pre-partitioned source and avoid re-partitioning the data by
+ * [region, month].
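+ *
+ * <p>A minimal sketch of an implementing source is shown below (the {@code discoverPartitioning()}
+ * helper is hypothetical; any {@link Partitioning} implementation may be returned):
+ *
+ * <pre>{@code
+ * public class MySource implements ScanTableSource, SupportsPartitioning {
+ *   private boolean partitionedRead = false;
+ *
+ *   public Partitioning outputPartitioning() {
+ *     // report how the stored data is already grouped, e.g. a KeyGroupedPartitioning
+ *     return discoverPartitioning();
+ *   }
+ *
+ *   public void applyPartitionedRead() {
+ *     this.partitionedRead = true;
+ *   }
+ *
+ *   // ... remaining ScanTableSource methods ...
+ * }
+ * }</pre>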
+ */
+// TODO -- remove this once this function can be imported from flink libraries
+@PublicEvolving
+public interface SupportsPartitioning {
+
+ /** Returns the output data partitioning that this reader guarantees. */
+ Partitioning outputPartitioning();
+
+ /** Applies partitioned reading to the source operator. */
+ void applyPartitionedRead();
+}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/flink/table/connector/source/partitioning/KeyGroupedPartitioning.java b/flink/v1.18/flink/src/main/java/org/apache/flink/table/connector/source/partitioning/KeyGroupedPartitioning.java
new file mode 100644
index 000000000000..fd44934cca0c
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/flink/table/connector/source/partitioning/KeyGroupedPartitioning.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.table.connector.source.partitioning;
+
+import org.apache.flink.table.expressions.TransformExpression;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Represents a partitioning where rows are grouped by the values of the partition keys.
+ *
+ * <p>Preconditions (TODO: consider relaxing these constraints in a future version):
+ *
+ * <ol>
+ *   <li>Keys are ordered by the partition columns defined in the table schema.
+ *   <li>Partition values are ordered by the values in each {@link Row}, compared from first to last.
+ * </ol>
+ *
+ * <p>For example, if a table is partitioned by (dt, bucket(128, user_id)), the partition keys must
+ * be [dt, bucket(128, user_id)] and cannot be [bucket(128, user_id), dt]; the partition values can
+ * be ("2023-10-01", 0), ("2023-10-01", 1), ("2023-10-02", 0), ... and cannot be ("2023-10-01", 1),
+ * ("2023-10-01", 0), ("2023-10-02", 0), ...
+ */
+// TODO -- remove this once this function can be imported from flink libraries
+public class KeyGroupedPartitioning implements Partitioning {
+ private final TransformExpression[] keys;
+ private final int numPartitions;
+ private final Row[] partitionValues;
+
+ // Example: for a table partitioned by (dt, bucket(128, user_id)), the partition directory
+ // dt=2023-10-01/user_id_bucket=0/ maps to Row("2023-10-01", 0)
+
+ public KeyGroupedPartitioning(
+ TransformExpression[] keys, Row[] partitionValues, int numPartitions) {
+ this.keys = keys;
+ this.numPartitions = numPartitions;
+ this.partitionValues = partitionValues;
+ }
+
+ /** Returns the partition transform expressions for this partitioning. */
+ public TransformExpression[] keys() {
+ return keys;
+ }
+
+ public Row[] getPartitionValues() {
+ return partitionValues;
+ }
+
+ @Override
+ public int numPartitions() {
+ return numPartitions;
+ }
+
+ boolean isPartitionedByKeys(String keysString) {
+ for (TransformExpression key : keys) {
+ if (key.getKey().equals(keysString)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Checks whether this partitioning is compatible with another {@link KeyGroupedPartitioning}.
+ * Two partitionings are compatible when: (1) {@code numPartitions} is the same, (2) the keys have
+ * the same length and each pair of keys is compatible, and (3) the partition values have the same
+ * length and equal values.
+ *
+ * @param other the other KeyGroupedPartitioning to check compatibility with
+ * @return true if compatible, false otherwise
+ */
+ public boolean isCompatible(KeyGroupedPartitioning other) {
+ if (other == null) {
+ return false;
+ }
+
+ // 1. Check numPartitions is the same
+ if (this.numPartitions != other.numPartitions) {
+ return false;
+ }
+
+ // 2. Check keys length is the same and each key is compatible
+ if (this.keys.length != other.keys.length) {
+ return false;
+ }
+
+ for (int i = 0; i < this.keys.length; i++) {
+ if (!this.keys[i].isCompatible(other.keys[i])) {
+ return false;
+ }
+ }
+
+ // 3. Check RowData length and values are the same
+ if (this.partitionValues.length != other.partitionValues.length) {
+ return false;
+ }
+
+ for (int i = 0; i < this.partitionValues.length; i++) {
+ Row thisRow = this.partitionValues[i];
+ Row otherRow = other.partitionValues[i];
+
+ if (thisRow.getArity() != otherRow.getArity()) {
+ return false;
+ }
+
+ for (int j = 0; j < thisRow.getArity(); j++) {
+ // fields in the row cannot be null
+ Preconditions.checkArgument(thisRow.getField(j) != null);
+ if (!thisRow.getField(j).equals(otherRow.getField(j))) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/flink/table/connector/source/partitioning/Partitioning.java b/flink/v1.18/flink/src/main/java/org/apache/flink/table/connector/source/partitioning/Partitioning.java
new file mode 100644
index 000000000000..a9615f2a46d2
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/flink/table/connector/source/partitioning/Partitioning.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.table.connector.source.partitioning;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+// TODO -- remove this once this function can be imported from flink libraries
+@PublicEvolving
+public interface Partitioning {
+ /** Returns the number of partitions that the data is split across. */
+ int numPartitions();
+}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/flink/table/expressions/TransformExpression.java b/flink/v1.18/flink/src/main/java/org/apache/flink/table/expressions/TransformExpression.java
new file mode 100644
index 000000000000..8b43d83dd242
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/flink/table/expressions/TransformExpression.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.table.expressions;
+
+import java.util.Objects;
+import java.util.Optional;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Represents a transform expression that can be used for partitioning or other transformations. It
+ * consists of a key, an optional function name, and an optional number of buckets.
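+ *
+ * <p>For example, the following shows how instances render via {@link #toString()}:
+ *
+ * <pre>{@code
+ * TransformExpression bucket = new TransformExpression("user_id", "bucket", 128);
+ * bucket.toString();    // "bucket(user_id, 128)"
+ *
+ * TransformExpression identity = new TransformExpression("dt", null, null);
+ * identity.toString();  // "dt"
+ * }</pre>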
+ */
+// TODO -- remove this once this function can be imported from flink libraries
+@PublicEvolving
+public class TransformExpression {
+ private final String key;
+ private final Optional<String> functionName;
+ private final Optional<Integer> numBucketsOpt;
+
+ /**
+ * Creates a new TransformExpression with the given key, function name, and number of buckets.
+ *
+ * @param key the key to be transformed
+ * @param functionName the name of the transform function, can be null
+ * @param numBuckets the number of buckets for bucket transforms, can be null
+ */
+ public TransformExpression(
+ @Nonnull String key, @Nullable String functionName, @Nullable Integer numBuckets) {
+ this.key = Preconditions.checkNotNull(key, "Key must not be null");
+ this.functionName = Optional.ofNullable(functionName);
+ this.numBucketsOpt = Optional.ofNullable(numBuckets);
+ }
+
+ /**
+ * Returns the key to be transformed.
+ *
+ * @return the key
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * Returns the name of the transform function, if present.
+ *
+ * @return the function name, or empty if not set
+ */
+ public Optional<String> getFunctionName() {
+ return functionName;
+ }
+
+ /**
+ * Returns the number of buckets if this is a bucket transform, or empty otherwise.
+ *
+ * @return the number of buckets, or empty if not a bucket transform
+ */
+ public Optional<Integer> getNumBucketsOpt() {
+ return numBucketsOpt;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TransformExpression that = (TransformExpression) o;
+ return key.equals(that.key)
+ && functionName.equals(that.functionName)
+ && numBucketsOpt.equals(that.numBucketsOpt);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, functionName, numBucketsOpt);
+ }
+
+ @Override
+ public String toString() {
+ if (functionName.isPresent()) {
+ StringBuilder builder =
+ new StringBuilder().append(functionName.get()).append("(").append(key);
+ if (numBucketsOpt.isPresent()) {
+ builder.append(", ").append(numBucketsOpt.get());
+ }
+ return builder.append(")").toString();
+ }
+ return key;
+ }
+
+ /**
+ * Checks whether this TransformExpression is compatible with another TransformExpression.
+ * Compatibility is defined by having the same function name and the same number of buckets. For
+ * example, bucket(128, user_id) is compatible with bucket(128, user_id) but not with
+ * bucket(64, user_id); year(dt) is compatible with year(dt) but not with month(dt).
+ */
+ public boolean isCompatible(TransformExpression other) {
+ return this.functionName.equals(other.functionName)
+ && this.numBucketsOpt.equals(other.numBucketsOpt);
+ }
+}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/SpecTransformToFlinkTransform.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/SpecTransformToFlinkTransform.java
new file mode 100644
index 000000000000..4df1bdbdbec2
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/SpecTransformToFlinkTransform.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.table.expressions.TransformExpression;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+
+/**
+ * A visitor to convert Iceberg partition transforms to Flink TransformExpression objects. Adapted
+ * from Spark's SpecTransformToSparkTransform for Flink's partition-aware optimizations.
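+ *
+ * <p>A minimal usage sketch, mirroring how the visitor is invoked in {@code IcebergTableSource}
+ * (the {@code table} variable is assumed to be a loaded Iceberg table):
+ *
+ * <pre>{@code
+ * SpecTransformToFlinkTransform visitor = new SpecTransformToFlinkTransform();
+ * for (PartitionField field : table.spec().fields()) {
+ *   TransformExpression expr = PartitionSpecVisitor.visit(table.schema(), field, visitor);
+ * }
+ * }</pre>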
+ */
+public class SpecTransformToFlinkTransform implements PartitionSpecVisitor<TransformExpression> {
+
+ public SpecTransformToFlinkTransform() {}
+
+ @Override
+ public TransformExpression identity(int fieldId, String sourceName, int sourceId) {
+ // TODO -- identity maps to various data types; decide whether 'identity' should be included
+ // as the function name
+ return new TransformExpression(sourceName, null, null);
+ }
+
+ @Override
+ public TransformExpression bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
+ return new TransformExpression(sourceName, "bucket", numBuckets);
+ }
+
+ @Override
+ public TransformExpression truncate(int fieldId, String sourceName, int sourceId, int width) {
+ // TODO: Implement truncate transform
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public TransformExpression year(int fieldId, String sourceName, int sourceId) {
+ return new TransformExpression(sourceName, "year", null);
+ }
+
+ @Override
+ public TransformExpression month(int fieldId, String sourceName, int sourceId) {
+ return new TransformExpression(sourceName, "month", null);
+ }
+
+ @Override
+ public TransformExpression day(int fieldId, String sourceName, int sourceId) {
+ return new TransformExpression(sourceName, "day", null);
+ }
+
+ @Override
+ public TransformExpression hour(int fieldId, String sourceName, int sourceId) {
+ return new TransformExpression(sourceName, "hour", null);
+ }
+
+ @Override
+ public TransformExpression alwaysNull(int fieldId, String sourceName, int sourceId) {
+ // TODO: Implement alwaysNull transform (may return null like Spark)
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public TransformExpression unknown(
+ int fieldId, String sourceName, int sourceId, String transform) {
+ // TODO: Implement unknown transform
+ throw new UnsupportedOperationException("Not implemented");
+ }
+}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index 15078809714f..878059638983 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -20,13 +20,19 @@
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Scan;
+import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -37,6 +43,8 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.Tasks;
@Internal
@@ -69,6 +77,69 @@ static FlinkInputSplit[] planInputSplits(
}
}
+ /** This returns partition-aware splits for the FLIP-27 source */
+ public static List<IcebergSourceSplit> planIcebergPartitionAwareSourceSplits(
+ Table table, ScanContext context, ExecutorService workerPool) {
+ Preconditions.checkArgument(table != null, "Table must be initialized");
+ Collection<PartitionSpec> specs = table.specs().values();
+ Types.StructType groupingKeyType = null;
+ try {
+ groupingKeyType = Partitioning.groupingKeyType(null, specs);
+ // when groupingKeyType().fields().isEmpty(), Flink could not find a way to group the
+ // partitions; the PartitionAwareSplitAssigner should not be used and we should have fallen
+ // back to the default split assignment
+ Preconditions.checkArgument(
+ !groupingKeyType.fields().isEmpty(),
+ "Currently only Partitions that are able to be grouped are supported");
+ } catch (Exception ve) {
+ throw new RuntimeException(
+ "Currently only Partitions that are able to be grouped are supported: ", ve);
+ }
+
+ List<FileScanTask> fileTasks;
+ try (CloseableIterable<FileScanTask> filesIterable = planFiles(table, context, workerPool)) {
+ fileTasks = Lists.newArrayList(filesIterable);
+ List<ScanTaskGroup<FileScanTask>> taskGroups =
+ TableScanUtil.planTaskGroups(
+ fileTasks,
+ context.splitSize(),
+ context.splitLookback(),
+ context.splitOpenFileCost(),
+ groupingKeyType);
+ return taskGroups.stream()
+ .map(
+ taskGroup ->
+ IcebergSourceSplit.fromCombinedScanTask(
+ new BaseCombinedScanTask(Lists.newArrayList(taskGroup.tasks()))))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to plan files: ", e);
+ }
+ }
+
+ // similar logic to planTasks; inspired by Spark's tasks() method, adapted for Flink
+ static CloseableIterable<FileScanTask> planFiles(
+ Table table, ScanContext context, ExecutorService workerPool) {
+ ScanMode scanMode = checkScanMode(context);
+ Preconditions.checkArgument(
+ scanMode == ScanMode.BATCH,
+ "planFiles is implemented for Batch execution mode only. Current mode: " + scanMode);
+ TableScan scan = table.newScan();
+ scan = refineScanWithBaseConfigs(scan, context, workerPool);
+ if (context.snapshotId() != null) {
+ scan = scan.useSnapshot(context.snapshotId());
+ } else if (context.tag() != null) {
+ scan = scan.useRef(context.tag());
+ } else if (context.branch() != null) {
+ scan = scan.useRef(context.branch());
+ }
+ if (context.asOfTimestamp() != null) {
+ scan = scan.asOfTime(context.asOfTimestamp());
+ }
+ return scan.planFiles();
+ }
+
/** This returns splits for the FLIP-27 source */
public static List<IcebergSourceSplit> planIcebergSourceSplits(
Table table, ScanContext context, ExecutorService workerPool) {
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 0655cf87a996..82ea296fedc7 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -28,15 +28,22 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
@@ -51,6 +58,7 @@
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
+import org.apache.iceberg.flink.source.assigner.PartitionAwareSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
@@ -65,6 +73,7 @@
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.RowDataConverter;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
@@ -72,6 +81,7 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
@@ -95,6 +105,24 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState>
private final SerializableRecordEmitter<T> emitter;
private final String tableName;
+ // caches the splits discovered by planSplitsForBatch, which can be called twice, from two
+ // different threads: (1) source/stream construction on the main thread and (2) enumerator
+ // creation; hence the volatile. When Storage Partition Join optimization is enabled for batch
+ // execution, this is also populated on the main thread within IcebergTableSource, which needs
+ // the splits to tell the Flink planner whether the optimization can be applied to the table source
+ private volatile List<IcebergSourceSplit> batchSplits;
+
+ // true if Storage Partition Join (SPJ) is enabled on the Flink engine side AND the planner
+ // decided that SPJ can be used
+ private final boolean shouldApplyPartitionedRead;
+ // TODO -- remove once Flink changes are landed and we can use this property from
+ // OptimizerConfigOptions
+ static final ConfigOption<Boolean> TABLE_OPTIMIZER_STORAGE_PARTITION_JOIN_ENABLED =
+ ConfigOptions.key("table.optimizer.storage-partition-join-enabled")
+ .booleanType()
+ .defaultValue(false);
+
IcebergSource(
TableLoader tableLoader,
ScanContext scanContext,
@@ -102,7 +130,9 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState>
SplitAssignerFactory assignerFactory,
SerializableComparator<IcebergSourceSplit> splitComparator,
Table table,
- SerializableRecordEmitter<T> emitter) {
+ SerializableRecordEmitter<T> emitter,
+ boolean shouldApplyPartitionedRead,
+ List<IcebergSourceSplit> batchSplits) {
Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
Preconditions.checkNotNull(assignerFactory, "assignerFactory is required.");
@@ -114,6 +144,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState>
private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+ if (batchSplits != null) {
+ return batchSplits;
+ }
+
ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try (TableLoader loader = tableLoader.clone()) {
loader.open();
- List<IcebergSourceSplit> splits =
- FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
+ if (shouldApplyPartitionedRead) {
+ // FlinkSplitPlanner.planIcebergPartitionAwareSourceSplits is called by IcebergTableSource to
+ // notify the Flink planner, so batchSplits should already have been populated. This is
+ // ideally checked in outputPartitioning(), so this branch should never be reached with null
+ // splits. Partition-aware splits are only produced in batch execution mode.
+ Preconditions.checkArgument(
+ !scanContext.isStreaming(), "partition-awareness is only available in batch mode");
+ Preconditions.checkArgument(
+ batchSplits != null,
+ "Batch splits should have already been planned by IcebergTableSource");
+ // TODO -- see the comment above; this re-planning should eventually be removed
+ this.batchSplits =
+ FlinkSplitPlanner.planIcebergPartitionAwareSourceSplits(
+ loader.loadTable(), scanContext, workerPool);
+ } else {
+ this.batchSplits =
+ FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
+ }
LOG.info(
- "Discovered {} splits from table {} during job initialization", splits.size(), tableName);
- return splits;
+ "Discovered {} splits from table {} during job initialization",
+ batchSplits.size(),
+ tableName);
+ return batchSplits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
@@ -203,10 +263,33 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
} else {
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
assigner.onDiscoveredSplits(splits);
+ // clear the cached splits after enumerator creation as they won't be needed anymore
+ this.batchSplits = null;
return new StaticIcebergEnumerator(enumContext, assigner);
}
}
+ private boolean shouldInferParallelism() {
+ return !scanContext.isStreaming();
+ }
+
+ private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) {
+ int parallelism =
+ SourceUtil.inferParallelism(
+ flinkConf,
+ scanContext.limit(),
+ () -> {
+ List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
+ return splits.size();
+ });
+
+ if (env.getMaxParallelism() > 0) {
+ parallelism = Math.min(parallelism, env.getMaxParallelism());
+ }
+
+ return parallelism;
+ }
+
public static Builder builder() {
return new Builder<>();
}
@@ -221,11 +304,15 @@ public static class Builder<T> {
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private ReaderFunction<T> readerFunction;
+ private RowDataConverter<T> converter;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
private TableSchema projectedFlinkSchema;
private Boolean exposeLocality;
+ private boolean shouldApplyPartitionedRead;
+ private List<IcebergSourceSplit> batchSplits;
+
private final Map readOptions = Maps.newHashMap();
Builder() {}
@@ -467,6 +554,36 @@ public Builder<T> properties(Map<String, String> properties) {
return this;
}
+ /**
+ * Sets the split assigner to {@link
+ * org.apache.iceberg.flink.source.assigner.PartitionAwareSplitAssigner} when true. If Storage
+ * Partition Join (SPJ) is enabled on the Flink engine side AND the planner decided that SPJ can
+ * be used, we should use the PartitionAwareSplitAssigner to ensure file locality and reduce
+ * unnecessary shuffles.
+ *
+ * <p>Ensure that {@code table.optimizer.storage-partition-join-enabled} is set to true so that
+ * the Flink planner can consider whether it can use SPJ. See {@link
+ * org.apache.iceberg.flink.source.assigner.PartitionAwareSplitAssigner}.
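+ *
+ * <p>A minimal usage sketch (configuration and loader values are illustrative):
+ *
+ * <pre>{@code
+ * Configuration flinkConf = new Configuration();
+ * flinkConf.setBoolean("table.optimizer.storage-partition-join-enabled", true);
+ *
+ * IcebergSource<RowData> source =
+ *     IcebergSource.forRowData()
+ *         .tableLoader(tableLoader)
+ *         .flinkConfig(flinkConf)
+ *         .applyPartitionedRead(true)
+ *         .build();
+ * }</pre>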
+ */
+ public Builder<T> applyPartitionedRead(boolean applyPartitionedRead) {
+ this.shouldApplyPartitionedRead = applyPartitionedRead;
+ return this;
+ }
+
+ /**
+ * Sets pre-computed batch splits to avoid expensive recomputation during execution.
+ *
+ * This method is used to pass splits that have already been computed by {@code
+ * IcebergTableSource.outputPartitioning()} for partition discovery. Without this, splits would
+ * be computed twice: once for output partitioning and once for execution.
+ *
+ * @param splits pre-computed splits from partition-aware planning
+ * @return this builder for method chaining
+ */
+ public Builder<T> batchSplits(List<IcebergSourceSplit> splits) {
+ this.batchSplits = splits;
+ return this;
+ }
+
public IcebergSource<T> build() {
if (table == null) {
try (TableLoader loader = tableLoader) {
@@ -521,6 +638,16 @@ public IcebergSource<T> build() {
}
}
+ if (shouldApplyPartitionedRead) {
+ // TODO -- should be replaced by
+ // OptimizerConfigOptions.TABLE_OPTIMIZER_STORAGE_PARTITION_JOIN_ENABLED once
+ // flink configs are landed
+ Preconditions.checkArgument(
+ flinkConfig.get(TABLE_OPTIMIZER_STORAGE_PARTITION_JOIN_ENABLED),
+ "can only use PartitionAwareSplitAssigner if Storage Partition Join is enabled");
+ splitAssignerFactory = new PartitionAwareSplitAssignerFactory();
+ }
+
if (splitAssignerFactory == null) {
if (splitComparator == null) {
splitAssignerFactory = new SimpleSplitAssignerFactory();
@@ -537,7 +664,44 @@ public IcebergSource<T> build() {
splitAssignerFactory,
splitComparator,
table,
- emitter);
+ emitter,
+ shouldApplyPartitionedRead,
+ batchSplits);
+ }
+
+ /**
+ * Build the {@link IcebergSource} and create a {@link DataStream} from the source. Watermark
+ * strategy is set to {@link WatermarkStrategy#noWatermarks()}.
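+ *
+ * <p>A minimal usage sketch (the {@code tableLoader} and {@code env} values are assumed to exist):
+ *
+ * <pre>{@code
+ * DataStream<RowData> stream =
+ *     IcebergSource.forRowData()
+ *         .tableLoader(tableLoader)
+ *         .flinkConfig(new Configuration())
+ *         .buildStream(env);
+ * }</pre>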
+ *
+ * @return data stream from the Iceberg source
+ */
+ public DataStream<T> buildStream(StreamExecutionEnvironment env) {
+ // buildStream should only be called with RowData or Converter paths.
+ Preconditions.checkState(
+ readerFunction == null,
+ "Cannot set reader function when building a data stream from the source");
+ IcebergSource<T> source = build();
+ TypeInformation<T> outputTypeInfo =
+ outputTypeInfo(converter, table.schema(), source.scanContext.project());
+ DataStreamSource<T> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo);
+ if (source.shouldInferParallelism()) {
+ stream = stream.setParallelism(source.inferParallelism(flinkConfig, env));
+ }
+
+ return stream;
+ }
+
+ private static <T> TypeInformation<T> outputTypeInfo(
+ RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
+ if (converter != null) {
+ return converter.getProducedType();
+ } else {
+ // output type is RowData
+ Schema readSchema = projected != null ? projected : tableSchema;
+ return (TypeInformation<T>)
+ FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
+ }
}
}
}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 610657e8d47b..4bfd6e9feaec 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -19,15 +19,16 @@
package org.apache.iceberg.flink.source;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -37,18 +38,41 @@
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.partitioning.KeyGroupedPartitioning;
+import org.apache.flink.table.connector.source.partitioning.Partitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TransformExpression;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkFilters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.SpecTransformToFlinkTransform;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.ThreadPools;
/** Flink Iceberg table source. */
@Internal
@@ -56,7 +80,8 @@ public class IcebergTableSource
implements ScanTableSource,
SupportsProjectionPushDown,
SupportsFilterPushDown,
- SupportsLimitPushDown {
+ SupportsLimitPushDown,
+ SupportsPartitioning {
private int[] projectedFields;
private Long limit;
@@ -68,6 +93,15 @@ public class IcebergTableSource
private final boolean isLimitPushDown;
private final ReadableConfig readableConfig;
+ /** The following section is needed for Storage Partition Join */
+ private boolean shouldApplyPartitionedRead;
+
+ private Optional<TransformExpression[]> groupingKeyTransforms;
+ private Optional<Set<PartitionSpec>> specs;
+ private Optional<Types.StructType> groupingKeyType;
+ private Optional<Table> table; // cache table for lazy loading
+ private Optional<List<IcebergSourceSplit>> batchSplits; // cache batch splits for lazy loading
+
private IcebergTableSource(IcebergTableSource toCopy) {
this.loader = toCopy.loader;
this.schema = toCopy.schema;
@@ -77,6 +111,12 @@ private IcebergTableSource(IcebergTableSource toCopy) {
this.limit = toCopy.limit;
this.filters = toCopy.filters;
this.readableConfig = toCopy.readableConfig;
+ this.shouldApplyPartitionedRead = toCopy.shouldApplyPartitionedRead;
+ this.groupingKeyTransforms = toCopy.groupingKeyTransforms;
+ this.specs = toCopy.specs;
+ this.groupingKeyType = toCopy.groupingKeyType;
+ this.table = toCopy.table;
+ this.batchSplits = toCopy.batchSplits;
}
public IcebergTableSource(
@@ -84,7 +124,7 @@ public IcebergTableSource(
TableSchema schema,
Map<String, String> properties,
ReadableConfig readableConfig) {
- this(loader, schema, properties, null, false, null, ImmutableList.of(), readableConfig);
+ this(loader, schema, properties, null, false, null, ImmutableList.of(), readableConfig, false);
}
private IcebergTableSource(
@@ -95,7 +135,8 @@ private IcebergTableSource(
boolean isLimitPushDown,
Long limit,
List<Expression> filters,
- ReadableConfig readableConfig) {
+ ReadableConfig readableConfig,
+ boolean shouldApplyPartitionedRead) {
this.loader = loader;
this.schema = schema;
this.properties = properties;
@@ -104,6 +145,12 @@ private IcebergTableSource(
this.limit = limit;
this.filters = filters;
this.readableConfig = readableConfig;
+ this.shouldApplyPartitionedRead = shouldApplyPartitionedRead;
+ this.table = Optional.empty();
+ this.groupingKeyType = Optional.empty();
+ this.specs = Optional.empty();
+ this.groupingKeyTransforms = Optional.empty();
+ this.batchSplits = Optional.empty();
}
@Override
@@ -128,26 +175,23 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
.build();
}
- private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
+ private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
SplitAssignerType assignerType =
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
- IcebergSource<RowData> source =
- IcebergSource.forRowData()
- .tableLoader(loader)
- .assignerFactory(assignerType.factory())
- .properties(properties)
- .project(getProjectedSchema())
- .limit(limit)
- .filters(filters)
- .flinkConfig(readableConfig)
- .build();
- DataStreamSource<RowData> stream =
- env.fromSource(
- source,
- WatermarkStrategy.noWatermarks(),
- source.name(),
- TypeInformation.of(RowData.class));
- return stream;
+ return IcebergSource.forRowData()
+ .tableLoader(loader)
+ .assignerFactory(assignerType.factory())
+ .properties(properties)
+ .project(getProjectedSchema())
+ .limit(limit)
+ .filters(filters)
+ .flinkConfig(readableConfig)
+ // TODO -- a future optimization is to call .table(table.orElse(null)) to prevent loading the
+ // table twice when SPJ is used. Not adding it now due to production risk (i.e., unknown side
+ // effects)
+ .applyPartitionedRead(shouldApplyPartitionedRead)
+ .batchSplits(batchSplits.orElse(null))
+ .buildStream(env);
}
private TableSchema getProjectedSchema() {
@@ -226,4 +270,172 @@ public DynamicTableSource copy() {
public String asSummaryString() {
return "Iceberg table source";
}
+
+ @Override
+ public Partitioning outputPartitioning() {
+ if (groupingKeyType().fields().isEmpty()) {
+ // TODO -- discuss with Lu on what we plan to return in this case. Spark returns
+ // `UnknownPartitioning(taskGroups().size());`
+ return null;
+ } else {
+ List<FileScanTask> fileTasks = tasks();
+ Set<StructLike> uniquePartitions =
+ fileTasks.stream().map(task -> task.file().partition()).collect(Collectors.toSet());
+ // Convert StructLike partitions to Flink Row objects with proper ordering
+ Row[] partitionValues = discoverPartitionValues(uniquePartitions);
+ return new KeyGroupedPartitioning(
+ groupingKeyTransforms(), partitionValues, partitionValues.length);
+ }
+ }
+
+ private List<FileScanTask> tasks() {
+ if (batchSplits.isPresent()) {
+ return batchSplits.get().stream()
+ .flatMap(split -> split.task().tasks().stream())
+ .collect(Collectors.toList());
+ }
+ org.apache.iceberg.flink.source.ScanContext.Builder contextBuilder =
+ org.apache.iceberg.flink.source.ScanContext.builder();
+ Preconditions.checkArgument(getTable().isPresent(), "Table must be defined");
+ contextBuilder.resolveConfig(getTable().get(), properties, readableConfig);
+ contextBuilder.filters(filters);
+ Schema icebergSchema = getTable().get().schema();
+ // TODO -- this is called twice now; we may want to cache it in the future
+ // Taken from build() in IcebergSource.Builder
+ TableSchema projectedFlinkSchema = getProjectedSchema();
+ if (projectedFlinkSchema != null) {
+ contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
+ }
+ org.apache.iceberg.flink.source.ScanContext context = contextBuilder.build();
+ Preconditions.checkArgument(
+ !context.isStreaming(), "partition-awareness is only available in batch mode");
+ ExecutorService workerPool =
+ ThreadPools.newWorkerPool("IcebergTableSource", context.planParallelism());
+ List<FileScanTask> fileTasks = null;
+ try {
+ try {
+ this.batchSplits =
+ Optional.of(
+ FlinkSplitPlanner.planIcebergPartitionAwareSourceSplits(
+ getTable().get(), context, workerPool));
+ fileTasks =
+ batchSplits.get().stream()
+ .flatMap(split -> split.task().tasks().stream())
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get batch splits: ", e);
+ }
+ } finally {
+ workerPool.shutdown();
+ }
+ return fileTasks;
+ }
+
+ private Row[] discoverPartitionValues(Set<StructLike> uniquePartitions) {
+ // TODO -- determine whether this is needed
+ if (uniquePartitions.isEmpty()) {
+ return new Row[0];
+ }
+ Types.StructType partitionGroupingKeyType = groupingKeyType();
+
+ // Sort partitions using Iceberg's built-in comparator to ensure consistent ordering
+ Schema groupingSchema = new Schema(partitionGroupingKeyType.fields());
+ SortOrder.Builder sortBuilder = SortOrder.builderFor(groupingSchema);
+
+ // Add each field to the sort order for consistent ordering
+ for (Types.NestedField field : partitionGroupingKeyType.fields()) {
+ sortBuilder.asc(field.name());
+ }
+ SortOrder sortOrder = sortBuilder.build();
+
+ Comparator<StructLike> comparator = SortOrderComparators.forSchema(groupingSchema, sortOrder);
+
+ List<StructLike> sortedPartitions =
+ uniquePartitions.stream().sorted(comparator).collect(Collectors.toList());
+
+ Row[] partitions = new Row[sortedPartitions.size()];
+ int index = 0;
+
+ for (StructLike partition : sortedPartitions) {
+ Row row = Row.ofKind(RowKind.INSERT, new Object[partitionGroupingKeyType.fields().size()]);
+ for (int i = 0; i < partitionGroupingKeyType.fields().size(); i++) {
+ Object value = partition.get(i, Object.class);
+ row.setField(i, value);
+ }
+ partitions[index++] = row;
+ }
+ return partitions;
+ }
+
+ @Override
+ public void applyPartitionedRead() {
+ this.shouldApplyPartitionedRead = true;
+ }
+
+ private Set<PartitionSpec> specs() {
+ Preconditions.checkArgument(getTable().isPresent(), "Table must be defined");
+ return Sets.newHashSet(getTable().get().specs().values());
+ }
+
+ private Types.StructType groupingKeyType() {
+ if (!groupingKeyType.isPresent()) {
+ // TODO -- determine whether the schema is needed for Flink / current use-cases; it is used on
+ // the Spark side, but given the method definition, passing null also works as expected
+ this.groupingKeyType =
+ Optional.of(org.apache.iceberg.Partitioning.groupingKeyType(null, specs()));
+ }
+ return groupingKeyType.get();
+ }
+
+ // taken directly from Spark's SparkPartitioningAwareScan and adapted for Flink
+ private TransformExpression[] groupingKeyTransforms() {
+ if (!groupingKeyTransforms.isPresent()) {
+ Map<Integer, PartitionField> fieldsById = indexFieldsById(specs());
+
+ List<PartitionField> groupingKeyFields =
+ groupingKeyType().fields().stream()
+ .map(field -> fieldsById.get(field.fieldId()))
+ .collect(Collectors.toList());
+
+ Preconditions.checkArgument(getTable().isPresent(), "Table must exist to get table schema");
+ // TODO -- handle the case where we need the schema for a specific snapshot-id or branch, as
+ // done in Spark. For now this is the same as getTable().get().schema(), but leaving it as is
+ // as a note for future iterations
+ Schema tableSchema = SnapshotUtil.schemaFor(getTable().get(), null);
+ List<TransformExpression> transforms = Lists.newArrayList();
+ SpecTransformToFlinkTransform visitor = new SpecTransformToFlinkTransform();
+ for (PartitionField field : groupingKeyFields) {
+ TransformExpression transform = PartitionSpecVisitor.visit(tableSchema, field, visitor);
+ if (transform != null) {
+ transforms.add(transform);
+ }
+ }
+ this.groupingKeyTransforms = Optional.of(transforms.toArray(new TransformExpression[0]));
+ }
+ return groupingKeyTransforms.get();
+ }
+
+ private Map<Integer, PartitionField> indexFieldsById(Iterable<PartitionSpec> specIterable) {
+ Map<Integer, PartitionField> fieldsById = Maps.newHashMap();
+
+ for (PartitionSpec spec : specIterable) {
+ for (PartitionField field : spec.fields()) {
+ fieldsById.putIfAbsent(field.fieldId(), field);
+ }
+ }
+ return fieldsById;
+ }
+
+ private Optional<Table> getTable() {
+ if (!table.isPresent()) {
+ try {
+ this.loader.open();
+ this.table = Optional.of(this.loader.loadTable());
+ return table;
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to load Source Table, bug", e);
+ }
+ }
+ return table;
+ }
}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
index e7447d08c985..050013b3e762 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
@@ -55,7 +55,8 @@ public DefaultSplitAssigner(
}
@Override
- public synchronized GetSplitResult getNext(@Nullable String hostname) {
+ public synchronized GetSplitResult getNext(
+ @Nullable String hostname, int subtaskId, int numRegisteredTasks) {
if (pendingSplits.isEmpty()) {
return GetSplitResult.unavailable();
} else {
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/PartitionAwareSplitAssigner.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/PartitionAwareSplitAssigner.java
new file mode 100644
index 000000000000..576d389fc5e5
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/PartitionAwareSplitAssigner.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * A partition-aware split assigner that distributes splits to tasks based on their partition
+ * grouping keys, ensuring better data locality and a more balanced workload distribution.
+ *
+ * <p>This assigner groups splits by their partition information and uses Iceberg's partition
+ * grouping key computation along with deterministic round-robin assignment to map splits to
+ * specific task IDs.
+ *
+ * <h3>Lifecycle and Initialization</h3>
+ *
+ * <p>The assigner uses lazy initialization because the number of registered tasks is only known at
+ * runtime during the first {@link #getNext(String, int, int)} call.
+ *
+ * <h3>Partition Grouping</h3>
+ *
+ * <p>Uses {@link org.apache.iceberg.Partitioning#groupingKeyType(String, java.util.Set)} to compute
+ * partition grouping keys that work across different partition specifications. This handles:
+ *
+ * <ul>
+ *   <li>Identity partitions
+ *   <li>Transform partitions (bucket, truncate, year, month, day, hour)
+ *   <li>Mixed partition schemes in the same table
+ * </ul>
+ *
+ * <h3>Thread Safety</h3>
+ *
+ * <p>All public methods are synchronized as required by the Flink split enumerator contract. Since
+ * all methods are called from the source coordinator thread, synchronization provides consistency
+ * without performance concerns.
+ *
+ * <h3>State Management</h3>
+ *
+ * <p>Supports checkpoint/restore through the {@link #state()} method. The assigner tracks all
+ * pending splits regardless of their assignment state, allowing for complete state recovery.
+ *
+ * <p>Compared to {@link DefaultSplitAssigner}, this implementation provides better data locality
+ * for partitioned tables at the cost of slightly more complex initialization logic.
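+ *
+ * <p>A minimal interaction sketch from the enumerator's point of view (the {@code splits}
+ * collection is assumed to come from partition-aware planning):
+ *
+ * <pre>{@code
+ * SplitAssigner assigner = new PartitionAwareSplitAssigner();
+ * assigner.onDiscoveredSplits(splits);
+ *
+ * // subtask 0 of 4 registered readers requests work; splits that share a partition
+ * // grouping key are always routed to the same subtask
+ * GetSplitResult result = assigner.getNext(null, 0, 4);
+ * }</pre>
+ *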
+ * @see DefaultSplitAssigner for simple round-robin assignment
+ * @see org.apache.iceberg.Partitioning for partition grouping key computation
+ */
+public class PartitionAwareSplitAssigner implements SplitAssigner {
+ /**
+ * Temporary task ID used to store splits before the number of registered tasks is known. This
+ * enables lazy initialization where splits can be added before the job is fully initialized, and
+ * then properly distributed once the number of registered tasks is known.
+ *
+ * <p>addSplits() is called before getNext(), which is when the initialization happens. At that
+ * stage we know the number of registered tasks, but not before.
+ */
+ private static final int UNINITIALIZED_TASK_ID = -1;
+
+ private final Map<Integer, Queue<IcebergSourceSplit>> pendingSplitsByTaskId;
+ private Optional<CompletableFuture<Void>> availableFuture;
+
+ private final Map<Integer, StructProjection> groupingKeyProjectionsBySpec;
+ // TODO -- no utility class was found for a write-once value; we can determine a more robust
+ // write-once mechanism later
+ private int registeredTasks;
+ private Optional<Types.StructType> groupingKeyType;
+
+ // TODO -- fail PartitionAwareSplitAssigner if execution mode is streaming
+ public PartitionAwareSplitAssigner() {
+ this.pendingSplitsByTaskId = Maps.newHashMap();
+ this.registeredTasks = -1;
+ this.groupingKeyProjectionsBySpec = Maps.newHashMap();
+ this.availableFuture = Optional.empty();
+ this.groupingKeyType = Optional.empty();
+ }
+
+ public PartitionAwareSplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+ this();
+ List<IcebergSourceSplit> restoredSplits = Lists.newArrayList();
+ assignerState.stream().map(IcebergSourceSplitState::split).forEach(restoredSplits::add);
+ this.addSplits(restoredSplits);
+ }
+
+ @Override
+ public synchronized GetSplitResult getNext(
+ @Nullable String hostname, int subtaskId, int numRegisteredTasks) {
+ if (hasSplitsAvailable()) {
+ // Initialize if we have uninitialized splits and now know the registered tasks
+ if (isUninitialized()) {
+ initialize(numRegisteredTasks);
+ }
+ // After initialization, registeredTasks must remain consistent
+ Preconditions.checkArgument(
+ numRegisteredTasks == this.registeredTasks,
+ "registeredTasks must remain consistent after initialization. Expected: %s, but got: %s",
+ this.registeredTasks,
+ numRegisteredTasks);
+ Preconditions.checkState(
+ !pendingSplitsByTaskId.containsKey(UNINITIALIZED_TASK_ID),
+ "After initialization, unregistered_task_id must be removed from pending splits mapping");
+ Queue taskSplits = pendingSplitsByTaskId.get(subtaskId);
+ if (taskSplits != null && !taskSplits.isEmpty()) {
+ IcebergSourceSplit split = taskSplits.poll();
+ return GetSplitResult.forSplit(split);
+ }
+ }
+ return GetSplitResult.unavailable();
+ }
+
+ @Override
+ public synchronized void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
+ addSplits(splits);
+ }
+
+ @Override
+ public synchronized void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
+ addSplits(splits);
+ }
+
+ @Override
+ public synchronized Collection<IcebergSourceSplitState> state() {
+ return pendingSplitsByTaskId.values().stream()
+ .flatMap(Queue::stream)
+ .map(split -> new IcebergSourceSplitState(split, IcebergSourceSplitStatus.UNASSIGNED))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> isAvailable() {
+ if (!availableFuture.isPresent()) {
+ availableFuture = Optional.of(new CompletableFuture<>());
+ }
+ return availableFuture.get();
+ }
+
+ @Override
+ public synchronized int pendingSplitCount() {
+ return pendingSplitsByTaskId.values().stream().mapToInt(Queue::size).sum();
+ }
+
+ @Override
+ public synchronized long pendingRecords() {
+ return pendingSplitsByTaskId.values().stream()
+ .flatMap(Queue::stream)
+ .mapToLong(split -> split.task().estimatedRowsCount())
+ .sum();
+ }
+
+ private synchronized void addSplits(Collection<IcebergSourceSplit> splits) {
+ if (!splits.isEmpty()) {
+ // registeredTasks are defined upon initialization
+ if (registeredTasks > 0) {
+ distributeNewSplitsToTasks(splits);
+ } else {
+ pendingSplitsByTaskId
+ .computeIfAbsent(UNINITIALIZED_TASK_ID, k -> new ArrayDeque<>())
+ .addAll(splits);
+ }
+ completeAvailableFuturesIfNeeded();
+ }
+ }
+
+ private synchronized void completeAvailableFuturesIfNeeded() {
+ if (availableFuture.isPresent() && hasSplitsAvailable()) {
+ availableFuture.get().complete(null);
+ }
+ availableFuture = Optional.empty();
+ }
+
+ private synchronized void initialize(int numRegisteredTasks) {
+ // Validate all state is in unset/initial state before initialization
+ Preconditions.checkState(
+ this.registeredTasks == -1,
+ "registeredTasks must be unset, but was: %s",
+ this.registeredTasks);
+ Preconditions.checkState(
+ !this.groupingKeyType.isPresent(),
+ "groupingKeyType must be unset, but was: %s",
+ this.groupingKeyType);
+ // Validate that only uninitialized splits exist (no task-specific queues)
+ long taskSpecificSplitCount =
+ pendingSplitsByTaskId.entrySet().stream()
+ .filter(entry -> entry.getKey() != UNINITIALIZED_TASK_ID)
+ .mapToLong(entry -> entry.getValue().size())
+ .sum();
+ Preconditions.checkState(
+ taskSpecificSplitCount == 0,
+ "Expected no task-specific splits before initialization, but found %s splits across task-specific queues",
+ taskSpecificSplitCount);
+ // initialize() should only be called when uninitialized splits are present
+ Queue<IcebergSourceSplit> uninitializedSplits =
+ pendingSplitsByTaskId.get(UNINITIALIZED_TASK_ID);
+ Preconditions.checkState(
+ uninitializedSplits != null && !uninitializedSplits.isEmpty(),
+ "Expected uninitialized splits to be present when initialize() is called, but uninitializedSplits was: %s",
+ uninitializedSplits);
+
+ // TODO -- in future if need to deal with Schema evolution of partitioning, should revisit this
+ // function call
+ Types.StructType computedGroupingKeyType =
+ Partitioning.groupingKeyType(null, getPartitionSpecs(uninitializedSplits));
+ // Compute the grouping key type using Iceberg's Partitioning utility
+ // This automatically handles all transform types (Month, bucket, Identity, etc.)
+ this.registeredTasks = numRegisteredTasks;
+ // Store the grouping key type for consistent extraction across all splits
+ this.groupingKeyType = Optional.of(computedGroupingKeyType);
+ // Distribute uninitialized splits to actual task IDs
+ distributeNewSplitsToTasks(uninitializedSplits);
+ // Remove the uninitialized queue
+ pendingSplitsByTaskId.remove(UNINITIALIZED_TASK_ID);
+ }
+
+ /**
+ * Distributes splits to tasks using a two-level partition-aware approach that ensures both
+ * partition locality and balanced task utilization.
+ *
+ * <p>This method follows Spark's proven pattern by first grouping splits by their logical
+ * partition, then distributing partition groups deterministically across available tasks using
+ * round-robin assignment.
+ *
+ * <p>Algorithm Overview:
+ *
+ * <pre>
+ * Step 1: Group splits by partition key
+ *   Splits: [s1, s2, s3, s4, s5, s6]
+ *     ↓ (group by partition)
+ *   Partition A: [s1, s2]
+ *   Partition B: [s3, s4]
+ *   Partition C: [s5, s6]
+ *
+ * Step 2: Sort partitions deterministically
+ *   [Partition A, Partition B, Partition C]
+ *     ↓ (sort by string representation)
+ *   [Partition A, Partition B, Partition C]
+ *
+ * Step 3: Distribute partitions round-robin to tasks
+ *   Task 0: Partition A → [s1, s2]
+ *   Task 1: Partition B → [s3, s4]
+ *   Task 2: Partition C → [s5, s6]
+ *   Task 0: (next partition would go here)
+ * </pre>
+ *
+ * <p>Benefits:
+ *
+ * <ul>
+ *   <li>Partition Locality: all splits from the same logical partition are assigned to the
+ *       same task, enabling storage-level optimizations and efficient joins.
+ *   <li>Balanced Distribution: round-robin assignment ensures even task utilization across
+ *       all available tasks.
+ *   <li>Deterministic Assignment: sorting partition keys ensures consistent assignment
+ *       across tables, enabling storage partition joins.
+ *   <li>No Hash Collisions: avoids hash-based assignment issues that could cause uneven
+ *       distribution or co-location problems.
+ * </ul>
+ *
+ * @param splits the splits to distribute across tasks
+ */
+ private synchronized void distributeNewSplitsToTasks(Collection<IcebergSourceSplit> splits) {
+ Preconditions.checkState(
+ registeredTasks > 0, "registeredTasks must be positive, but was: %s", registeredTasks);
+ Map<StructLike, List<IcebergSourceSplit>> splitsByPartition = Maps.newHashMap();
+ for (IcebergSourceSplit split : splits) {
+ StructLike groupingKey = extractGroupingKey(split);
+ splitsByPartition.computeIfAbsent(groupingKey, k -> Lists.newArrayList()).add(split);
+ }
+
+ // Sort partition keys deterministically for cross-table consistency
+ List<StructLike> sortedPartitionKeys = Lists.newArrayList(splitsByPartition.keySet());
+ sortedPartitionKeys.sort(Comparator.comparing(Object::toString));
+
+ for (int i = 0; i < sortedPartitionKeys.size(); i++) {
+ StructLike partitionKey = sortedPartitionKeys.get(i);
+ List<IcebergSourceSplit> partitionSplits = splitsByPartition.get(partitionKey);
+ int assignedTaskId = i % registeredTasks;
+ pendingSplitsByTaskId
+ .computeIfAbsent(assignedTaskId, k -> new ArrayDeque<>())
+ .addAll(partitionSplits);
+ }
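+ // Note (illustrative example, not taken from a specific table): when there are more partitions
+ // than registered tasks, the modulo above wraps around; e.g. with registeredTasks = 2 and
+ // sorted partitions [A, B, C, D], task 0 receives the splits of partitions A and C while
+ // task 1 receives those of B and D.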
+ }
+
+ /**
+ * Extracts the shared partition key from a split's file tasks.
+ *
+ * Since each {@code IcebergSourceSplit} comes from {@code TableScanUtil#planTaskGroups()}, all
+ * file tasks within the split already share the same partition values. This method extracts that
+ * common partition key for task assignment.
+ *
+ *
+ * <p>For example, if a split contains files from partition {@code (dt=2024-01-01, bucket=5)},
+ * this method returns a {@code StructLike} representing that partition key, which will be used to
+ * assign the entire split to a specific task.
+ *
+ * @param split the split to extract partition key from (all files have same partition)
+ * @return the shared partition key for all files in this split
+ */
+ private synchronized StructLike extractGroupingKey(IcebergSourceSplit split) {
+ // motivated by TableScanUtil#planTaskGroups
+ // Use the consistent grouping key type computed during initialization
+ Preconditions.checkState(
+ groupingKeyType.isPresent(),
+ "groupingKeyType must be initialized before extracting grouping keys");
+
+ FileScanTask firstTask = split.task().files().iterator().next();
+ PartitionSpec spec = firstTask.spec();
+ StructLike partition = firstTask.partition();
+ Types.StructType currentGroupingKeyType = this.groupingKeyType.get();
+ StructProjection projection =
+ groupingKeyProjectionsBySpec.computeIfAbsent(
+ spec.specId(),
+ specId -> StructProjection.create(spec.partitionType(), currentGroupingKeyType));
+ PartitionData groupingKeyTemplate = new PartitionData(currentGroupingKeyType);
+ return groupingKeyTemplate.copyFor(projection.wrap(partition));
+ }
+
+ /**
+ * Extracts all unique partition specifications from the given splits.
+ *
+ *
+ * <p>Returns the set of {@code PartitionSpec} objects used by the file tasks within the splits. A
+ * {@code PartitionSpec} defines how a table is partitioned (e.g., by date, by bucket, etc.). This
+ * is used to compute the grouping key type that works across different partition specifications.
+ *
+ *
+ * <p>For example, may return specs that produce grouping keys like: {@code struct<1000: dt:
+ * optional string, 1001: user_id_bucket: optional int>}
+ *
+ * @param splits the splits to extract partition specs from
+ * @return set of unique partition specifications
+ */
+ private synchronized Set<PartitionSpec> getPartitionSpecs(Collection<IcebergSourceSplit> splits) {
+ return splits.stream()
+ .flatMap(split -> split.task().files().stream())
+ .map(FileScanTask::spec)
+ .collect(Collectors.toSet());
+ }
+
+ private synchronized boolean hasSplitsAvailable() {
+ return pendingSplitsByTaskId.values().stream().anyMatch(queue -> !queue.isEmpty());
+ }
+
+ private synchronized boolean isUninitialized() {
+ return this.registeredTasks == -1;
+ }
+}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/PartitionAwareSplitAssignerFactory.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/PartitionAwareSplitAssignerFactory.java
new file mode 100644
index 000000000000..48ade9365aeb
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/PartitionAwareSplitAssignerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+
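+/**
+ * Factory for creating {@link PartitionAwareSplitAssigner} instances.
+ *
+ * <p>A minimal wiring sketch (illustrative only; it assumes the FLIP-27 {@code IcebergSource}
+ * builder accepts a custom assigner factory via {@code splitAssignerFactory(...)}):
+ *
+ * <pre>{@code
+ * IcebergSource.forRowData()
+ *     .tableLoader(tableLoader)
+ *     .splitAssignerFactory(new PartitionAwareSplitAssignerFactory())
+ *     .buildStream(env);
+ * }</pre>
+ */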
+public class PartitionAwareSplitAssignerFactory implements SplitAssignerFactory {
+ public PartitionAwareSplitAssignerFactory() {}
+
+ /**
+ * Creates a new partition-aware split assigner with no initial state.
+ *
+ * @return a new {@link PartitionAwareSplitAssigner} instance
+ */
+ @Override
+ public SplitAssigner createAssigner() {
+ return new PartitionAwareSplitAssigner();
+ }
+
+ /**
+ * Creates a new partition-aware split assigner initialized with the given state.
+ *
+ * This method is typically used for recovery scenarios where the assigner needs to be restored
+ * from a checkpoint with existing split assignments.
+ *
+ * @param assignerState the collection of split states to initialize the assigner with
+ * @return a new {@link PartitionAwareSplitAssigner} instance initialized with the given state
+ */
+ @Override
+ public SplitAssigner createAssigner(Collection<IcebergSourceSplitState> assignerState) {
+ return new PartitionAwareSplitAssigner(assignerState);
+ }
+}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
index dae7c8cca70c..6d19bda098d7 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
@@ -64,8 +64,13 @@ default void close() {}
*
* If enumerator wasn't able to assign the split (e.g., reader disconnected), enumerator should
* call {@link SplitAssigner#onUnassignedSplits} to return the split.
+ *
+ * @param hostname the hostname of the requesting reader
+ * @param subtaskId the subtask ID of the requesting reader
+ * @param numRegisteredTasks the total number of registered tasks
+ * @return the split assignment result
*/
- GetSplitResult getNext(@Nullable String hostname);
+ GetSplitResult getNext(@Nullable String hostname, int subtaskId, int numRegisteredTasks);
/** Add new splits discovered by enumerator */
void onDiscoveredSplits(Collection<IcebergSourceSplit> splits);
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 6c9a855bc149..994136a64fa7 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -122,7 +122,8 @@ private void assignSplits() {
int awaitingSubtask = nextAwaiting.getKey();
String hostname = nextAwaiting.getValue();
- GetSplitResult getResult = assigner.getNext(hostname);
+ GetSplitResult getResult =
+ assigner.getNext(hostname, awaitingSubtask, enumeratorContext.currentParallelism());
if (getResult.status() == GetSplitResult.Status.AVAILABLE) {
LOG.info("Assign split to subtask {}: {}", awaitingSubtask, getResult.split());
enumeratorContext.assignSplit(getResult.split(), awaitingSubtask);
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataConverter.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataConverter.java
new file mode 100644
index 000000000000..98bb7e981840
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.function.Function;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Convert RowData to a different output type.
+ *
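+ * <p>A minimal implementation sketch (illustrative only, not part of the connector): a converter
+ * that reads the first column as a string.
+ *
+ * <pre>{@code
+ * RowDataConverter<String> firstColumnAsString =
+ *     new RowDataConverter<String>() {
+ *       public String apply(RowData row) {
+ *         return row.getString(0).toString();
+ *       }
+ *
+ *       public TypeInformation<String> getProducedType() {
+ *         return org.apache.flink.api.common.typeinfo.Types.STRING;
+ *       }
+ *     };
+ * }</pre>
+ *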
+ * @param <T> output type
+ */
+public interface RowDataConverter<T>
+ extends Function<RowData, T>, ResultTypeQueryable<T>, Serializable {}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestStoragePartitionedJoin.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestStoragePartitionedJoin.java
new file mode 100644
index 000000000000..3c5992a1612b
--- /dev/null
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestStoragePartitionedJoin.java
@@ -0,0 +1,604 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.math.BigDecimal;
+import java.nio.file.Path;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.connector.source.partitioning.KeyGroupedPartitioning;
+import org.apache.flink.table.connector.source.partitioning.Partitioning;
+import org.apache.flink.table.expressions.TransformExpression;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.source.IcebergTableSource;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Test for Flink storage partitioned joins using tables with mixed partition transforms.
+ *
+ * This test validates that tables partitioned with both identity and bucket transforms can be
+ * joined efficiently by leveraging Flink's partition-aware capabilities.
+ */
+// TODO: add to end integration testing after importing flink libraries after table planner change
+public class TestStoragePartitionedJoin {
+
+ @RegisterExtension
+ protected static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+ @TempDir protected Path temporaryDirectory;
+
+ @RegisterExtension
+ protected static final HadoopCatalogExtension catalogExtension =
+ new HadoopCatalogExtension("testdb", "test_table");
+
+ private static final String TABLE_1_NAME = "partitioned_table_1";
+ private static final String TABLE_2_NAME = "partitioned_table_2";
+ private static final String TABLE_3_NAME = "partitioned_table_3";
+ private static final String TABLE_4_NAME = "partitioned_table_4_month";
+ private static final String TABLE_5_NAME = "partitioned_table_5_month";
+ private static final String TABLE_6_NAME = "partitioned_table_6_day";
+ private static final int BUCKET_COUNT = 128;
+
+ private static final Schema TABLE_1_SCHEMA =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "dt", Types.StringType.get()),
+ optional(3, "name", Types.StringType.get()),
+ optional(4, "salary", Types.IntegerType.get()));
+
+ private static final Schema TABLE_2_SCHEMA =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "dt", Types.StringType.get()),
+ optional(3, "company", Types.StringType.get()),
+ optional(4, "title", Types.StringType.get()));
+
+ private static final Schema TABLE_3_SCHEMA =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "dt", Types.StringType.get()),
+ optional(3, "data", Types.StringType.get()),
+ optional(4, "value", Types.DoubleType.get()));
+
+ // Schema for tables with timestamp partitioning (month/day)
+ private static final Schema TABLE_4_SCHEMA =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "event_time", Types.TimestampType.withoutZone()),
+ optional(3, "event_type", Types.StringType.get()),
+ optional(4, "count", Types.IntegerType.get()));
+
+ private static final Schema TABLE_5_SCHEMA =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "event_time", Types.TimestampType.withoutZone()),
+ optional(3, "category", Types.StringType.get()),
+ optional(4, "amount", Types.DecimalType.of(10, 2)));
+
+ private static final Schema TABLE_6_SCHEMA =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "event_time", Types.TimestampType.withoutZone()),
+ optional(3, "status", Types.StringType.get()),
+ optional(4, "metric", Types.DoubleType.get()));
+
+ // Partition spec for both tables: partitioned by dt (identity) and bucket(id, 128)
+ private static final PartitionSpec TABLE_1_PARTITION_SPEC =
+ PartitionSpec.builderFor(TABLE_1_SCHEMA).identity("dt").bucket("id", BUCKET_COUNT).build();
+
+ private static final PartitionSpec TABLE_2_PARTITION_SPEC =
+ PartitionSpec.builderFor(TABLE_2_SCHEMA).identity("dt").bucket("id", BUCKET_COUNT).build();
+
+ private static final PartitionSpec TABLE_3_PARTITION_SPEC =
+ PartitionSpec.builderFor(TABLE_3_SCHEMA)
+ .bucket("id", BUCKET_COUNT) // bucket first
+ .identity("dt") // dt second
+ .build();
+
+ private static final PartitionSpec TABLE_4_PARTITION_SPEC =
+ PartitionSpec.builderFor(TABLE_4_SCHEMA).month("event_time").build();
+
+ private static final PartitionSpec TABLE_5_PARTITION_SPEC =
+ PartitionSpec.builderFor(TABLE_5_SCHEMA).month("event_time").build();
+
+ private static final PartitionSpec TABLE_6_PARTITION_SPEC =
+ PartitionSpec.builderFor(TABLE_6_SCHEMA).day("event_time").build();
+
+ private Table table1;
+ private Table table2;
+ private Table table3;
+ private Table table4;
+ private Table table5;
+ private Table table6;
+ private TableEnvironment tableEnv;
+
+ private static final Transform<Long, Integer> BUCKET_TRANSFORM =
+ Transforms.bucket(Types.LongType.get(), BUCKET_COUNT);
+
+ private static final Transform<Long, Integer> MONTH_TRANSFORM =
+ Transforms.month(Types.TimestampType.withoutZone());
+
+ private static final Transform<Long, Integer> DAY_TRANSFORM =
+ Transforms.day(Types.TimestampType.withoutZone());
+
+ @BeforeEach
+ public void setupTables() throws Exception {
+ // Create table 1 with schema (id, dt, name, salary)
+ table1 =
+ catalogExtension
+ .catalog()
+ .createTable(
+ TableIdentifier.of("testdb", TABLE_1_NAME), TABLE_1_SCHEMA, TABLE_1_PARTITION_SPEC);
+
+ // Create table 2 with schema (id, dt, company, title)
+ table2 =
+ catalogExtension
+ .catalog()
+ .createTable(
+ TableIdentifier.of("testdb", TABLE_2_NAME), TABLE_2_SCHEMA, TABLE_2_PARTITION_SPEC);
+
+ // Create table 3 with schema (id, dt, data, value)
+ table3 =
+ catalogExtension
+ .catalog()
+ .createTable(
+ TableIdentifier.of("testdb", TABLE_3_NAME), TABLE_3_SCHEMA, TABLE_3_PARTITION_SPEC);
+
+ // Create table 4 with schema (id, event_time, event_type, count)
+ table4 =
+ catalogExtension
+ .catalog()
+ .createTable(
+ TableIdentifier.of("testdb", TABLE_4_NAME), TABLE_4_SCHEMA, TABLE_4_PARTITION_SPEC);
+
+ // Create table 5 with schema (id, event_time, category, amount)
+ table5 =
+ catalogExtension
+ .catalog()
+ .createTable(
+ TableIdentifier.of("testdb", TABLE_5_NAME), TABLE_5_SCHEMA, TABLE_5_PARTITION_SPEC);
+
+ // Create table 6 with schema (id, event_time, status, metric)
+ table6 =
+ catalogExtension
+ .catalog()
+ .createTable(
+ TableIdentifier.of("testdb", TABLE_6_NAME), TABLE_6_SCHEMA, TABLE_6_PARTITION_SPEC);
+
+ setupTableEnvironment();
+ writeTestDataToTables();
+ }
+
+ @AfterEach
+ public void cleanupTables() {
+ if (table1 != null) {
+ catalogExtension.catalog().dropTable(TableIdentifier.of("testdb", TABLE_1_NAME));
+ }
+ if (table2 != null) {
+ catalogExtension.catalog().dropTable(TableIdentifier.of("testdb", TABLE_2_NAME));
+ }
+ if (table3 != null) {
+ catalogExtension.catalog().dropTable(TableIdentifier.of("testdb", TABLE_3_NAME));
+ }
+ if (table4 != null) {
+ catalogExtension.catalog().dropTable(TableIdentifier.of("testdb", TABLE_4_NAME));
+ }
+ if (table5 != null) {
+ catalogExtension.catalog().dropTable(TableIdentifier.of("testdb", TABLE_5_NAME));
+ }
+ if (table6 != null) {
+ catalogExtension.catalog().dropTable(TableIdentifier.of("testdb", TABLE_6_NAME));
+ }
+ }
+
+ @Test
+ public void testSimplePartitionedTables() throws Exception {
+ testPartitionCompatibility(
+ table1, TABLE_1_SCHEMA, "table1", table2, TABLE_2_SCHEMA, "table2", true);
+ }
+
+ @Test
+ public void testSimpleIncompatiblePartitionedTables() throws Exception {
+ testPartitionCompatibility(
+ table1, TABLE_1_SCHEMA, "table1", table3, TABLE_3_SCHEMA, "table3", false);
+ }
+
+ @Test
+ public void testMonthPartitionedTables() throws Exception {
+ testPartitionCompatibility(
+ table4, TABLE_4_SCHEMA, "table4", table5, TABLE_5_SCHEMA, "table5", true);
+ }
+
+ @Test
+ public void testMonthVsDayPartitionedTables() throws Exception {
+ testPartitionCompatibility(
+ table4, TABLE_4_SCHEMA, "table4", table6, TABLE_6_SCHEMA, "table6", false);
+ }
+
+ /**
+ * Generic helper method to test partition compatibility between two tables.
+ *
+ * @param t1 First table to test
+ * @param schema1 Schema of first table
+ * @param table1Name Name of first table for error messages
+ * @param t2 Second table to test
+ * @param schema2 Schema of second table
+ * @param table2Name Name of second table for error messages
+ * @param expectedCompatible Whether tables should be compatible (true) or incompatible (false)
+ */
+ private void testPartitionCompatibility(
+ Table t1,
+ Schema schema1,
+ String table1Name,
+ Table t2,
+ Schema schema2,
+ String table2Name,
+ boolean expectedCompatible)
+ throws Exception {
+ // Extract output partitioning for both tables
+ Partitioning table1Partitioning = extractOutputPartitioning(t1, schema1, table1Name);
+ Partitioning table2Partitioning = extractOutputPartitioning(t2, schema2, table2Name);
+
+ // Verify both tables use KeyGroupedPartitioning
+ assertThat(table1Partitioning)
+ .withFailMessage("%s is not partitioned by KeyGroupedPartitioning", table1Name)
+ .isInstanceOf(KeyGroupedPartitioning.class);
+ assertThat(table2Partitioning)
+ .withFailMessage("%s is not partitioned by KeyGroupedPartitioning", table2Name)
+ .isInstanceOf(KeyGroupedPartitioning.class);
+
+ // Test compatibility
+ KeyGroupedPartitioning table1KGP = (KeyGroupedPartitioning) table1Partitioning;
+ KeyGroupedPartitioning table2KGP = (KeyGroupedPartitioning) table2Partitioning;
+
+ String compatibilityMessage =
+ expectedCompatible
+ ? String.format(
+ "%s and %s should have compatible partitioning for storage partitioned joins",
+ table1Name, table2Name)
+ : String.format(
+ "%s and %s should have incompatible partitioning for storage partitioned joins",
+ table1Name, table2Name);
+
+ assertThat(table1KGP.isCompatible(table2KGP))
+ .withFailMessage(compatibilityMessage)
+ .isEqualTo(expectedCompatible);
+
+ // Test symmetry - compatibility should be symmetric
+ assertThat(table2KGP.isCompatible(table1KGP))
+ .withFailMessage(
+ "Compatibility should be symmetric between %s and %s", table2Name, table1Name)
+ .isEqualTo(expectedCompatible);
+ }
+
+ private void setupTableEnvironment() {
+ // Ensure batch execution mode
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ tableEnv = TableEnvironment.create(settings);
+
+ // Configure storage partition join optimization (can be toggled for testing)
+ setStoragePartitionJoinEnabled(true);
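+ // With the optimization enabled, a query that joins the two compatibly partitioned tables on
+ // their partition columns, e.g. (illustrative, not executed in this PoC test):
+ //   SELECT t1.id, t1.name, t2.company
+ //   FROM partitioned_table_1 t1
+ //   JOIN partitioned_table_2 t2 ON t1.id = t2.id AND t1.dt = t2.dt
+ // should be able to avoid a shuffle, since both sources report compatible
+ // KeyGroupedPartitioning over (dt, bucket(id, 128)). See the class-level TODO about
+ // end-to-end integration tests.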
+
+ tableEnv.executeSql(
+ "CREATE CATALOG iceberg_catalog WITH ("
+ + "'type'='iceberg',"
+ + "'catalog-type'='hadoop',"
+ + "'warehouse'='"
+ + catalogExtension.warehouse()
+ + "'"
+ + ")");
+
+ tableEnv.executeSql("USE CATALOG iceberg_catalog");
+ tableEnv.executeSql("USE testdb");
+ }
+
+ protected void setStoragePartitionJoinEnabled(boolean enabled) {
+ tableEnv
+ .getConfig()
+ .getConfiguration()
+ .setString("table.optimizer.storage-partition-join-enabled", String.valueOf(enabled));
+ }
+
+ protected List<Row> sql(String query, Object... args) {
+ TableResult tableResult = exec(query, args);
+ try (CloseableIterator<Row> iter = tableResult.collect()) {
+ return Lists.newArrayList(iter);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to collect table result", e);
+ }
+ }
+
+ // TODO -- maybe combine the above
+ protected TableResult exec(String query, Object... args) {
+ return tableEnv.executeSql(String.format(query, args));
+ }
+
+ private List<Record> createTable1TestData() {
+ List<Record> records = Lists.newArrayList();
+ String[] dates = {"2025-01-01", "2025-05-02", "2025-06-03"};
+ String[] names = {"Alice", "Bob", "Charlie", "Diana", "Eve"};
+
+ for (int i = 0; i < 50; i++) {
+ Record record = GenericRecord.create(TABLE_1_SCHEMA);
+ record.setField("id", (long) i);
+ record.setField("dt", dates[i % dates.length]);
+ record.setField("name", names[i % names.length]);
+ record.setField("salary", 50000 + (i % 20) * 2500);
+ records.add(record);
+ }
+
+ return records;
+ }
+
+ private List<Record> createTable2TestData() {
+ List<Record> records = Lists.newArrayList();
+ String[] dates = {"2025-01-01", "2025-05-02", "2025-06-03"};
+ String[] companies = {"TechCorp", "DataSys", "CloudInc", "AILabs", "DevCo"};
+ String[] titles = {"Engineer", "Manager", "Analyst", "Director", "Scientist"};
+
+ for (int i = 0; i < 50; i++) {
+ Record record = GenericRecord.create(TABLE_2_SCHEMA);
+ record.setField("id", (long) i);
+ record.setField("dt", dates[i % dates.length]);
+ record.setField("company", companies[i % companies.length]);
+ record.setField("title", titles[i % titles.length]);
+ records.add(record);
+ }
+
+ return records;
+ }
+
+ private List<Record> createTable3TestData() {
+ List<Record> records = Lists.newArrayList();
+ String[] dates = {"2025-01-01", "2025-05-02", "2025-06-03"};
+ String[] data = {"data1", "data2", "data3", "data4", "data5"};
+ double[] values = {10.0, 20.0, 30.0, 40.0, 50.0};
+
+ for (int i = 0; i < 50; i++) {
+ Record record = GenericRecord.create(TABLE_3_SCHEMA);
+ record.setField("id", (long) i);
+ record.setField("dt", dates[i % dates.length]);
+ record.setField("data", data[i % data.length]);
+ record.setField("value", values[i % values.length]);
+ records.add(record);
+ }
+
+ return records;
+ }
+
+ private List<Record> createTable4TestData() {
+ List<Record> records = Lists.newArrayList();
+ LocalDateTime[] eventTimes = {
+ LocalDateTime.of(2022, 2, 1, 10, 30, 0),
+ LocalDateTime.of(2022, 3, 1, 10, 30, 0),
+ LocalDateTime.of(2022, 4, 1, 10, 30, 0)
+ };
+ String[] eventTypes = {"click", "view", "purchase"};
+ int[] counts = {10, 20, 30};
+
+ for (int i = 0; i < 50; i++) {
+ Record record = GenericRecord.create(TABLE_4_SCHEMA);
+ record.setField("id", (long) i);
+ record.setField("event_time", eventTimes[i % eventTimes.length]);
+ record.setField("event_type", eventTypes[i % eventTypes.length]);
+ record.setField("count", counts[i % counts.length]);
+ records.add(record);
+ }
+
+ return records;
+ }
+
+ private List<Record> createTable5TestData() {
+ List<Record> records = Lists.newArrayList();
+ LocalDateTime[] eventTimes = {
+ LocalDateTime.of(2022, 2, 1, 10, 30, 0),
+ LocalDateTime.of(2022, 3, 1, 10, 30, 0),
+ LocalDateTime.of(2022, 4, 1, 10, 30, 0)
+ };
+ String[] categories = {"A", "B", "C"};
+ BigDecimal[] amounts = {
+ new BigDecimal("10.99"), new BigDecimal("20.99"), new BigDecimal("30.99")
+ };
+
+ for (int i = 0; i < 50; i++) {
+ Record record = GenericRecord.create(TABLE_5_SCHEMA);
+ record.setField("id", (long) i);
+ record.setField("event_time", eventTimes[i % eventTimes.length]);
+ record.setField("category", categories[i % categories.length]);
+ record.setField("amount", amounts[i % amounts.length]);
+ records.add(record);
+ }
+
+ return records;
+ }
+
+ private List<Record> createTable6TestData() {
+ List<Record> records = Lists.newArrayList();
+ LocalDateTime[] eventTimes = {
+ LocalDateTime.of(2022, 2, 1, 10, 30, 0),
+ LocalDateTime.of(2022, 3, 15, 10, 30, 0),
+ LocalDateTime.of(2022, 4, 20, 10, 30, 0)
+ };
+ String[] statuses = {"success", "failure", "pending"};
+ double[] metrics = {0.5, 0.7, 0.9};
+
+ for (int i = 0; i < 50; i++) {
+ Record record = GenericRecord.create(TABLE_6_SCHEMA);
+ record.setField("id", (long) i);
+ record.setField("event_time", eventTimes[i % eventTimes.length]);
+ record.setField("status", statuses[i % statuses.length]);
+ record.setField("metric", metrics[i % metrics.length]);
+ records.add(record);
+ }
+
+ return records;
+ }
+
+ private Partitioning extractOutputPartitioning(Table table, Schema schema, String tableName)
+ throws Exception {
+ // Create TableLoader and IcebergTableSource
+ TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+ IcebergTableSource tableSource =
+ new IcebergTableSource(
+ tableLoader,
+ FlinkSchemaUtil.toSchema(schema),
+ java.util.Collections.emptyMap(),
+ new org.apache.flink.configuration.Configuration());
+ // Get output partitioning
+ Partitioning outputPartitioning = tableSource.outputPartitioning();
+ if (!(outputPartitioning instanceof KeyGroupedPartitioning)) {
+ // TODO -- adapt later
+ return null;
+ }
+ assertThat(outputPartitioning)
+ .withFailMessage("OutputPartitioning for %s should be KeyGroupedPartitioning", tableName)
+ .isInstanceOf(KeyGroupedPartitioning.class);
+ KeyGroupedPartitioning keyGroupedPartitioning = (KeyGroupedPartitioning) outputPartitioning;
+
+ // Extract partitioning info
+ int numPartitions = keyGroupedPartitioning.getPartitionValues().length;
+ TransformExpression[] keys = keyGroupedPartitioning.keys();
+ org.apache.flink.types.Row[] partitionValues = keyGroupedPartitioning.getPartitionValues();
+
+ // Validate basic structure
+ assertThat(numPartitions)
+ .withFailMessage("NumPartitions for %s should be > 0", tableName)
+ .isGreaterThan(0);
+ assertThat(keys).withFailMessage("Keys for %s should not be empty", tableName).isNotEmpty();
+ assertThat(partitionValues)
+ .withFailMessage("PartitionValues for %s should not be empty", tableName)
+ .isNotEmpty();
+ return outputPartitioning;
+ }
+
+ // TODO -- we want to make this generic for any table type as we expand these tests. For now,
+ // we keep it hard-coded since the rest is out of scope for the PoC.
+ private void writeTestDataToTables() throws Exception {
+ // Create test data for both tables
+ List<Record> table1Records = createTable1TestData();
+ List<Record> table2Records = createTable2TestData();
+ List<Record> table3Records = createTable3TestData();
+ List<Record> table4Records = createTable4TestData();
+ List<Record> table5Records = createTable5TestData();
+ List<Record> table6Records = createTable6TestData();
+
+ // Write data using GenericAppenderHelper with proper partition values
+ GenericAppenderHelper table1Appender =
+ new GenericAppenderHelper(table1, FileFormat.PARQUET, temporaryDirectory);
+ GenericAppenderHelper table2Appender =
+ new GenericAppenderHelper(table2, FileFormat.PARQUET, temporaryDirectory);
+ GenericAppenderHelper table3Appender =
+ new GenericAppenderHelper(table3, FileFormat.PARQUET, temporaryDirectory);
+ GenericAppenderHelper table4Appender =
+ new GenericAppenderHelper(table4, FileFormat.PARQUET, temporaryDirectory);
+ GenericAppenderHelper table5Appender =
+ new GenericAppenderHelper(table5, FileFormat.PARQUET, temporaryDirectory);
+ GenericAppenderHelper table6Appender =
+ new GenericAppenderHelper(table6, FileFormat.PARQUET, temporaryDirectory);
+
+ // Group records by partition and write them
+ for (Record record : table1Records) {
+ String dt = (String) record.getField("dt");
+ Long id = (Long) record.getField("id");
+ int bucket = BUCKET_TRANSFORM.apply(id);
+
+ // Create partition row: [dt, bucket_value]
+ org.apache.iceberg.TestHelpers.Row partitionRow =
+ org.apache.iceberg.TestHelpers.Row.of(dt, bucket);
+ table1Appender.appendToTable(partitionRow, Lists.newArrayList(record));
+ }
+
+ for (Record record : table2Records) {
+ String dt = (String) record.getField("dt");
+ Long id = (Long) record.getField("id");
+ int bucket = BUCKET_TRANSFORM.apply(id);
+
+ // Create partition row: [dt, bucket_value]
+ org.apache.iceberg.TestHelpers.Row partitionRow =
+ org.apache.iceberg.TestHelpers.Row.of(dt, bucket);
+ table2Appender.appendToTable(partitionRow, Lists.newArrayList(record));
+ }
+
+ for (Record record : table3Records) {
+ String dt = (String) record.getField("dt");
+ Long id = (Long) record.getField("id");
+ int bucket = BUCKET_TRANSFORM.apply(id);
+
+ // Create partition row: [bucket_value, dt]
+ org.apache.iceberg.TestHelpers.Row partitionRow =
+ org.apache.iceberg.TestHelpers.Row.of(bucket, dt);
+ table3Appender.appendToTable(partitionRow, Lists.newArrayList(record));
+ }
+
+ for (Record record : table4Records) {
+ LocalDateTime eventTime = (LocalDateTime) record.getField("event_time");
+ // Iceberg timestamp transforms operate on microseconds from the epoch
+ int month = MONTH_TRANSFORM.apply(eventTime.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000);
+
+ // Create partition row: [month]
+ org.apache.iceberg.TestHelpers.Row partitionRow =
+ org.apache.iceberg.TestHelpers.Row.of(month);
+ table4Appender.appendToTable(partitionRow, Lists.newArrayList(record));
+ }
+
+ for (Record record : table5Records) {
+ LocalDateTime eventTime = (LocalDateTime) record.getField("event_time");
+ int month = MONTH_TRANSFORM.apply(eventTime.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000);
+
+ // Create partition row: [month]
+ org.apache.iceberg.TestHelpers.Row partitionRow =
+ org.apache.iceberg.TestHelpers.Row.of(month);
+ table5Appender.appendToTable(partitionRow, Lists.newArrayList(record));
+ }
+
+ for (Record record : table6Records) {
+ LocalDateTime eventTime = (LocalDateTime) record.getField("event_time");
+ int day = DAY_TRANSFORM.apply(eventTime.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000);
+
+ // Create partition row: [day]
+ org.apache.iceberg.TestHelpers.Row partitionRow = org.apache.iceberg.TestHelpers.Row.of(day);
+ table6Appender.appendToTable(partitionRow, Lists.newArrayList(record));
+ }
+ }
+}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSplitPlanner.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSplitPlanner.java
new file mode 100644
index 000000000000..a731bdf0843a
--- /dev/null
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSplitPlanner.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFlinkSplitPlanner {
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ private static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
+ private ExecutorService workerPool;
+ private HadoopCatalog catalog;
+ private Table table;
+ private Schema tableSchema;
+
+ @Before
+ public void before() throws Exception {
+ workerPool = Executors.newFixedThreadPool(4);
+
+ // Create a temporary catalog
+ File warehouseFile = TEMPORARY_FOLDER.newFolder();
+ Assert.assertTrue(warehouseFile.delete());
+ String warehouse = "file:" + warehouseFile;
+ Configuration hadoopConf = new Configuration();
+ catalog = new HadoopCatalog(hadoopConf, warehouse);
+
+ // Create a table partitioned by bucket(id, 4)
+ this.tableSchema =
+ new Schema(
+ required(1, "id", org.apache.iceberg.types.Types.LongType.get()),
+ required(2, "data", org.apache.iceberg.types.Types.StringType.get()));
+ PartitionSpec spec = PartitionSpec.builderFor(tableSchema).bucket("id", 4).build();
+
+ ImmutableMap<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+
+ table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, tableSchema, spec, null, properties);
+ }
+
+ @After
+ public void after() throws Exception {
+ workerPool.shutdown();
+ catalog.dropTable(TestFixtures.TABLE_IDENTIFIER);
+ catalog.close();
+ }
+
+ @Test
+ public void testPlanPartitionAwareSplitsWithBucketPartitioning() throws Exception {
+ try {
+ // Create two records for each bucket value (0, 1, 2, 3). The files are appended below with
+ // explicit bucket partition values, so the ids only need to be distinct.
+ GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(table, FILE_FORMAT, TEMPORARY_FOLDER);
+
+ List<Record> bucket0Records = RandomGenericData.generate(tableSchema, 2, 0L);
+ bucket0Records.get(0).setField("id", 100L); // written to bucket 0
+ bucket0Records.get(1).setField("id", 104L); // written to bucket 0
+
+ List<Record> bucket1Records = RandomGenericData.generate(tableSchema, 2, 1L);
+ bucket1Records.get(0).setField("id", 101L); // written to bucket 1
+ bucket1Records.get(1).setField("id", 105L); // written to bucket 1
+
+ List<Record> bucket2Records = RandomGenericData.generate(tableSchema, 2, 2L);
+ bucket2Records.get(0).setField("id", 102L); // written to bucket 2
+ bucket2Records.get(1).setField("id", 106L); // written to bucket 2
+
+ List<Record> bucket3Records = RandomGenericData.generate(tableSchema, 2, 3L);
+ bucket3Records.get(0).setField("id", 103L); // written to bucket 3
+ bucket3Records.get(1).setField("id", 107L); // written to bucket 3
+
+ // Write files with explicit bucket values
+ dataAppender.appendToTable(org.apache.iceberg.TestHelpers.Row.of(0), bucket0Records);
+ dataAppender.appendToTable(org.apache.iceberg.TestHelpers.Row.of(1), bucket1Records);
+ dataAppender.appendToTable(org.apache.iceberg.TestHelpers.Row.of(2), bucket2Records);
+ dataAppender.appendToTable(org.apache.iceberg.TestHelpers.Row.of(3), bucket3Records);
+
+ ScanContext scanContext = ScanContext.builder().build();
+ List<IcebergSourceSplit> splits =
+ FlinkSplitPlanner.planIcebergPartitionAwareSourceSplits(table, scanContext, workerPool);
+
+ Assert.assertEquals("Should have one split per bucket", 4, splits.size());
+ for (IcebergSourceSplit split : splits) {
+ long totalRecords =
+ split.task().files().stream()
+ .mapToLong(fileScanTask -> fileScanTask.file().recordCount())
+ .sum();
+ Assert.assertEquals("Each split should contain 2 records", 2, totalRecords);
+ }
+ } finally {
+ catalog.dropTable(TestFixtures.TABLE_IDENTIFIER);
+ }
+ }
+
+ @Test
+ public void testPlanPartitionAwareSplitsWithUnpartitionedTable() throws Exception {
+ // Create an unpartitioned table (no partition spec)
+ Schema unpartitionedSchema =
+ new Schema(
+ required(1, "id", org.apache.iceberg.types.Types.LongType.get()),
+ required(2, "data", org.apache.iceberg.types.Types.StringType.get()));
+
+ // Create table with unpartitioned spec
+ PartitionSpec unpartitionedSpec = PartitionSpec.unpartitioned();
+
+ Table unpartitionedTable =
+ catalog.createTable(
+ TableIdentifier.of("default", "unpartitioned_test"),
+ unpartitionedSchema,
+ unpartitionedSpec,
+ null,
+ ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"));
+
+ try {
+ GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(unpartitionedTable, FILE_FORMAT, TEMPORARY_FOLDER);
+ List<Record> records = RandomGenericData.generate(unpartitionedSchema, 5, 0L);
+ dataAppender.appendToTable(records);
+
+ ScanContext scanContext = ScanContext.builder().build();
+
+ // This should throw IllegalArgumentException
+ RuntimeException exception =
+ Assert.assertThrows(
+ RuntimeException.class,
+ () ->
+ FlinkSplitPlanner.planIcebergPartitionAwareSourceSplits(
+ unpartitionedTable, scanContext, workerPool));
+
+ Assert.assertTrue(
+ "Error message should mention grouping fields",
+ exception
+ .getMessage()
+ .contains("Currently only Partitions that are able to be grouped are supported"));
+
+ } finally {
+ catalog.dropTable(TableIdentifier.of("default", "unpartitioned_test"));
+ }
+ }
+
+ @Test
+ public void testPlanPartitionAwareSplitsWithConflictingPartitionSpecs() throws Exception {
+ // Table with conflicting/incompatible partition specs schema(id, data, category)
+ Schema schema =
+ new Schema(
+ required(1, "id", org.apache.iceberg.types.Types.LongType.get()),
+ required(2, "data", org.apache.iceberg.types.Types.StringType.get()),
+ required(3, "category", org.apache.iceberg.types.Types.StringType.get()));
+
+ // PARTITION BY data
+ PartitionSpec initialSpec = PartitionSpec.builderFor(schema).identity("data").build();
+ Table conflictingTable =
+ catalog.createTable(
+ TableIdentifier.of("default", "conflicting_specs"),
+ schema,
+ initialSpec,
+ null,
+ ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"));
+
+ try {
+ // Add some data with the initial spec
+ GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(conflictingTable, FILE_FORMAT, TEMPORARY_FOLDER);
+ List<Record> records = RandomGenericData.generate(schema, 2, 0L);
+ records.get(0).setField("data", "test1");
+ records.get(0).setField("category", "cat1");
+ records.get(1).setField("data", "test2");
+ records.get(1).setField("category", "cat2");
+ dataAppender.appendToTable(
+ org.apache.iceberg.TestHelpers.Row.of("test1"), records.subList(0, 1));
+ dataAppender.appendToTable(
+ org.apache.iceberg.TestHelpers.Row.of("test2"), records.subList(1, 2));
+
+ // Create a new spec that conflicts with the existing one
+ // This creates incompatible specs that can't be grouped together
+ // PARTITIONED BY category
+ PartitionSpec conflictingSpec = PartitionSpec.builderFor(schema).identity("category").build();
+
+ // Manually add the conflicting spec to create incompatible partition evolution
+ TableOperations ops = ((HasTableOperations) conflictingTable).operations();
+ TableMetadata current = ops.current();
+ ops.commit(current, current.updatePartitionSpec(conflictingSpec));
+
+ Assert.assertEquals("Should have 2 specs", 2, conflictingTable.specs().size());
+
+ ScanContext scanContext = ScanContext.builder().build();
+
+ // This should throw IllegalArgumentException due to conflicting specs
+ RuntimeException exception =
+ Assert.assertThrows(
+ RuntimeException.class,
+ () ->
+ FlinkSplitPlanner.planIcebergPartitionAwareSourceSplits(
+ conflictingTable, scanContext, workerPool));
+
+ Assert.assertTrue(
+ "Error message should mention grouping fields",
+ exception
+ .getMessage()
+ .contains("Currently only Partitions that are able to be grouped are supported"));
+
+ } finally {
+ catalog.dropTable(TableIdentifier.of("default", "conflicting_specs"));
+ }
+ }
+}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index 3c0c38e1115d..6bf78415c144 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -21,8 +21,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -110,11 +108,8 @@ protected List run(
sourceBuilder.properties(options);
DataStream stream =
- env.fromSource(
- sourceBuilder.build(),
- WatermarkStrategy.noWatermarks(),
- "testBasicRead",
- TypeInformation.of(RowData.class))
+ sourceBuilder
+ .buildStream(env)
.map(
new RowDataToRowMapper(
FlinkSchemaUtil.convert(
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
index ff3348bbc3a3..b46b634d2baa 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
@BeforeEach
public void before() throws IOException {
Configuration tableConf = getTableEnv().getConfig().getConfiguration();
- tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+ tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
SqlHelpers.sql(
getTableEnv(),
"create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
new file mode 100644
index 000000000000..2908cb927269
--- /dev/null
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestIcebergSourceInferParallelism {
+ private static final int NUM_TMS = 2;
+ private static final int SLOTS_PER_TM = 2;
+ private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+ private static final int MAX_INFERRED_PARALLELISM = 3;
+
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+ .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .build());
+
+ @RegisterExtension
+ protected static final HadoopCatalogExtension CATALOG_EXTENSION =
+ new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+ @TempDir private Path tmpDir;
+
+ private Table table;
+ private GenericAppenderHelper dataAppender;
+
+ @BeforeEach
+ public void before() throws IOException {
+ this.table =
+ CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+ this.dataAppender = new GenericAppenderHelper(table, FileFormat.PARQUET, tmpDir);
+ }
+
+ @AfterEach
+ public void after() {
+ CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+ }
+
+ @Test
+ public void testEmptyTable() throws Exception {
+ // Inferred parallelism should be at least 1 even if table is empty
+ test(1, 0);
+ }
+
+ @Test
+ public void testTableWithFilesLessThanMaxInferredParallelism() throws Exception {
+ // Append files to the table
+ for (int i = 0; i < 2; ++i) {
+ List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+ dataAppender.appendToTable(batch);
+ }
+
+ // Inferred parallelism should equal the number of splits (2)
+ test(2, 2);
+ }
+
+ @Test
+ public void testTableWithFilesMoreThanMaxInferredParallelism() throws Exception {
+ // Append files to the table
+ for (int i = 0; i < MAX_INFERRED_PARALLELISM + 1; ++i) {
+ List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+ dataAppender.appendToTable(batch);
+ }
+
+ // Inferred parallelism should be capped by the MAX_INFERRED_PARALLELISM
+ test(MAX_INFERRED_PARALLELISM, MAX_INFERRED_PARALLELISM + 1);
+ }
+
+ private void test(int expectedParallelism, int expectedRecords) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ Configuration config = new Configuration();
+ config.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true);
+ config.set(
+ FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX,
+ MAX_INFERRED_PARALLELISM);
+
+ DataStream<Row> dataStream =
+ IcebergSource.forRowData()
+ .tableLoader(CATALOG_EXTENSION.tableLoader())
+ .table(table)
+ .flinkConfig(config)
+ // force one file per split
+ .splitSize(1L)
+ .buildStream(env)
+ .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema())));
+
+ DataStream.Collector<Row> collector = new DataStream.Collector<>();
+ dataStream.collectAsync(collector);
+ JobClient jobClient = env.executeAsync();
+ try (CloseableIterator<Row> iterator = collector.getOutput()) {
+ List<Row> result = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+
+ assertThat(result).hasSize(expectedRecords);
+ verifySourceParallelism(
+ expectedParallelism, miniCluster().getExecutionGraph(jobClient.getJobID()).get());
+ }
+ }
+
+ /**
+ * Borrowed this approach from Flink {@code FileSourceTextLinesITCase} to get source parallelism
+ * from execution graph.
+ */
+ private static void verifySourceParallelism(
+ int expectedParallelism, AccessExecutionGraph executionGraph) {
+ AccessExecutionJobVertex sourceVertex =
+ executionGraph.getVerticesTopologically().iterator().next();
+ assertThat(sourceVertex.getParallelism()).isEqualTo(expectedParallelism);
+ }
+
+ /**
+ * Use reflection to get {@code InternalMiniClusterExtension} and {@code MiniCluster} in order to
+ * read the execution graph and source parallelism. We haven't found another way to do this via
+ * public APIs.
+ */
+ private static MiniCluster miniCluster() throws Exception {
+ Field privateField =
+ MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension");
+ privateField.setAccessible(true);
+ InternalMiniClusterExtension internalExtension =
+ (InternalMiniClusterExtension) privateField.get(MINI_CLUSTER_EXTENSION);
+ return internalExtension.getMiniCluster();
+ }
+}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index 4250460d278d..e404c08ee854 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -53,7 +53,12 @@ public class TestIcebergSourceSql extends TestSqlBase {
public void before() throws IOException {
TableEnvironment tableEnvironment = getTableEnv();
Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
- tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+ tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
+ // Disable parallelism inference to avoid interfering with the watermark tests,
+ // which check that split assignment is ordered by the watermark column.
+ // Those tests assume a default parallelism of 1 with a single reader task
+ // so that the order of read records can be verified.
+ tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");
SqlHelpers.sql(
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
index 090b304942c6..792f15eee1dd 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
@@ -100,7 +100,7 @@ private void assertAvailableFuture(
}
protected void assertGetNext(SplitAssigner assigner, GetSplitResult.Status expectedStatus) {
- GetSplitResult result = assigner.getNext(null);
+ GetSplitResult result = assigner.getNext(null, 0, -1);
Assert.assertEquals(expectedStatus, result.status());
switch (expectedStatus) {
case AVAILABLE:
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
index e78634e6b873..ccaccc7c88f1 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
@@ -73,7 +73,7 @@ public void testSerializable() {
}
private void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
- GetSplitResult result = assigner.getNext(null);
+ GetSplitResult result = assigner.getNext(null, 0, -1);
+ ContentFile<?> file = result.split().task().files().iterator().next().file();
Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber());
}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestPartitionAwareSplitAssigner.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestPartitionAwareSplitAssigner.java
new file mode 100644
index 000000000000..a7beb0eb7d09
--- /dev/null
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestPartitionAwareSplitAssigner.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.FlinkSplitPlanner;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.Assert;
+import org.junit.Test;
+
+// TODO -- in the long term, we should consolidate some of the partitioned split generation
+// into SplitHelpers, as done in the other enumeration tests
+public class TestPartitionAwareSplitAssigner extends SplitAssignerTestBase {
+
+ // Simple schema for testing: id, data
+ private static final Schema TEST_SCHEMA =
+ new Schema(
+ required(1, "id", Types.LongType.get()), required(2, "data", Types.StringType.get()));
+
+ // Partition spec with bucket partitioning on id (4 buckets)
+ private static final PartitionSpec TEST_PARTITION_SPEC =
+ PartitionSpec.builderFor(TEST_SCHEMA).bucket("id", 4).build();
+
+ @Override
+ protected SplitAssigner splitAssigner() {
+ return new PartitionAwareSplitAssigner();
+ }
+
+ /** Test 4 buckets assigned to 4 subtasks - each subtask gets exactly one bucket. */
+ @Test
+ public void testFourBucketsToFourSubtasks() throws Exception {
+ SplitAssigner assigner = splitAssigner();
+ assigner.onDiscoveredSplits(createBucketPartitionedSplits());
+
+ Map<StructLike, Integer> assignments = assignSplitsAndTrackPartitions(assigner, 4);
+
+ // Verify each subtask got exactly one partition (4 buckets to 4 subtasks)
+ Assert.assertEquals("Should have 4 partitions", 4, assignments.size());
+ Assert.assertEquals(
+ "Each subtask should get one partition", 4, Sets.newHashSet(assignments.values()).size());
+ Assert.assertEquals("All splits should be consumed", 0, assigner.pendingSplitCount());
+ }
+
+ /** Test 4 buckets assigned to 2 subtasks - each subtask gets multiple buckets. */
+ @Test
+ public void testFourBucketsToTwoSubtasks() throws Exception {
+ SplitAssigner assigner = splitAssigner();
+ assigner.onDiscoveredSplits(createBucketPartitionedSplits());
+
+ Map<StructLike, Integer> assignments = assignSplitsAndTrackPartitions(assigner, 2);
+
+ // Verify both subtasks got partitions (4 buckets distributed to 2 subtasks)
+ Assert.assertEquals("Should have 4 partitions", 4, assignments.size());
+ Assert.assertEquals(
+ "Both subtasks should get partitions", 2, Sets.newHashSet(assignments.values()).size());
+ Assert.assertEquals("All splits should be consumed", 0, assigner.pendingSplitCount());
+ }
+
+ /** Test 4 buckets assigned to 6 subtasks - 2 subtasks won't have data. */
+ @Test
+ public void testFourBucketsToSixSubtasks() throws Exception {
+ SplitAssigner assigner = splitAssigner();
+ assigner.onDiscoveredSplits(createBucketPartitionedSplits());
+
+ Map<StructLike, Integer> assignments = assignSplitsAndTrackPartitions(assigner, 6);
+
+ // Verify exactly 4 subtasks got partitions (only 4 buckets available) and 2 got none
+ Assert.assertEquals("Should have 4 partitions", 4, assignments.size());
+ Assert.assertEquals(
+ "Only 4 subtasks should get partitions", 4, Sets.newHashSet(assignments.values()).size());
+ Assert.assertEquals("All splits should be consumed", 0, assigner.pendingSplitCount());
+ }
+
+ /** Test mixed dt (identity) and bucket partitioning with 3 files assigned to 2 tasks. */
+ @Test
+ public void testMixedPartitioningThreeFilesToTwoTasks() throws Exception {
+ SplitAssigner assigner = splitAssigner();
+ assigner.onDiscoveredSplits(createMixedPartitionedSplits());
+
+ Map<StructLike, Integer> assignments = assignSplitsAndTrackPartitions(assigner, 2);
+
+ // Verify partitions distributed to both subtasks (3 mixed partitions to 2 subtasks)
+ Assert.assertEquals("Should have 3 partitions", 3, assignments.size());
+ Assert.assertEquals(
+ "Both subtasks should get partitions", 2, Sets.newHashSet(assignments.values()).size());
+ Assert.assertEquals("All splits should be consumed", 0, assigner.pendingSplitCount());
+ }
+
+ @Test
+ public void testStateFailedTask() throws Exception {
+ // 1. Create assigner and add partitioned splits
+ SplitAssigner assigner = splitAssigner();
+ List<IcebergSourceSplit> splits = createBucketPartitionedSplits(); // 4 bucket splits
+ assigner.onDiscoveredSplits(splits);
+
+ // 2. Take a snapshot of the state (before any splits are assigned)
+ Collection<IcebergSourceSplitState> state = assigner.state();
+ Assert.assertEquals("State should contain all splits", splits.size(), state.size());
+
+ // Verify all splits are in UNASSIGNED status
+ for (IcebergSourceSplitState splitState : state) {
+ Assert.assertEquals(
+ "Splits should be unassigned in state",
+ IcebergSourceSplitStatus.UNASSIGNED,
+ splitState.status());
+ Assert.assertNotNull("Split should not be null in state", splitState.split());
+ }
+
+ // 3. Create a new assigner from the saved state (simulating task restart)
+ SplitAssigner restored = new PartitionAwareSplitAssigner(state);
+
+ // 4. Verify the restored assigner has the same state
+ Assert.assertEquals(
+ "Restored assigner should have same split count",
+ splits.size(),
+ restored.pendingSplitCount());
+ Collection<IcebergSourceSplitState> restoredState = restored.state();
+ Assert.assertEquals("Restored state should have same size", state.size(), restoredState.size());
+
+ // 5. Test that partition-aware assignment works correctly after restoration
+ Map<StructLike, Integer> restoredAssignments = assignSplitsAndTrackPartitions(restored, 4);
+
+ // 6. Verify all splits were retrieved and partition affinity is maintained
+ Assert.assertEquals(
+ "All partitions should be assignable after restore",
+ splits.size(),
+ restoredAssignments.size());
+ Assert.assertEquals(
+ "Each subtask should get exactly one partition after restore",
+ restoredAssignments.size(),
+ Sets.newHashSet(restoredAssignments.values()).size());
+ Assert.assertEquals(
+ "No splits should remain after assignment", 0, restored.pendingSplitCount());
+ }
+
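+ // The same partition value must hash to the same subtask regardless of which table (and which
+ // assigner instance) it comes from; this is what would allow the planner to align two
+ // pre-partitioned sources without an extra shuffle.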
+ @Test
+ public void testConsistentAssignmentPartitionAcrossTables() throws Exception {
+ int[] partitionCounts = {2, 4, 6, 8};
+ int totalTasks = 4;
+
+ for (int partitionCount : partitionCounts) {
+ testConsistentAssignmentWithPartitionCount(partitionCount, totalTasks);
+ }
+ }
+
+ private void testConsistentAssignmentWithPartitionCount(int partitionCount, int totalTasks)
+ throws Exception {
+ // Create two tables with identical partitioning but different data
+ List<IcebergSourceSplit> table1Splits =
+ createIdentityPartitionedSplits("table1", partitionCount);
+ List<IcebergSourceSplit> table2Splits =
+ createIdentityPartitionedSplits("table2", partitionCount);
+
+ // Create separate assigners for each table
+ SplitAssigner assigner1 = new PartitionAwareSplitAssigner();
+ SplitAssigner assigner2 = new PartitionAwareSplitAssigner();
+
+ assigner1.onDiscoveredSplits(table1Splits);
+ assigner2.onDiscoveredSplits(table2Splits);
+
+ // Assign splits and track partition assignments for both tables
+ Map<StructLike, Integer> table1Assignments =
+ assignSplitsAndTrackPartitions(assigner1, totalTasks);
+ Map<StructLike, Integer> table2Assignments =
+ assignSplitsAndTrackPartitions(assigner2, totalTasks);
+
+ // Assert consistent assignment across tables
+ Assert.assertEquals(
+ "Both tables should have same number of partitions assigned",
+ table1Assignments.size(),
+ table2Assignments.size());
+
+ for (StructLike partition : table1Assignments.keySet()) {
+ Assert.assertTrue(
+ "Table2 should have assignment for partition: " + partition,
+ table2Assignments.containsKey(partition));
+ Assert.assertEquals(
+ "Partition " + partition + " should be assigned to same task in both tables",
+ table1Assignments.get(partition),
+ table2Assignments.get(partition));
+ }
+
+ // Verify all splits were assigned
+ Assert.assertEquals("All table1 splits should be consumed", 0, assigner1.pendingSplitCount());
+ Assert.assertEquals("All table2 splits should be consumed", 0, assigner2.pendingSplitCount());
+ }
+
+ /** Assigns splits using round-robin. Returns a map of partition -> subtask assignments. */
+ private Map<StructLike, Integer> assignSplitsAndTrackPartitions(
+ SplitAssigner assigner, int totalTasks) {
+ Map<StructLike, Integer> assignments = Maps.newHashMap();
+ int currentSubtask = 0;
+ while (assigner.pendingSplitCount() > 0) {
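+ // Rotate through the subtasks like readers polling the enumerator; a subtask that currently
+ // has no splits for its partitions simply gets a non-AVAILABLE result and is skipped.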
+ GetSplitResult result = assigner.getNext(null, currentSubtask, totalTasks);
+ if (result.status() == GetSplitResult.Status.AVAILABLE) {
+ StructLike partitionKey = extractPartitionKey(result.split());
+ // same partition must always go to same subtask
+ Integer existingSubtask = assignments.put(partitionKey, currentSubtask);
+ if (existingSubtask != null) {
+ Assert.assertEquals(
+ "Partition " + partitionKey + " must consistently go to same subtask",
+ existingSubtask,
+ Integer.valueOf(currentSubtask));
+ }
+ }
+ currentSubtask = (currentSubtask + 1) % totalTasks;
+ }
+
+ return assignments;
+ }
+
+ // Same logic as seen in PartitionAwareSplitAssigner
+ private StructLike extractPartitionKey(IcebergSourceSplit split) {
+ // Reuse the same approach as PartitionAwareSplitAssigner for consistent partition key
+ // extraction
+ FileScanTask firstTask = split.task().files().iterator().next();
+ PartitionSpec spec = firstTask.spec();
+ StructLike partition = firstTask.partition();
+
+ // Compute consistent grouping key type for this split's partition specs
+ Set<PartitionSpec> specs =
+ split.task().files().stream().map(FileScanTask::spec).collect(Collectors.toSet());
+ Types.StructType groupingKeyType = Partitioning.groupingKeyType(null, specs);
+
+ // Create projection to convert partition to consistent grouping key
+ StructProjection projection = StructProjection.create(spec.partitionType(), groupingKeyType);
+ PartitionData groupingKeyTemplate = new PartitionData(groupingKeyType);
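+ // copyFor materializes the projected values into a standalone PartitionData (which implements
+ // equals/hashCode), so the result can safely be used as a map key; StructProjection itself is
+ // only a reusable view over the wrapped partition.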
+ return groupingKeyTemplate.copyFor(projection.wrap(partition));
+ }
+
+ @Override
+ @Test
+ public void testEmptyInitialization() {
+ SplitAssigner assigner = splitAssigner();
+ // Partition-aware assigner requires subtaskId and registeredTasks
+ GetSplitResult result = assigner.getNext(null, 0, 1);
+ Assert.assertEquals(GetSplitResult.Status.UNAVAILABLE, result.status());
+ }
+
+ @Override
+ @Test
+ // Right now PartitionAwareSplitAssigner is only designed for Batch Execution mode
+ public void testContinuousEnumeratorSequence() {}
+
+ @Override
+ @Test
+ public void testStaticEnumeratorSequence() throws Exception {
+ SplitAssigner assigner = splitAssigner();
+ List<IcebergSourceSplit> splits = createBucketPartitionedSplits();
+ assigner.onDiscoveredSplits(splits);
+
+ // We override this test from SplitAssignerTestBase, so there is no need to overcomplicate it:
+ // all splits are assigned to the same taskId.
+ int registeredTasks = 1;
+
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE, 0, registeredTasks);
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE, 0, registeredTasks);
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE, 0, registeredTasks);
+ assertSnapshot(assigner, 1);
+ assigner.onUnassignedSplits(createSplits(1, 1, "1"));
+ assertSnapshot(assigner, 2);
+
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE, 0, registeredTasks);
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE, 0, registeredTasks);
+ assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE, 0, registeredTasks);
+ assertSnapshot(assigner, 0);
+ }
+
+ /**
+ * Create splits with bucket partitioning - one file per bucket (4 buckets total). TODO: Move this
+ * logic to SplitHelpers once we have a robust implementation.
+ */
+ private List<IcebergSourceSplit> createBucketPartitionedSplits() throws Exception {
+ final File warehouseFile = TEMPORARY_FOLDER.newFolder();
+ Assert.assertTrue(warehouseFile.delete());
+ final String warehouse = "file:" + warehouseFile;
+ Configuration hadoopConf = new Configuration();
+ final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+
+ ImmutableMap<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "1");
+
+ try {
+ final Table table =
+ catalog.createTable(
+ TestFixtures.TABLE_IDENTIFIER, TEST_SCHEMA, TEST_PARTITION_SPEC, null, properties);
+ final GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ // Create two records for each bucket (0, 1, 2, 3). The bucket value is assigned explicitly
+ // via TestHelpers.Row.of(bucket) when the files are appended below, so the id values only mark
+ // the intended bucket and are not derived from the bucket transform.
+ List<Record> bucket0Records = RandomGenericData.generate(TEST_SCHEMA, 2, 0L);
+ bucket0Records.get(0).setField("id", 100L); // intended for bucket 0
+ bucket0Records.get(1).setField("id", 104L); // intended for bucket 0
+
+ List<Record> bucket1Records = RandomGenericData.generate(TEST_SCHEMA, 2, 1L);
+ bucket1Records.get(0).setField("id", 101L); // intended for bucket 1
+ bucket1Records.get(1).setField("id", 105L); // intended for bucket 1
+
+ List<Record> bucket2Records = RandomGenericData.generate(TEST_SCHEMA, 2, 2L);
+ bucket2Records.get(0).setField("id", 102L); // intended for bucket 2
+ bucket2Records.get(1).setField("id", 106L); // intended for bucket 2
+
+ List<Record> bucket3Records = RandomGenericData.generate(TEST_SCHEMA, 2, 3L);
+ bucket3Records.get(0).setField("id", 103L); // intended for bucket 3
+ bucket3Records.get(1).setField("id", 107L); // intended for bucket 3
+
+ // Write files with explicit bucket values
+ dataAppender.appendToTable(
+ org.apache.iceberg.TestHelpers.Row.of(0), bucket0Records); // bucket 0
+ dataAppender.appendToTable(
+ org.apache.iceberg.TestHelpers.Row.of(1), bucket1Records); // bucket 1
+ dataAppender.appendToTable(
+ org.apache.iceberg.TestHelpers.Row.of(2), bucket2Records); // bucket 2
+ dataAppender.appendToTable(
+ org.apache.iceberg.TestHelpers.Row.of(3), bucket3Records); // bucket 3
+
+ final ScanContext scanContext = ScanContext.builder().build();
+ final List<IcebergSourceSplit> splits =
+ FlinkSplitPlanner.planIcebergSourceSplits(
+ table, scanContext, ThreadPools.getWorkerPool());
+
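+ // Re-split the planned splits so that each IcebergSourceSplit wraps exactly one data file,
+ // i.e. one split per bucket partition.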
+ return splits.stream()
+ .flatMap(
+ split -> {
+ List<List<FileScanTask>> filesList =
+ Lists.partition(Lists.newArrayList(split.task().files()), 1);
+ return filesList.stream()
+ .map(files -> new BaseCombinedScanTask(files))
+ .map(
+ combinedScanTask ->
+ IcebergSourceSplit.fromCombinedScanTask(combinedScanTask));
+ })
+ .collect(Collectors.toList());
+ } finally {
+ catalog.dropTable(TestFixtures.TABLE_IDENTIFIER);
+ catalog.close();
+ }
+ }
+
+ /**
+ * Create splits with mixed dt (identity) and bucket partitioning - 3 files total. TODO: Move this
+ * logic to SplitHelpers once we have a robust implementation.
+ */
+ private List<IcebergSourceSplit> createMixedPartitionedSplits() throws Exception {
+ // Schema with both dt and id fields
+ Schema mixedSchema =
+ new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "dt", Types.StringType.get()),
+ required(3, "data", Types.StringType.get()));
+
+ // Partition spec with both identity and bucket partitioning
+ PartitionSpec mixedSpec =
+ PartitionSpec.builderFor(mixedSchema).identity("dt").bucket("id", 4).build();
+
+ final File warehouseFile = TEMPORARY_FOLDER.newFolder();
+ Assert.assertTrue(warehouseFile.delete());
+ final String warehouse = "file:" + warehouseFile;
+ Configuration hadoopConf = new Configuration();
+ final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+
+ ImmutableMap<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "1");
+
+ try {
+ final Table table =
+ catalog.createTable(
+ TestFixtures.TABLE_IDENTIFIER, mixedSchema, mixedSpec, null, properties);
+ final GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ // File 1: part(dt=2020-06-12/bucket=0)
+ List<Record> file1Records = RandomGenericData.generate(mixedSchema, 2, 0L);
+ file1Records.get(0).setField("id", 100L); // intended for bucket 0
+ file1Records.get(0).setField("dt", "2020-06-12");
+ file1Records.get(1).setField("id", 104L); // intended for bucket 0
+ file1Records.get(1).setField("dt", "2020-06-12");
+
+ // File 2: part(dt=2025-06-11/bucket=1)
+ List<Record> file2Records = RandomGenericData.generate(mixedSchema, 2, 1L);
+ file2Records.get(0).setField("id", 101L); // intended for bucket 1
+ file2Records.get(0).setField("dt", "2025-06-11");
+ file2Records.get(1).setField("id", 105L); // intended for bucket 1
+ file2Records.get(1).setField("dt", "2025-06-11");
+
+ // File 3: part(dt=2025-06-10/bucket=2)
+ List<Record> file3Records = RandomGenericData.generate(mixedSchema, 2, 2L);
+ file3Records.get(0).setField("id", 102L); // intended for bucket 2
+ file3Records.get(0).setField("dt", "2025-06-10");
+ file3Records.get(1).setField("id", 106L); // intended for bucket 2
+ file3Records.get(1).setField("dt", "2025-06-10");
+
+ // Write files with explicit partition values (dt, bucket)
+ dataAppender.appendToTable(
+ org.apache.iceberg.TestHelpers.Row.of("2020-06-12", 0), file1Records);
+ dataAppender.appendToTable(
+ org.apache.iceberg.TestHelpers.Row.of("2025-06-11", 1), file2Records);
+ dataAppender.appendToTable(
+ org.apache.iceberg.TestHelpers.Row.of("2025-06-10", 2), file3Records);
+
+ final ScanContext scanContext = ScanContext.builder().build();
+ final List<IcebergSourceSplit> splits =
+ FlinkSplitPlanner.planIcebergSourceSplits(
+ table, scanContext, ThreadPools.getWorkerPool());
+
+ return splits.stream()
+ .flatMap(
+ split -> {
+ List<List<FileScanTask>> filesList =
+ Lists.partition(Lists.newArrayList(split.task().files()), 1);
+ return filesList.stream()
+ .map(files -> new BaseCombinedScanTask(files))
+ .map(
+ combinedScanTask ->
+ IcebergSourceSplit.fromCombinedScanTask(combinedScanTask));
+ })
+ .collect(Collectors.toList());
+ } finally {
+ catalog.dropTable(TestFixtures.TABLE_IDENTIFIER);
+ catalog.close();
+ }
+ }
+
+ /**
+ * Create splits with identity partitioning on dt and id fields. Used for testing consistent
+ * assignment across tables.
+ */
+ private List<IcebergSourceSplit> createIdentityPartitionedSplits(
+ String tablePrefix, int partitionCount) throws Exception {
+ // Schema with dt and id fields for identity partitioning
+ Schema identitySchema =
+ new Schema(
+ required(1, "dt", Types.StringType.get()),
+ required(2, "id", Types.LongType.get()),
+ required(3, "data", Types.StringType.get()));
+
+ // Partition spec with identity partitioning on both dt and id
+ PartitionSpec identitySpec =
+ PartitionSpec.builderFor(identitySchema).identity("dt").identity("id").build();
+
+ final File warehouseFile = TEMPORARY_FOLDER.newFolder();
+ Assert.assertTrue(warehouseFile.delete());
+ final String warehouse = "file:" + warehouseFile;
+ Configuration hadoopConf = new Configuration();
+ final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+
+ ImmutableMap<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "1");
+
+ try {
+ final Table table =
+ catalog.createTable(
+ TestFixtures.TABLE_IDENTIFIER, identitySchema, identitySpec, null, properties);
+ final GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ // Create partitions with predictable dt and id values
+ for (int i = 0; i < partitionCount; i++) {
+ String dt = "2024-01-" + String.format("%02d", (i % 30) + 1);
+ Long id = (long) (i + 1000); // Start IDs from 1000
+
+ List<Record> records = RandomGenericData.generate(identitySchema, 2, i);
+ // Set the partition field values
+ records.get(0).setField("dt", dt);
+ records.get(0).setField("id", id);
+ records.get(1).setField("dt", dt);
+ records.get(1).setField("id", id);
+
+ // Write file for this partition
+ dataAppender.appendToTable(org.apache.iceberg.TestHelpers.Row.of(dt, id), records);
+ }
+
+ final ScanContext scanContext = ScanContext.builder().build();
+ final List<IcebergSourceSplit> splits =
+ FlinkSplitPlanner.planIcebergSourceSplits(
+ table, scanContext, ThreadPools.getWorkerPool());
+
+ return splits.stream()
+ .flatMap(
+ split -> {
+ List<List<FileScanTask>> filesList =
+ Lists.partition(Lists.newArrayList(split.task().files()), 1);
+ return filesList.stream()
+ .map(files -> new BaseCombinedScanTask(files))
+ .map(
+ combinedScanTask ->
+ IcebergSourceSplit.fromCombinedScanTask(combinedScanTask));
+ })
+ .collect(Collectors.toList());
+ } finally {
+ catalog.dropTable(TestFixtures.TABLE_IDENTIFIER);
+ catalog.close();
+ }
+ }
+
+ protected void assertGetNext(
+ SplitAssigner assigner,
+ GetSplitResult.Status expectedStatus,
+ int taskId,
+ int registeredTasks) {
+ GetSplitResult result = assigner.getNext(null, taskId, registeredTasks);
+ Assert.assertEquals(expectedStatus, result.status());
+ switch (expectedStatus) {
+ case AVAILABLE:
+ Assert.assertNotNull(result.split());
+ break;
+ case CONSTRAINED:
+ case UNAVAILABLE:
+ Assert.assertNull(result.split());
+ break;
+ default:
+ Assert.fail("Unknown status: " + expectedStatus);
+ }
+ }
+}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
index e1fc63fda918..6bbfc7649930 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
@@ -108,7 +108,7 @@ public void testSerializable() {
}
private void assertGetNext(SplitAssigner assigner, IcebergSourceSplit split) {
- GetSplitResult result = assigner.getNext(null);
+ GetSplitResult result = assigner.getNext(null, 0, -1);
Assert.assertEquals(result.split(), split);
}