diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
index 291be0ca26bf..2dc4b7ec1bb5 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
@@ -24,6 +24,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
 import org.apache.iceberg.flink.source.RowDataRewriter;
@@ -51,7 +52,7 @@ protected List rewriteDataForTasks(List combinedScan
     int size = combinedScanTasks.size();
     int parallelism = Math.min(size, maxParallelism);
     DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
-    RowDataRewriter rowDataRewriter = new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
+    RowDataRewriter rowDataRewriter = new RowDataRewriter(SerializableTable.copyOf(table()), caseSensitive());
     try {
       return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
     } catch (Exception e) {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index 1bad1c25952e..d564fb850669 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -29,9 +29,6 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -41,21 +38,14 @@ public class FlinkInputFormat extends RichInputFormat
   private static final long serialVersionUID = 1L;
 
-  private final TableLoader tableLoader;
-  private final Schema tableSchema;
-  private final FileIO io;
-  private final EncryptionManager encryption;
+  private final Table table;
   private final ScanContext context;
 
   private transient RowDataIterator iterator;
   private transient long currentReadCount = 0L;
 
-  FlinkInputFormat(TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption,
-                   ScanContext context) {
-    this.tableLoader = tableLoader;
-    this.tableSchema = tableSchema;
-    this.io = io;
-    this.encryption = encryption;
+  FlinkInputFormat(Table table, ScanContext context) {
+    this.table = table;
     this.context = context;
   }
 
@@ -72,12 +62,7 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
   @Override
   public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-    // Called in Job manager, so it is OK to load table from catalog.
-    tableLoader.open();
-    try (TableLoader loader = tableLoader) {
-      Table table = loader.loadTable();
-      return FlinkSplitGenerator.createInputSplits(table, context);
-    }
+    return FlinkSplitGenerator.createInputSplits(table, context);
   }
 
   @Override
@@ -91,9 +76,7 @@ public void configure(Configuration parameters) {
   @Override
   public void open(FlinkInputSplit split) {
-    this.iterator = new RowDataIterator(
-        split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
-        context.caseSensitive());
+    this.iterator = new RowDataIterator(table, split.getTask(), context.project(), context.caseSensitive());
   }
 
   @Override
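[Illustrative sketch, not part of the patch] The change above replaces the bundle of task-side dependencies (TableLoader, Schema, FileIO, EncryptionManager) with a single serializable table handle; everything the old constructor took as separate arguments can be derived from it on the task side. Assuming a catalog-loaded `table` is in scope:

    Table serializableCopy = SerializableTable.copyOf(table);    // immutable, read-only snapshot, safe to ship to tasks
    Schema schema = serializableCopy.schema();                    // replaces the explicit Schema argument
    FileIO io = serializableCopy.io();                            // replaces the explicit FileIO argument
    EncryptionManager encryption = serializableCopy.encryption(); // replaces the explicit EncryptionManager argument
    Map<String, String> props = serializableCopy.properties();    // carries settings such as the default name mapping
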
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index a3263d284c0c..d6654980691d 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -32,15 +32,14 @@
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public class FlinkSource {
@@ -157,11 +156,6 @@ public Builder streaming(boolean streaming) {
       return this;
     }
 
-    public Builder nameMapping(String nameMapping) {
-      contextBuilder.nameMapping(nameMapping);
-      return this;
-    }
-
     public Builder flinkConf(ReadableConfig config) {
       this.readableConfig = config;
       return this;
@@ -170,25 +164,16 @@ public Builder flinkConf(ReadableConfig config) {
     public FlinkInputFormat buildFormat() {
       Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
 
-      Schema icebergSchema;
-      FileIO io;
-      EncryptionManager encryption;
       if (table == null) {
         // load required fields by table loader.
         tableLoader.open();
         try (TableLoader loader = tableLoader) {
           table = loader.loadTable();
-          icebergSchema = table.schema();
-          io = table.io();
-          encryption = table.encryption();
         } catch (IOException e) {
           throw new UncheckedIOException(e);
         }
-      } else {
-        icebergSchema = table.schema();
-        io = table.io();
-        encryption = table.encryption();
       }
+      Schema icebergSchema = table.schema();
 
       if (projectedSchema == null) {
         contextBuilder.project(icebergSchema);
@@ -196,7 +181,8 @@ public FlinkInputFormat buildFormat() {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
 
-      return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
+      // we pass a read-only serializable copy of the current table state to FlinkInputFormat
+      return new FlinkInputFormat(SerializableTable.copyOf(table), contextBuilder.build());
     }
 
     public DataStream<RowData> build() {
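[Illustrative sketch, not part of the patch] With the `nameMapping(...)` option gone from the builder, a typical batch read passes nothing mapping-related; `buildFormat()` loads the table once on the client and hands a read-only `SerializableTable` copy to the input format. A hedged usage sketch, assuming an existing `StreamExecutionEnvironment env`, a `TableLoader tableLoader`, and the builder's `env(...)` method (not shown in this patch):

    DataStream<RowData> rows = FlinkSource.forRowData()
        .env(env)                 // assumed builder method for the execution environment
        .tableLoader(tableLoader)
        .streaming(false)
        .build();                 // the format underneath now carries a serializable copy of the table
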
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
index 5a568144d1f7..bda6b772c3a0 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
@@ -26,9 +26,9 @@
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.data.FlinkAvroReader;
@@ -37,7 +36,6 @@
 import org.apache.iceberg.flink.data.RowDataUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
@@ -47,6 +46,8 @@
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
 class RowDataIterator extends DataIterator<RowData> {
 
   private final Schema tableSchema;
@@ -54,12 +55,11 @@ class RowDataIterator extends DataIterator {
   private final String nameMapping;
   private final boolean caseSensitive;
 
-  RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
-                  Schema projectedSchema, String nameMapping, boolean caseSensitive) {
-    super(task, io, encryption);
-    this.tableSchema = tableSchema;
+  RowDataIterator(Table table, CombinedScanTask task, Schema projectedSchema, boolean caseSensitive) {
+    super(task, table.io(), table.encryption());
+    this.tableSchema = table.schema();
     this.projectedSchema = projectedSchema;
-    this.nameMapping = nameMapping;
+    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
     this.caseSensitive = caseSensitive;
   }
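[Illustrative sketch, not part of the patch] The iterator now resolves the name mapping from the table's own properties instead of receiving it through the scan context. Roughly the lookup the new constructor performs, assuming a `table` in scope (`NameMappingParser` is the parser this class already imports):

    String mappingJson = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); // "schema.name-mapping.default"
    NameMapping mapping = mappingJson != null ? NameMappingParser.fromJson(mappingJson) : null;
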
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index a6cd374c3044..1107f62b12ee 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -31,41 +31,29 @@
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
 public class RowDataRewriter {
 
   private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
 
-  private final Schema schema;
-  private final String nameMapping;
-  private final FileIO io;
+  private final Table table;
   private final boolean caseSensitive;
-  private final EncryptionManager encryptionManager;
   private final TaskWriterFactory<RowData> taskWriterFactory;
   private final String tableName;
 
-  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
-    this.schema = table.schema();
+  public RowDataRewriter(Table table, boolean caseSensitive) {
+    this.table = table;
     this.caseSensitive = caseSensitive;
-    this.io = io;
-    this.encryptionManager = encryptionManager;
-    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
    this.tableName = table.name();
 
    String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
@@ -73,7 +61,7 @@ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, Encryption
     FileFormat format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
     RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
     this.taskWriterFactory = new RowDataTaskWriterFactory(
-        SerializableTable.copyOf(table),
+        table,
         flinkSchema,
         Long.MAX_VALUE,
         format,
@@ -81,7 +69,7 @@ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, Encryption
   }
 
   public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
-    RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
+    RewriteMap map = new RewriteMap(table, caseSensitive, taskWriterFactory);
     DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);
     return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream().flatMap(Collection::stream)
         .collect(Collectors.toList());
@@ -93,20 +81,13 @@ public static class RewriteMap extends RichMapFunction
-    private final Schema schema;
-    private final String nameMapping;
-    private final FileIO io;
+    private final Table table;
     private final boolean caseSensitive;
-    private final EncryptionManager encryptionManager;
     private final TaskWriterFactory<RowData> taskWriterFactory;
 
-    public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean caseSensitive,
-                      EncryptionManager encryptionManager, TaskWriterFactory<RowData> taskWriterFactory) {
-      this.schema = schema;
-      this.nameMapping = nameMapping;
-      this.io = io;
+    public RewriteMap(Table table, boolean caseSensitive, TaskWriterFactory<RowData> taskWriterFactory) {
+      this.table = table;
       this.caseSensitive = caseSensitive;
-      this.encryptionManager = encryptionManager;
       this.taskWriterFactory = taskWriterFactory;
     }
 
@@ -122,8 +103,7 @@ public void open(Configuration parameters) {
     public List<DataFile> map(CombinedScanTask task) throws Exception {
       // Initialize the task writer.
       this.writer = taskWriterFactory.create();
-      try (RowDataIterator iterator =
-          new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)) {
+      try (RowDataIterator iterator = new RowDataIterator(table, task, table.schema(), caseSensitive)) {
         while (iterator.hasNext()) {
           RowData rowData = iterator.next();
           writer.write(rowData);
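[Illustrative sketch, not part of the patch] Callers of the rewriter are now expected to hand over an already-serializable table, exactly as RewriteDataFilesAction does above. Assuming `table`, `combinedScanTasks`, `env`, and `parallelism` are in scope:

    RowDataRewriter rewriter = new RowDataRewriter(SerializableTable.copyOf(table), true /* caseSensitive */);
    List<DataFile> rewritten = rewriter.rewriteDataForTasks(env.fromCollection(combinedScanTasks), parallelism);
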
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 2896efb39655..85c990f90ca8 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -29,8 +29,6 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
 /**
  * Context object with optional arguments for a Flink Scan.
  */
@@ -79,14 +77,13 @@ class ScanContext implements Serializable {
   private final boolean isStreaming;
   private final Duration monitorInterval;
 
-  private final String nameMapping;
   private final Schema schema;
   private final List<Expression> filters;
   private final long limit;
 
   private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
                       Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
-                      boolean isStreaming, Duration monitorInterval, String nameMapping,
+                      boolean isStreaming, Duration monitorInterval,
                       Schema schema, List<Expression> filters, long limit) {
     this.caseSensitive = caseSensitive;
     this.snapshotId = snapshotId;
@@ -99,7 +96,6 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId
     this.isStreaming = isStreaming;
     this.monitorInterval = monitorInterval;
 
-    this.nameMapping = nameMapping;
     this.schema = schema;
     this.filters = filters;
     this.limit = limit;
@@ -145,10 +141,6 @@ Duration monitorInterval() {
     return monitorInterval;
   }
 
-  String nameMapping() {
-    return nameMapping;
-  }
-
   Schema project() {
     return schema;
   }
@@ -173,7 +165,6 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI
         .splitOpenFileCost(splitOpenFileCost)
         .streaming(isStreaming)
         .monitorInterval(monitorInterval)
-        .nameMapping(nameMapping)
         .project(schema)
         .filters(filters)
         .limit(limit)
@@ -192,7 +183,6 @@ ScanContext copyWithSnapshotId(long newSnapshotId) {
         .splitOpenFileCost(splitOpenFileCost)
         .streaming(isStreaming)
         .monitorInterval(monitorInterval)
-        .nameMapping(nameMapping)
         .project(schema)
         .filters(filters)
         .limit(limit)
@@ -214,7 +204,6 @@ static class Builder {
     private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
     private boolean isStreaming = STREAMING.defaultValue();
     private Duration monitorInterval = MONITOR_INTERVAL.defaultValue();
-    private String nameMapping;
     private Schema projectedSchema;
     private List<Expression> filters;
     private long limit = -1L;
@@ -272,11 +261,6 @@ Builder monitorInterval(Duration newMonitorInterval) {
       return this;
     }
 
-    Builder nameMapping(String newNameMapping) {
-      this.nameMapping = newNameMapping;
-      return this;
-    }
-
     Builder project(Schema newProjectedSchema) {
       this.projectedSchema = newProjectedSchema;
      return this;
@@ -305,14 +289,13 @@ Builder fromProperties(Map properties) {
           .splitLookback(config.get(SPLIT_LOOKBACK))
           .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
           .streaming(config.get(STREAMING))
-          .monitorInterval(config.get(MONITOR_INTERVAL))
-          .nameMapping(properties.get(DEFAULT_NAME_MAPPING));
+          .monitorInterval(config.get(MONITOR_INTERVAL));
     }
 
     public ScanContext build() {
       return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId,
           asOfTimestamp, splitSize, splitLookback,
-          splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
+          splitOpenFileCost, isStreaming, monitorInterval, projectedSchema,
           filters, limit);
     }
   }
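[Illustrative sketch, not part of the patch] Since ScanContext no longer carries a name mapping, the mapping is configured on the table itself and travels with the serializable copy. One hedged way to attach it via table properties; `MappingUtil` is assumed here only for building the mapping and is not touched by this patch:

    NameMapping mapping = MappingUtil.create(table.schema());
    table.updateProperties()
        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
        .commit();
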
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index 9af1b7c65331..23ac5a4e4bd9 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -147,6 +147,9 @@ public void testInferedParallelism() throws IOException {
     long maxFileLen = Math.max(dataFile1.fileSizeInBytes(), dataFile2.fileSizeInBytes());
     sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen);
 
+    table.refresh();
+    flinkInputFormat = FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
+
     // 2 splits (max infer is the default value 100 , max > splits num), the parallelism is splits num : 2
     parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
     Assert.assertEquals("Should produce the expected parallelism.", 2, parallelism);
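[Illustrative sketch, not part of the patch] The test adjustment follows from the new semantics: the input format captures an immutable snapshot of the table when it is built, so a format built before the ALTER TABLE would not see the updated split properties. Refreshing the cached table and rebuilding the format, as the test now does, picks them up:

    table.refresh();                       // pull in the ALTER TABLE property changes
    flinkInputFormat = FlinkSource.forRowData()
        .tableLoader(tableLoader)
        .table(table)                      // reuse the refreshed table instead of reloading it from the catalog
        .buildFormat();                    // snapshots the refreshed table state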