From 27e9f1349b3bdb521e9ded91c065cff1311eab7a Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Mon, 12 Dec 2022 14:52:06 +0800 Subject: [PATCH 1/4] Flink: use SerializableTable for source --- .../iceberg/data/GenericAppenderFactory.java | 5 ++ .../flink/actions/RewriteDataFilesAction.java | 3 +- .../iceberg/flink/source/DataIterator.java | 18 ++++++ .../flink/source/FlinkInputFormat.java | 31 +++------- .../iceberg/flink/source/FlinkSource.java | 16 ++---- .../iceberg/flink/source/IcebergSource.java | 8 +-- .../iceberg/flink/source/RowDataRewriter.java | 56 +++++-------------- .../source/reader/RowDataReaderFunction.java | 38 +++++-------- .../source/reader/ReaderFunctionTestBase.java | 20 ++++++- .../reader/TestIcebergSourceReader.java | 30 ++++++---- .../reader/TestRowDataReaderFunction.java | 14 +---- 11 files changed, 104 insertions(+), 135 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index 23a94ebc9944..e6ec11853529 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -26,6 +26,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.avro.DataWriter; import org.apache.iceberg.data.orc.GenericOrcWriter; @@ -59,6 +60,10 @@ public GenericAppenderFactory(Schema schema, PartitionSpec spec) { this(schema, spec, null, null, null); } + public GenericAppenderFactory(Table table) { + this(table.schema(), table.spec(), null, null, null); + } + public GenericAppenderFactory( Schema schema, PartitionSpec spec, diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java index 9876bb3861c4..309757bc5c94 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java @@ -50,8 +50,7 @@ protected List rewriteDataForTasks(List combinedScan int size = combinedScanTasks.size(); int parallelism = Math.min(size, maxParallelism); DataStream dataStream = env.fromCollection(combinedScanTasks); - RowDataRewriter rowDataRewriter = - new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager()); + RowDataRewriter rowDataRewriter = new RowDataRewriter(table(), caseSensitive()); try { return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism); } catch (Exception e) { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java index 91d975349b19..2cff13447c4e 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java @@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.encryption.InputFilesDecryptor; import org.apache.iceberg.io.CloseableIterator; @@ -68,6 +69,23 @@ public 
DataIterator( this.recordOffset = 0L; } + public DataIterator( + Table table, FileScanTaskReader fileScanTaskReader, CombinedScanTask task) { + this.fileScanTaskReader = fileScanTaskReader; + + this.inputFilesDecryptor = new InputFilesDecryptor(task, table.io(), table.encryption()); + this.combinedTask = task; + + this.tasks = task.files().iterator(); + this.currentIterator = CloseableIterator.empty(); + + // fileOffset starts at -1 because we started + // from an empty iterator that is not from the split files. + this.fileOffset = -1; + // record offset points to the record that next() should return when called + this.recordOffset = 0L; + } + /** * (startingFileOffset, startingRecordOffset) points to the next row that reader should resume * from. E.g., if the seek position is (file=0, record=1), seek moves the iterator position to the diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index 44b35522becb..35878d2398fa 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -29,10 +29,8 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.table.data.RowData; import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; -import org.apache.iceberg.encryption.EncryptionManager; -import org.apache.iceberg.flink.TableLoader; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.util.ThreadPools; @@ -41,28 +39,19 @@ public class FlinkInputFormat extends RichInputFormat private static final long serialVersionUID = 1L; - private final TableLoader tableLoader; - private final FileIO io; - private final EncryptionManager encryption; private final ScanContext context; private final RowDataFileScanTaskReader rowDataReader; + private final Table table; private transient DataIterator iterator; private transient long currentReadCount = 0L; - FlinkInputFormat( - TableLoader tableLoader, - Schema tableSchema, - FileIO io, - EncryptionManager encryption, - ScanContext context) { - this.tableLoader = tableLoader; - this.io = io; - this.encryption = encryption; + FlinkInputFormat(SerializableTable table, ScanContext context) { + this.table = table; this.context = context; this.rowDataReader = new RowDataFileScanTaskReader( - tableSchema, context.project(), context.nameMapping(), context.caseSensitive()); + table.schema(), context.project(), context.nameMapping(), context.caseSensitive()); } @VisibleForTesting @@ -79,15 +68,9 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { @Override public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException { // Called in Job manager, so it is OK to load table from catalog. 
- tableLoader.open(); final ExecutorService workerPool = ThreadPools.newWorkerPool("iceberg-plan-worker-pool", context.planParallelism()); - try (TableLoader loader = tableLoader) { - Table table = loader.loadTable(); - return FlinkSplitPlanner.planInputSplits(table, context, workerPool); - } finally { - workerPool.shutdown(); - } + return FlinkSplitPlanner.planInputSplits(table, context, workerPool); } @Override @@ -102,7 +85,7 @@ public void configure(Configuration parameters) {} @Override public void open(FlinkInputSplit split) { - this.iterator = new DataIterator<>(rowDataReader, split.getTask(), io, encryption); + this.iterator = new DataIterator<>(table, rowDataReader, split.getTask()); } @Override diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index cb1c5b088956..06f444de0083 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -31,15 +31,14 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; -import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -189,25 +188,18 @@ public FlinkInputFormat buildFormat() { Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); Schema icebergSchema; - FileIO io; - EncryptionManager encryption; if (table == null) { // load required fields by table loader. 
tableLoader.open(); try (TableLoader loader = tableLoader) { table = loader.loadTable(); - icebergSchema = table.schema(); - io = table.io(); - encryption = table.encryption(); } catch (IOException e) { throw new UncheckedIOException(e); } - } else { - icebergSchema = table.schema(); - io = table.io(); - encryption = table.encryption(); } + icebergSchema = table.schema(); + if (projectedSchema == null) { contextBuilder.project(icebergSchema); } else { @@ -220,7 +212,7 @@ public FlinkInputFormat buildFormat() { readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE)); return new FlinkInputFormat( - tableLoader, icebergSchema, io, encryption, contextBuilder.build()); + (SerializableTable) SerializableTable.copyOf(table), contextBuilder.build()); } public DataStream build() { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index 9728aeb2b394..6a912e8de76e 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -357,13 +357,7 @@ public IcebergSource build() { if (readerFunction == null) { RowDataReaderFunction rowDataReaderFunction = new RowDataReaderFunction( - flinkConfig, - table.schema(), - context.project(), - context.nameMapping(), - context.caseSensitive(), - table.io(), - table.encryption()); + table, flinkConfig, context.project(), context.caseSensitive()); this.readerFunction = (ReaderFunction) rowDataReaderFunction; } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java index 23665b7c9f0f..67c4a58a03b3 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java @@ -31,15 +31,11 @@ import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Schema; -import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; import org.apache.iceberg.flink.sink.TaskWriterFactory; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.PropertyUtil; @@ -50,22 +46,14 @@ public class RowDataRewriter { private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class); - private final Schema schema; - private final String nameMapping; - private final FileIO io; + private final Table table; private final boolean caseSensitive; - private final EncryptionManager encryptionManager; private final TaskWriterFactory taskWriterFactory; private final String tableName; - public RowDataRewriter( - Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) { - this.schema = table.schema(); + public RowDataRewriter(Table table, boolean caseSensitive) { + this.table = table; this.caseSensitive = caseSensitive; - this.io = io; - this.encryptionManager = encryptionManager; - this.nameMapping = - 
PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null); this.tableName = table.name(); String formatString = @@ -77,20 +65,12 @@ public RowDataRewriter( RowType flinkSchema = FlinkSchemaUtil.convert(table.schema()); this.taskWriterFactory = new RowDataTaskWriterFactory( - SerializableTable.copyOf(table), - flinkSchema, - Long.MAX_VALUE, - format, - table.properties(), - null, - false); + table, flinkSchema, Long.MAX_VALUE, format, table.properties(), null, false); } public List rewriteDataForTasks( DataStream dataStream, int parallelism) throws Exception { - RewriteMap map = - new RewriteMap( - schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory); + RewriteMap map = new RewriteMap(table, caseSensitive, taskWriterFactory); DataStream> ds = dataStream.map(map).setParallelism(parallelism); return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream() .flatMap(Collection::stream) @@ -103,29 +83,22 @@ public static class RewriteMap extends RichMapFunction taskWriterFactory; private final RowDataFileScanTaskReader rowDataReader; public RewriteMap( - Schema schema, - String nameMapping, - FileIO io, - boolean caseSensitive, - EncryptionManager encryptionManager, - TaskWriterFactory taskWriterFactory) { - this.schema = schema; - this.nameMapping = nameMapping; - this.io = io; + Table table, boolean caseSensitive, TaskWriterFactory taskWriterFactory) { + this.table = table; this.caseSensitive = caseSensitive; - this.encryptionManager = encryptionManager; this.taskWriterFactory = taskWriterFactory; this.rowDataReader = - new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive); + new RowDataFileScanTaskReader( + table.schema(), + table.schema(), + table.properties().get(DEFAULT_NAME_MAPPING), + caseSensitive); } @Override @@ -140,8 +113,7 @@ public void open(Configuration parameters) { public List map(CombinedScanTask task) throws Exception { // Initialize the task writer. 
this.writer = taskWriterFactory.create(); - try (DataIterator iterator = - new DataIterator<>(rowDataReader, task, io, encryptionManager)) { + try (DataIterator iterator = new DataIterator<>(table, rowDataReader, task)) { while (iterator.hasNext()) { RowData rowData = iterator.next(); writer.write(rowData); diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java index c747375d2a28..9332fe5f6f59 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java @@ -18,53 +18,45 @@ */ package org.apache.iceberg.flink.source.reader; +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; + import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.data.RowData; import org.apache.iceberg.Schema; -import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.Table; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.source.DataIterator; import org.apache.iceberg.flink.source.RowDataFileScanTaskReader; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class RowDataReaderFunction extends DataIteratorReaderFunction { - private final Schema tableSchema; private final Schema readSchema; - private final String nameMapping; private final boolean caseSensitive; - private final FileIO io; - private final EncryptionManager encryption; + private final Table table; public RowDataReaderFunction( - ReadableConfig config, - Schema tableSchema, - Schema projectedSchema, - String nameMapping, - boolean caseSensitive, - FileIO io, - EncryptionManager encryption) { + Table table, ReadableConfig config, Schema projectedSchema, boolean caseSensitive) { super( new ArrayPoolDataIteratorBatcher<>( config, new RowDataRecordFactory( - FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema))))); - this.tableSchema = tableSchema; - this.readSchema = readSchema(tableSchema, projectedSchema); - this.nameMapping = nameMapping; + FlinkSchemaUtil.convert(readSchema(table.schema(), projectedSchema))))); + this.readSchema = readSchema(table.schema(), projectedSchema); this.caseSensitive = caseSensitive; - this.io = io; - this.encryption = encryption; + this.table = table; } @Override public DataIterator createDataIterator(IcebergSourceSplit split) { return new DataIterator<>( - new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive), - split.task(), - io, - encryption); + table, + new RowDataFileScanTaskReader( + table.schema(), + readSchema, + table.properties().get(DEFAULT_NAME_MAPPING), + caseSensitive), + split.task()); } private static Schema readSchema(Schema tableSchema, Schema projectedSchema) { diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java index 0d33e4ed08ad..4174c9971265 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java @@ -21,13 +21,17 
@@ import java.io.IOException; import java.util.List; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.Assert; @@ -51,16 +55,28 @@ public static Object[][] parameters() { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + public static final HadoopTables tables = new HadoopTables(new Configuration()); + protected abstract ReaderFunction readerFunction(); protected abstract void assertRecords(List expected, List actual, Schema schema); private final FileFormat fileFormat; private final GenericAppenderFactory appenderFactory; + private final Table table; - public ReaderFunctionTestBase(FileFormat fileFormat) { + public ReaderFunctionTestBase(FileFormat fileFormat) throws IOException { this.fileFormat = fileFormat; - this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA); + this.table = + tables + .buildTable(TEMPORARY_FOLDER.newFolder().getAbsolutePath(), TestFixtures.SCHEMA) + .withProperty(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()) + .create(); + this.appenderFactory = new GenericAppenderFactory(table); + } + + public Table table() { + return table; } private void assertRecordsAndPosition( diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java index ddc144be883c..dd4f6d3c273a 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.source.reader; +import java.io.IOException; import java.util.Arrays; import java.util.List; import org.apache.flink.api.connector.source.SourceReaderContext; @@ -28,13 +29,16 @@ import org.apache.flink.table.data.RowData; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; -import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; -import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; @@ -44,9 +48,18 @@ public class TestIcebergSourceReader { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); private final GenericAppenderFactory appenderFactory; + private final Table 
table; - public TestIcebergSourceReader() { - this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA); + public TestIcebergSourceReader() throws IOException { + HadoopTables tables = new HadoopTables(new org.apache.hadoop.conf.Configuration()); + this.table = + tables.create( + TestFixtures.SCHEMA, + PartitionSpec.unpartitioned(), + SortOrder.unsorted(), + ImmutableMap.of(), + TEMPORARY_FOLDER.newFolder().getAbsolutePath()); + this.appenderFactory = new GenericAppenderFactory(table); } @Test @@ -99,14 +112,7 @@ private IcebergSourceReader createReader( IcebergSourceReaderMetrics readerMetrics = new IcebergSourceReaderMetrics(metricGroup, "db.tbl"); RowDataReaderFunction readerFunction = - new RowDataReaderFunction( - new Configuration(), - TestFixtures.SCHEMA, - TestFixtures.SCHEMA, - null, - true, - new HadoopFileIO(new org.apache.hadoop.conf.Configuration()), - new PlaintextEncryptionManager()); + new RowDataReaderFunction(table, new Configuration(), TestFixtures.SCHEMA, true); return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext); } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java index aee271a3a7b8..856b104710ca 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.source.reader; +import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; @@ -30,11 +31,9 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; -import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.hadoop.HadoopFileIO; public class TestRowDataReaderFunction extends ReaderFunctionTestBase { @@ -42,20 +41,13 @@ public class TestRowDataReaderFunction extends ReaderFunctionTestBase { private static final DataStructureConverter rowDataConverter = DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(rowType)); - public TestRowDataReaderFunction(FileFormat fileFormat) { + public TestRowDataReaderFunction(FileFormat fileFormat) throws IOException { super(fileFormat); } @Override protected ReaderFunction readerFunction() { - return new RowDataReaderFunction( - new Configuration(), - TestFixtures.SCHEMA, - TestFixtures.SCHEMA, - null, - true, - new HadoopFileIO(new org.apache.hadoop.conf.Configuration()), - new PlaintextEncryptionManager()); + return new RowDataReaderFunction(table(), new Configuration(), TestFixtures.SCHEMA, true); } @Override From a2f9a5b623803369712903eb36c6e6933d20ecdd Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Mon, 12 Dec 2022 20:27:09 +0800 Subject: [PATCH 2/4] fix ut fail --- .../src/main/java/org/apache/iceberg/SerializableTable.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index 4f5ddef0c61d..cced65b4caeb 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ 
b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -357,6 +357,12 @@ public Transaction newTransaction() { throw new UnsupportedOperationException(errorMsg("newTransaction")); } + @Override + public IncrementalAppendScan newIncrementalAppendScan() { + TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider); + return new BaseIncrementalAppendScan(ops, lazyTable()); + } + private String errorMsg(String operation) { return String.format("Operation %s is not supported after the table is serialized", operation); } From 1180ac6d4b7bfd7375b1595cd6961fcad53880af Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Tue, 13 Dec 2022 10:34:19 +0800 Subject: [PATCH 3/4] address steven's comments --- .../org/apache/iceberg/SerializableTable.java | 11 +++---- .../iceberg/data/GenericAppenderFactory.java | 5 --- .../flink/actions/RewriteDataFilesAction.java | 4 ++- .../iceberg/flink/source/DataIterator.java | 22 ------------- .../flink/source/FlinkInputFormat.java | 2 +- .../iceberg/flink/source/FlinkSource.java | 1 - .../iceberg/flink/source/IcebergSource.java | 6 +++- .../iceberg/flink/source/RowDataRewriter.java | 5 ++- .../source/reader/RowDataReaderFunction.java | 10 ++++-- .../source/reader/ReaderFunctionTestBase.java | 22 +++++-------- .../flink/source/reader/ReaderUtil.java | 11 +++---- ...stArrayPoolDataIteratorBatcherRowData.java | 20 +++++++++-- .../reader/TestIcebergSourceReader.java | 33 +++++++++---------- .../reader/TestRowDataReaderFunction.java | 10 ++++-- 14 files changed, 76 insertions(+), 86 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index cced65b4caeb..68b1166d27f5 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -257,6 +257,11 @@ public TableScan newScan() { return lazyTable().newScan(); } + @Override + public IncrementalAppendScan newIncrementalAppendScan() { + return lazyTable().newIncrementalAppendScan(); + } + @Override public Snapshot currentSnapshot() { return lazyTable().currentSnapshot(); @@ -357,12 +362,6 @@ public Transaction newTransaction() { throw new UnsupportedOperationException(errorMsg("newTransaction")); } - @Override - public IncrementalAppendScan newIncrementalAppendScan() { - TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider); - return new BaseIncrementalAppendScan(ops, lazyTable()); - } - private String errorMsg(String operation) { return String.format("Operation %s is not supported after the table is serialized", operation); } diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index e6ec11853529..23a94ebc9944 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -26,7 +26,6 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.avro.DataWriter; import org.apache.iceberg.data.orc.GenericOrcWriter; @@ -60,10 +59,6 @@ public GenericAppenderFactory(Schema schema, PartitionSpec spec) { this(schema, spec, null, null, null); } - public GenericAppenderFactory(Table table) { - this(table.schema(), table.spec(), 
null, null, null); - } - public GenericAppenderFactory( Schema schema, PartitionSpec spec, diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java index 309757bc5c94..b94733010bf3 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.iceberg.actions.BaseRewriteDataFilesAction; import org.apache.iceberg.flink.source.RowDataRewriter; @@ -50,7 +51,8 @@ protected List rewriteDataForTasks(List combinedScan int size = combinedScanTasks.size(); int parallelism = Math.min(size, maxParallelism); DataStream dataStream = env.fromCollection(combinedScanTasks); - RowDataRewriter rowDataRewriter = new RowDataRewriter(table(), caseSensitive()); + RowDataRewriter rowDataRewriter = + new RowDataRewriter((SerializableTable) SerializableTable.copyOf(table()), caseSensitive()); try { return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism); } catch (Exception e) { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java index 2cff13447c4e..82ff08d039c3 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java @@ -25,10 +25,8 @@ import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; -import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.encryption.InputFilesDecryptor; import org.apache.iceberg.io.CloseableIterator; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -49,26 +47,6 @@ public class DataIterator implements CloseableIterator { private int fileOffset; private long recordOffset; - public DataIterator( - FileScanTaskReader fileScanTaskReader, - CombinedScanTask task, - FileIO io, - EncryptionManager encryption) { - this.fileScanTaskReader = fileScanTaskReader; - - this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption); - this.combinedTask = task; - - this.tasks = task.files().iterator(); - this.currentIterator = CloseableIterator.empty(); - - // fileOffset starts at -1 because we started - // from an empty iterator that is not from the split files. 
- this.fileOffset = -1; - // record offset points to the record that next() should return when called - this.recordOffset = 0L; - } - public DataIterator( Table table, FileScanTaskReader fileScanTaskReader, CombinedScanTask task) { this.fileScanTaskReader = fileScanTaskReader; diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index 35878d2398fa..dac46eee604e 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -39,9 +39,9 @@ public class FlinkInputFormat extends RichInputFormat private static final long serialVersionUID = 1L; + private final Table table; private final ScanContext context; private final RowDataFileScanTaskReader rowDataReader; - private final Table table; private transient DataIterator iterator; private transient long currentReadCount = 0L; diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index 06f444de0083..ef03de3f6bd3 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -199,7 +199,6 @@ public FlinkInputFormat buildFormat() { } icebergSchema = table.schema(); - if (projectedSchema == null) { contextBuilder.project(icebergSchema); } else { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index 6a912e8de76e..ef9bb37e5ee0 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -40,6 +40,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkSchemaUtil; @@ -357,7 +358,10 @@ public IcebergSource build() { if (readerFunction == null) { RowDataReaderFunction rowDataReaderFunction = new RowDataReaderFunction( - table, flinkConfig, context.project(), context.caseSensitive()); + (SerializableTable) SerializableTable.copyOf(table), + flinkConfig, + context.project(), + context.caseSensitive()); this.readerFunction = (ReaderFunction) rowDataReaderFunction; } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java index 67c4a58a03b3..595b38ab5a0d 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java @@ -31,6 +31,7 @@ import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.flink.FlinkSchemaUtil; @@ -51,7 +52,7 @@ public class RowDataRewriter { private final TaskWriterFactory taskWriterFactory; private 
final String tableName; - public RowDataRewriter(Table table, boolean caseSensitive) { + public RowDataRewriter(SerializableTable table, boolean caseSensitive) { this.table = table; this.caseSensitive = caseSensitive; this.tableName = table.name(); @@ -84,14 +85,12 @@ public static class RewriteMap extends RichMapFunction taskWriterFactory; private final RowDataFileScanTaskReader rowDataReader; public RewriteMap( Table table, boolean caseSensitive, TaskWriterFactory taskWriterFactory) { this.table = table; - this.caseSensitive = caseSensitive; this.taskWriterFactory = taskWriterFactory; this.rowDataReader = new RowDataFileScanTaskReader( diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java index 9332fe5f6f59..b32cc0b332f1 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.data.RowData; import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.source.DataIterator; @@ -31,20 +32,23 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class RowDataReaderFunction extends DataIteratorReaderFunction { + private final Table table; private final Schema readSchema; private final boolean caseSensitive; - private final Table table; public RowDataReaderFunction( - Table table, ReadableConfig config, Schema projectedSchema, boolean caseSensitive) { + SerializableTable table, + ReadableConfig config, + Schema projectedSchema, + boolean caseSensitive) { super( new ArrayPoolDataIteratorBatcher<>( config, new RowDataRecordFactory( FlinkSchemaUtil.convert(readSchema(table.schema(), projectedSchema))))); + this.table = table; this.readSchema = readSchema(table.schema(), projectedSchema); this.caseSensitive = caseSensitive; - this.table = table; } @Override diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java index 4174c9971265..7db87cfe5b90 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java @@ -21,21 +21,20 @@ import java.io.IOException; import java.util.List; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; -import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; -import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.Assert; 
import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; @@ -55,7 +54,10 @@ public static Object[][] parameters() { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - public static final HadoopTables tables = new HadoopTables(new Configuration()); + @Rule + public final HadoopTableResource tableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA); protected abstract ReaderFunction readerFunction(); @@ -63,20 +65,14 @@ public static Object[][] parameters() { private final FileFormat fileFormat; private final GenericAppenderFactory appenderFactory; - private final Table table; - public ReaderFunctionTestBase(FileFormat fileFormat) throws IOException { + public ReaderFunctionTestBase(FileFormat fileFormat) { this.fileFormat = fileFormat; - this.table = - tables - .buildTable(TEMPORARY_FOLDER.newFolder().getAbsolutePath(), TestFixtures.SCHEMA) - .withProperty(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()) - .create(); - this.appenderFactory = new GenericAppenderFactory(table); + this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA); } public Table table() { - return table; + return tableResource.table(); } private void assertRecordsAndPosition( diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java index b2ccbff2d6c2..93416f2d4e37 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java @@ -34,16 +34,15 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; -import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.DataIterator; import org.apache.iceberg.flink.source.RowDataFileScanTaskReader; -import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -81,12 +80,12 @@ public static FileScanTask createFileTask( residuals); } - public static DataIterator createDataIterator(CombinedScanTask combinedTask) { + public static DataIterator createDataIterator( + SerializableTable table, CombinedScanTask combinedTask) { return new DataIterator<>( + table, new RowDataFileScanTaskReader(TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true), - combinedTask, - new HadoopFileIO(new org.apache.hadoop.conf.Configuration()), - new PlaintextEncryptionManager()); + combinedTask); } public static List> createRecordBatchList( diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayPoolDataIteratorBatcherRowData.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayPoolDataIteratorBatcherRowData.java index f964a7707689..2990b74ce01c 100644 --- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayPoolDataIteratorBatcherRowData.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayPoolDataIteratorBatcherRowData.java @@ -28,16 +28,19 @@ import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.HadoopTableResource; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.flink.source.DataIterator; import org.apache.iceberg.io.CloseableIterator; import org.junit.Assert; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -46,6 +49,11 @@ public class TestArrayPoolDataIteratorBatcherRowData { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); private static final FileFormat fileFormat = FileFormat.PARQUET; + @Rule + public final HadoopTableResource tableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA); + private final GenericAppenderFactory appenderFactory; private final DataIteratorBatcher batcher; @@ -67,7 +75,9 @@ public void testSingleFileLessThanOneFullBatch() throws Exception { FileScanTask fileTask = ReaderUtil.createFileTask(records, TEMPORARY_FOLDER.newFile(), fileFormat, appenderFactory); CombinedScanTask combinedTask = new BaseCombinedScanTask(fileTask); - DataIterator dataIterator = ReaderUtil.createDataIterator(combinedTask); + DataIterator dataIterator = + ReaderUtil.createDataIterator( + (SerializableTable) SerializableTable.copyOf(tableResource.table()), combinedTask); String splitId = "someSplitId"; CloseableIterator>> recordBatchIterator = batcher.batch(splitId, dataIterator); @@ -109,7 +119,9 @@ public void testSingleFileWithMultipleBatches() throws Exception { FileScanTask fileTask = ReaderUtil.createFileTask(records, TEMPORARY_FOLDER.newFile(), fileFormat, appenderFactory); CombinedScanTask combinedTask = new BaseCombinedScanTask(fileTask); - DataIterator dataIterator = ReaderUtil.createDataIterator(combinedTask); + DataIterator dataIterator = + ReaderUtil.createDataIterator( + (SerializableTable) SerializableTable.copyOf(tableResource.table()), combinedTask); String splitId = "someSplitId"; CloseableIterator>> recordBatchIterator = batcher.batch(splitId, dataIterator); @@ -226,7 +238,9 @@ public void testMultipleFilesWithSeekPosition() throws Exception { CombinedScanTask combinedTask = new BaseCombinedScanTask(Arrays.asList(fileTask0, fileTask1, fileTask2)); - DataIterator dataIterator = ReaderUtil.createDataIterator(combinedTask); + DataIterator dataIterator = + ReaderUtil.createDataIterator( + (SerializableTable) SerializableTable.copyOf(tableResource.table()), combinedTask); // seek to file1 and after record 1 dataIterator.seek(1, 1); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java index dd4f6d3c273a..51dba72903e0 100644 --- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.flink.source.reader; -import java.io.IOException; import java.util.Arrays; import java.util.List; import org.apache.flink.api.connector.source.SourceReaderContext; @@ -29,37 +28,31 @@ import org.apache.flink.table.data.RowData; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.SortOrder; -import org.apache.iceberg.Table; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; -import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.Assert; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; public class TestIcebergSourceReader { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @Rule + public final HadoopTableResource tableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA); + private final GenericAppenderFactory appenderFactory; - private final Table table; - public TestIcebergSourceReader() throws IOException { - HadoopTables tables = new HadoopTables(new org.apache.hadoop.conf.Configuration()); - this.table = - tables.create( - TestFixtures.SCHEMA, - PartitionSpec.unpartitioned(), - SortOrder.unsorted(), - ImmutableMap.of(), - TEMPORARY_FOLDER.newFolder().getAbsolutePath()); - this.appenderFactory = new GenericAppenderFactory(table); + public TestIcebergSourceReader() { + this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA); } @Test @@ -112,7 +105,11 @@ private IcebergSourceReader createReader( IcebergSourceReaderMetrics readerMetrics = new IcebergSourceReaderMetrics(metricGroup, "db.tbl"); RowDataReaderFunction readerFunction = - new RowDataReaderFunction(table, new Configuration(), TestFixtures.SCHEMA, true); + new RowDataReaderFunction( + (SerializableTable) SerializableTable.copyOf(tableResource.table()), + new Configuration(), + TestFixtures.SCHEMA, + true); return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext); } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java index 856b104710ca..fd41fde879df 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.flink.source.reader; -import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; @@ -30,6 +29,7 @@ import org.apache.flink.types.Row; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; +import 
org.apache.iceberg.SerializableTable; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TestFixtures; @@ -41,13 +41,17 @@ public class TestRowDataReaderFunction extends ReaderFunctionTestBase { private static final DataStructureConverter rowDataConverter = DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(rowType)); - public TestRowDataReaderFunction(FileFormat fileFormat) throws IOException { + public TestRowDataReaderFunction(FileFormat fileFormat) { super(fileFormat); } @Override protected ReaderFunction readerFunction() { - return new RowDataReaderFunction(table(), new Configuration(), TestFixtures.SCHEMA, true); + return new RowDataReaderFunction( + (SerializableTable) SerializableTable.copyOf(table()), + new Configuration(), + TestFixtures.SCHEMA, + true); } @Override From 4f9dc9712cee0a1941222e0dac85f7eadae039bc Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Tue, 13 Dec 2022 20:03:16 +0800 Subject: [PATCH 4/4] revert back namemapping changes --- .../apache/iceberg/flink/source/IcebergSource.java | 1 + .../flink/source/reader/RowDataReaderFunction.java | 11 ++++------- .../flink/source/reader/TestIcebergSourceReader.java | 1 + .../source/reader/TestRowDataReaderFunction.java | 1 + 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index ef9bb37e5ee0..c970ee54d555 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -361,6 +361,7 @@ public IcebergSource build() { (SerializableTable) SerializableTable.copyOf(table), flinkConfig, context.project(), + context.nameMapping(), context.caseSensitive()); this.readerFunction = (ReaderFunction) rowDataReaderFunction; } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java index b32cc0b332f1..080fea50f7b1 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java @@ -18,8 +18,6 @@ */ package org.apache.iceberg.flink.source.reader; -import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; - import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.data.RowData; import org.apache.iceberg.Schema; @@ -34,12 +32,14 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction { private final Table table; private final Schema readSchema; + private final String nameMapping; private final boolean caseSensitive; public RowDataReaderFunction( SerializableTable table, ReadableConfig config, Schema projectedSchema, + String nameMapping, boolean caseSensitive) { super( new ArrayPoolDataIteratorBatcher<>( @@ -48,6 +48,7 @@ public RowDataReaderFunction( FlinkSchemaUtil.convert(readSchema(table.schema(), projectedSchema))))); this.table = table; this.readSchema = readSchema(table.schema(), projectedSchema); + this.nameMapping = nameMapping; this.caseSensitive = caseSensitive; } @@ -55,11 +56,7 @@ public RowDataReaderFunction( public DataIterator createDataIterator(IcebergSourceSplit split) { return 
new DataIterator<>( table, - new RowDataFileScanTaskReader( - table.schema(), - readSchema, - table.properties().get(DEFAULT_NAME_MAPPING), - caseSensitive), + new RowDataFileScanTaskReader(table.schema(), readSchema, nameMapping, caseSensitive), split.task()); } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java index 51dba72903e0..30864726a20b 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java @@ -109,6 +109,7 @@ private IcebergSourceReader createReader( (SerializableTable) SerializableTable.copyOf(tableResource.table()), new Configuration(), TestFixtures.SCHEMA, + null, true); return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext); } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java index fd41fde879df..826f61b7b1ee 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java @@ -51,6 +51,7 @@ protected ReaderFunction readerFunction() { (SerializableTable) SerializableTable.copyOf(table()), new Configuration(), TestFixtures.SCHEMA, + null, true); }
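
Usage sketch (not part of the patch series): a minimal illustration of how the
reworked constructors are wired after all four patches, assuming an
already-loaded Table instance. The class and method names below are
illustrative only, not part of the Iceberg API; the constructor signatures
mirror the final state of this series.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.SerializableTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.source.DataIterator;
    import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
    import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;

    public class SerializableTableUsageSketch {

      // Reader function for IcebergSource: SerializableTable.copyOf() snapshots
      // the table metadata and carries FileIO and EncryptionManager with it, so
      // the removed io/encryption constructor arguments are no longer passed.
      static RowDataReaderFunction newReaderFunction(Table table) {
        return new RowDataReaderFunction(
            (SerializableTable) SerializableTable.copyOf(table),
            new Configuration(), // any ReadableConfig works here
            table.schema(),      // projected schema; the full schema in this sketch
            null,                // nameMapping, restored as an argument by patch 4
            true);               // caseSensitive
      }

      // DataIterator now takes the table first and derives table.io() and
      // table.encryption() internally, replacing the removed
      // (reader, task, io, encryption) constructor.
      static DataIterator<RowData> newDataIterator(Table table, CombinedScanTask task) {
        return new DataIterator<>(
            table,
            new RowDataFileScanTaskReader(table.schema(), table.schema(), null, true),
            task);
      }
    }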