diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 4f5ddef0c61d..68b1166d27f5 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -257,6 +257,11 @@ public TableScan newScan() {
     return lazyTable().newScan();
   }
 
+  @Override
+  public IncrementalAppendScan newIncrementalAppendScan() {
+    return lazyTable().newIncrementalAppendScan();
+  }
+
   @Override
   public Snapshot currentSnapshot() {
     return lazyTable().currentSnapshot();
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
index 9876bb3861c4..b94733010bf3 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
@@ -23,6 +23,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
 import org.apache.iceberg.flink.source.RowDataRewriter;
@@ -51,7 +52,7 @@ protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
     int parallelism = Math.min(size, maxParallelism);
     DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
     RowDataRewriter rowDataRewriter =
-        new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
+        new RowDataRewriter((SerializableTable) SerializableTable.copyOf(table()), caseSensitive());
     try {
       return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
     } catch (Exception e) {
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
index 91d975349b19..82ff08d039c3 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
@@ -24,10 +24,9 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 /**
@@ -49,13 +48,10 @@ public class DataIterator<T> implements CloseableIterator<T> {
   private long recordOffset;
 
   public DataIterator(
-      FileScanTaskReader<T> fileScanTaskReader,
-      CombinedScanTask task,
-      FileIO io,
-      EncryptionManager encryption) {
+      Table table, FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task) {
     this.fileScanTaskReader = fileScanTaskReader;
 
-    this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.inputFilesDecryptor = new InputFilesDecryptor(task, table.io(), table.encryption());
 
     this.combinedTask = task;
     this.tasks = task.files().iterator();
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index 44b35522becb..dac46eee604e 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -29,10 +29,8 @@
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -41,28 +39,19 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit> {
 
   private static final long serialVersionUID = 1L;
 
-  private final TableLoader tableLoader;
-  private final FileIO io;
-  private final EncryptionManager encryption;
+  private final Table table;
   private final ScanContext context;
   private final RowDataFileScanTaskReader rowDataReader;
 
   private transient DataIterator<RowData> iterator;
   private transient long currentReadCount = 0L;
 
-  FlinkInputFormat(
-      TableLoader tableLoader,
-      Schema tableSchema,
-      FileIO io,
-      EncryptionManager encryption,
-      ScanContext context) {
-    this.tableLoader = tableLoader;
-    this.io = io;
-    this.encryption = encryption;
+  FlinkInputFormat(SerializableTable table, ScanContext context) {
+    this.table = table;
     this.context = context;
     this.rowDataReader =
         new RowDataFileScanTaskReader(
-            tableSchema, context.project(), context.nameMapping(), context.caseSensitive());
+            table.schema(), context.project(), context.nameMapping(), context.caseSensitive());
   }
 
   @VisibleForTesting
@@ -79,15 +68,9 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
   @Override
   public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
     // Called in Job manager, so it is OK to load table from catalog.
-    tableLoader.open();
     final ExecutorService workerPool =
         ThreadPools.newWorkerPool("iceberg-plan-worker-pool", context.planParallelism());
-    try (TableLoader loader = tableLoader) {
-      Table table = loader.loadTable();
-      return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
-    } finally {
-      workerPool.shutdown();
-    }
+    return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
   }
 
   @Override
@@ -102,7 +85,7 @@ public void configure(Configuration parameters) {}
 
   @Override
   public void open(FlinkInputSplit split) {
-    this.iterator = new DataIterator<>(rowDataReader, split.getTask(), io, encryption);
+    this.iterator = new DataIterator<>(table, rowDataReader, split.getTask());
   }
 
   @Override
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index cb1c5b088956..ef03de3f6bd3 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -31,15 +31,14 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -189,25 +188,17 @@ public FlinkInputFormat buildFormat() {
       Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
 
       Schema icebergSchema;
-      FileIO io;
-      EncryptionManager encryption;
       if (table == null) {
         // load required fields by table loader.
         tableLoader.open();
         try (TableLoader loader = tableLoader) {
           table = loader.loadTable();
-          icebergSchema = table.schema();
-          io = table.io();
-          encryption = table.encryption();
         } catch (IOException e) {
           throw new UncheckedIOException(e);
         }
-      } else {
-        icebergSchema = table.schema();
-        io = table.io();
-        encryption = table.encryption();
       }
+      icebergSchema = table.schema();
 
       if (projectedSchema == null) {
         contextBuilder.project(icebergSchema);
       } else {
@@ -220,7 +211,7 @@ public FlinkInputFormat buildFormat() {
           readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
 
       return new FlinkInputFormat(
-          tableLoader, icebergSchema, io, encryption, contextBuilder.build());
+          (SerializableTable) SerializableTable.copyOf(table), contextBuilder.build());
     }
 
     public DataStream<RowData> build() {
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 9728aeb2b394..c970ee54d555 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -40,6 +40,7 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -357,13 +358,11 @@ public IcebergSource<T> build() {
       if (readerFunction == null) {
         RowDataReaderFunction rowDataReaderFunction =
             new RowDataReaderFunction(
+                (SerializableTable) SerializableTable.copyOf(table),
                 flinkConfig,
-                table.schema(),
                 context.project(),
                 context.nameMapping(),
-                context.caseSensitive(),
-                table.io(),
-                table.encryption());
+                context.caseSensitive());
         this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
       }
 
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 23665b7c9f0f..595b38ab5a0d 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -31,15 +31,12 @@
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.PropertyUtil;
@@ -50,22 +47,14 @@ public class RowDataRewriter {
 
   private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
 
-  private final Schema schema;
-  private final String nameMapping;
-  private final FileIO io;
+  private final Table table;
   private final boolean caseSensitive;
-  private final EncryptionManager encryptionManager;
   private final TaskWriterFactory<RowData> taskWriterFactory;
   private final String tableName;
 
-  public RowDataRewriter(
-      Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
-    this.schema = table.schema();
+  public RowDataRewriter(SerializableTable table, boolean caseSensitive) {
+    this.table = table;
     this.caseSensitive = caseSensitive;
-    this.io = io;
-    this.encryptionManager = encryptionManager;
-    this.nameMapping =
-        PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
     this.tableName = table.name();
 
     String formatString =
@@ -77,20 +66,12 @@ public RowDataRewriter(
     RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
     this.taskWriterFactory =
         new RowDataTaskWriterFactory(
-            SerializableTable.copyOf(table),
-            flinkSchema,
-            Long.MAX_VALUE,
-            format,
-            table.properties(),
-            null,
-            false);
+            table, flinkSchema, Long.MAX_VALUE, format, table.properties(), null, false);
   }
 
   public List<DataFile> rewriteDataForTasks(
       DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
-    RewriteMap map =
-        new RewriteMap(
-            schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
+    RewriteMap map = new RewriteMap(table, caseSensitive, taskWriterFactory);
     DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);
     return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream()
         .flatMap(Collection::stream)
@@ -103,29 +84,20 @@ public static class RewriteMap extends RichMapFunction<CombinedScanTask, List<DataFile>> {
     private TaskWriter<RowData> writer;
     private int subTaskId;
     private int attemptId;
 
-    private final Schema schema;
-    private final String nameMapping;
-    private final FileIO io;
-    private final boolean caseSensitive;
-    private final EncryptionManager encryptionManager;
+    private final Table table;
     private final TaskWriterFactory<RowData> taskWriterFactory;
     private final RowDataFileScanTaskReader rowDataReader;
 
     public RewriteMap(
-        Schema schema,
-        String nameMapping,
-        FileIO io,
-        boolean caseSensitive,
-        EncryptionManager encryptionManager,
-        TaskWriterFactory<RowData> taskWriterFactory) {
-      this.schema = schema;
-      this.nameMapping = nameMapping;
-      this.io = io;
-      this.caseSensitive = caseSensitive;
-      this.encryptionManager = encryptionManager;
+        Table table, boolean caseSensitive, TaskWriterFactory<RowData> taskWriterFactory) {
+      this.table = table;
       this.taskWriterFactory = taskWriterFactory;
       this.rowDataReader =
-          new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive);
+          new RowDataFileScanTaskReader(
+              table.schema(),
+              table.schema(),
+              table.properties().get(DEFAULT_NAME_MAPPING),
+              caseSensitive);
     }
 
@@ -140,8 +112,7 @@ public void open(Configuration parameters) {
     public List<DataFile> map(CombinedScanTask task) throws Exception {
       // Initialize the task writer.
      this.writer = taskWriterFactory.create();
-      try (DataIterator<RowData> iterator =
-          new DataIterator<>(rowDataReader, task, io, encryptionManager)) {
+      try (DataIterator<RowData> iterator = new DataIterator<>(table, rowDataReader, task)) {
        while (iterator.hasNext()) {
          RowData rowData = iterator.next();
          writer.write(rowData);
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
index c747375d2a28..080fea50f7b1 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
@@ -21,50 +21,43 @@
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.source.DataIterator;
 import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
-  private final Schema tableSchema;
+  private final Table table;
   private final Schema readSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
-  private final FileIO io;
-  private final EncryptionManager encryption;
 
   public RowDataReaderFunction(
+      SerializableTable table,
       ReadableConfig config,
-      Schema tableSchema,
       Schema projectedSchema,
       String nameMapping,
-      boolean caseSensitive,
-      FileIO io,
-      EncryptionManager encryption) {
+      boolean caseSensitive) {
     super(
         new ArrayPoolDataIteratorBatcher<>(
             config,
             new RowDataRecordFactory(
-                FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)))));
-    this.tableSchema = tableSchema;
-    this.readSchema = readSchema(tableSchema, projectedSchema);
+                FlinkSchemaUtil.convert(readSchema(table.schema(), projectedSchema)))));
+    this.table = table;
+    this.readSchema = readSchema(table.schema(), projectedSchema);
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
-    this.io = io;
-    this.encryption = encryption;
   }
 
   @Override
   public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
     return new DataIterator<>(
-        new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive),
-        split.task(),
-        io,
-        encryption);
+        table,
+        new RowDataFileScanTaskReader(table.schema(), readSchema, nameMapping, caseSensitive),
+        split.task());
   }
 
   private static Schema readSchema(Schema tableSchema, Schema projectedSchema) {
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java
index 0d33e4ed08ad..7db87cfe5b90 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java
@@ -24,14 +24,17 @@
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -51,6 +54,11 @@ public static Object[][] parameters() {
 
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
+  @Rule
+  public final HadoopTableResource tableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
+
   protected abstract ReaderFunction<T> readerFunction();
 
   protected abstract void assertRecords(List<Record> expected, List<T> actual, Schema schema);
@@ -63,6 +71,10 @@ public ReaderFunctionTestBase(FileFormat fileFormat) {
     this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
   }
 
+  public Table table() {
+    return tableResource.table();
+  }
+
   private void assertRecordsAndPosition(
       List<Record> expectedRecords,
       int expectedFileOffset,
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index b2ccbff2d6c2..93416f2d4e37 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -34,16 +34,15 @@
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.DataIterator;
 import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
-import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -81,12 +80,12 @@ public static FileScanTask createFileTask(
         residuals);
   }
 
-  public static DataIterator<RowData> createDataIterator(CombinedScanTask combinedTask) {
+  public static DataIterator<RowData> createDataIterator(
+      SerializableTable table, CombinedScanTask combinedTask) {
     return new DataIterator<>(
+        table,
         new RowDataFileScanTaskReader(TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true),
-        combinedTask,
-        new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
-        new PlaintextEncryptionManager());
+        combinedTask);
   }
 
   public static List<List<Record>> createRecordBatchList(
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayPoolDataIteratorBatcherRowData.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayPoolDataIteratorBatcherRowData.java
index f964a7707689..2990b74ce01c 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayPoolDataIteratorBatcherRowData.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayPoolDataIteratorBatcherRowData.java
@@ -28,16 +28,19 @@
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.flink.source.DataIterator;
 import org.apache.iceberg.io.CloseableIterator;
 import org.junit.Assert;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -46,6 +49,11 @@ public class TestArrayPoolDataIteratorBatcherRowData {
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
   private static final FileFormat fileFormat = FileFormat.PARQUET;
 
+  @Rule
+  public final HadoopTableResource tableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
+
   private final GenericAppenderFactory appenderFactory;
   private final DataIteratorBatcher<RowData> batcher;
 
@@ -67,7 +75,9 @@ public void testSingleFileLessThanOneFullBatch() throws Exception {
     FileScanTask fileTask =
         ReaderUtil.createFileTask(records, TEMPORARY_FOLDER.newFile(), fileFormat, appenderFactory);
     CombinedScanTask combinedTask = new BaseCombinedScanTask(fileTask);
-    DataIterator<RowData> dataIterator = ReaderUtil.createDataIterator(combinedTask);
+    DataIterator<RowData> dataIterator =
+        ReaderUtil.createDataIterator(
+            (SerializableTable) SerializableTable.copyOf(tableResource.table()), combinedTask);
     String splitId = "someSplitId";
     CloseableIterator<RecordsWithSplitIds<RecordAndPosition<RowData>>> recordBatchIterator =
         batcher.batch(splitId, dataIterator);
@@ -109,7 +119,9 @@ public void testSingleFileWithMultipleBatches() throws Exception {
     FileScanTask fileTask =
         ReaderUtil.createFileTask(records, TEMPORARY_FOLDER.newFile(), fileFormat, appenderFactory);
     CombinedScanTask combinedTask = new BaseCombinedScanTask(fileTask);
-    DataIterator<RowData> dataIterator = ReaderUtil.createDataIterator(combinedTask);
+    DataIterator<RowData> dataIterator =
+        ReaderUtil.createDataIterator(
+            (SerializableTable) SerializableTable.copyOf(tableResource.table()), combinedTask);
     String splitId = "someSplitId";
     CloseableIterator<RecordsWithSplitIds<RecordAndPosition<RowData>>> recordBatchIterator =
         batcher.batch(splitId, dataIterator);
@@ -226,7 +238,9 @@ public void testMultipleFilesWithSeekPosition() throws Exception {
     CombinedScanTask combinedTask =
         new BaseCombinedScanTask(Arrays.asList(fileTask0, fileTask1, fileTask2));
-    DataIterator<RowData> dataIterator = ReaderUtil.createDataIterator(combinedTask);
+    DataIterator<RowData> dataIterator =
+        ReaderUtil.createDataIterator(
+            (SerializableTable) SerializableTable.copyOf(tableResource.table()), combinedTask);
 
     // seek to file1 and after record 1
     dataIterator.seek(1, 1);
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index ddc144be883c..30864726a20b 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -28,21 +28,27 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
-import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.junit.Assert;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 public class TestIcebergSourceReader {
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
+  @Rule
+  public final HadoopTableResource tableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
+
   private final GenericAppenderFactory appenderFactory;
 
   public TestIcebergSourceReader() {
@@ -100,13 +106,11 @@ private IcebergSourceReader<RowData> createReader(
         new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
     RowDataReaderFunction readerFunction =
         new RowDataReaderFunction(
+            (SerializableTable) SerializableTable.copyOf(tableResource.table()),
             new Configuration(),
             TestFixtures.SCHEMA,
-            TestFixtures.SCHEMA,
             null,
-            true,
-            new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
-            new PlaintextEncryptionManager());
+            true);
     return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
   }
 }
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
index aee271a3a7b8..826f61b7b1ee 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
@@ -29,12 +29,11 @@
 import org.apache.flink.types.Row;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
-import org.apache.iceberg.hadoop.HadoopFileIO;
 
 public class TestRowDataReaderFunction extends ReaderFunctionTestBase<Row> {
 
@@ -49,13 +48,11 @@ public TestRowDataReaderFunction(FileFormat fileFormat) {
   @Override
   protected ReaderFunction<Row> readerFunction() {
     return new RowDataReaderFunction(
+        (SerializableTable) SerializableTable.copyOf(table()),
        new Configuration(),
        TestFixtures.SCHEMA,
-        TestFixtures.SCHEMA,
        null,
-        true,
-        new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
-        new PlaintextEncryptionManager());
+        true);
   }
 
   @Override
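Note on usage: after this change, everything a task-side reader or writer needs (schema, FileIO, EncryptionManager, table properties) travels inside one serializable table handle instead of being shipped as separate constructor arguments. The following is a minimal wiring sketch of the new call pattern, not part of the patch; the "catalog" parameter and the "db.tbl" identifier are placeholder assumptions.

    import org.apache.flink.configuration.Configuration;
    import org.apache.iceberg.SerializableTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;

    class ReaderWiringSketch {
      // "catalog" is a placeholder; any Iceberg Catalog implementation works.
      static RowDataReaderFunction newReaderFunction(Catalog catalog) {
        // Load the table once on the JobManager side.
        Table table = catalog.loadTable(TableIdentifier.of("db", "tbl"));

        // SerializableTable.copyOf() is declared to return Table, which is why
        // the call sites in this patch cast; the copy snapshots metadata, FileIO,
        // and encryption so task-side code needs no live catalog connection.
        SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);

        // New-style constructor: table schema, FileIO, and EncryptionManager all
        // come from the table handle (see RowDataReaderFunction above).
        return new RowDataReaderFunction(
            serializableTable,
            new Configuration(), // Flink ReadableConfig
            table.schema(),      // projected schema (full schema here)
            null,                // name mapping
            true);               // case sensitive
      }
    }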