diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 478f94cb71f73..eb058597f8059 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
@@ -120,38 +121,34 @@ private static Object getVal(IndexedRecord record, int pos) {
   public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
-      Configuration config,
-      boolean withOperationField) {
-    FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+      org.apache.flink.configuration.Configuration flinkConf,
+      Configuration hadoopConf) {
+    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf);
+    FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
     return HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
         .withLogFilePaths(split.getLogPaths().get())
         .withReaderSchema(logSchema)
         .withLatestInstantTime(split.getLatestCommit())
-        .withReadBlocksLazily(
-            string2Boolean(
-                config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-                    HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
         .withReverseReader(false)
-        .withBufferSize(
-            config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-                HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
-        .withSpillableMapBasePath(
-            config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-                HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withInstantRange(split.getInstantRange())
-        .withOperationField(withOperationField)
+        .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
         .build();
   }
 
   private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
-      Configuration config,
+      org.apache.flink.configuration.Configuration flinkConf,
+      Configuration hadoopConf,
       HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
-    FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+    FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
     return HoodieUnMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
@@ -160,11 +157,11 @@ private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
         .withLatestInstantTime(split.getLatestCommit())
         .withReadBlocksLazily(
             string2Boolean(
-                config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+                flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
                     HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
         .withReverseReader(false)
         .withBufferSize(
-            config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+            flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
                 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
         .withInstantRange(split.getInstantRange())
         .withLogRecordScannerCallback(callback)
@@ -198,7 +195,7 @@ public BoundedMemoryRecords(
           Functions.noop());
       // Consumer of this record reader
       this.iterator = this.executor.getQueue().iterator();
-      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
+      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, flinkConf, hadoopConf,
          record -> executor.getQueue().insertRecord(record));
       // Start reading and buffering
       this.executor.startProducers();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 4f2de3648ed56..8eaa9d0b886f4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -192,6 +192,7 @@ public void open(MergeOnReadInputSplit split) throws IOException {
           getLogFileIterator(split));
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
       this.iterator = new MergeIterator(
+          conf,
           hadoopConf,
           split,
           this.tableState.getRowType(),
@@ -200,7 +201,6 @@ public void open(MergeOnReadInputSplit split) throws IOException {
           new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
           this.requiredPos,
           this.emitDelete,
-          this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED),
           this.tableState.getOperationPos(),
           getFullSchemaReader(split.getBasePath().get()));
     } else {
@@ -323,7 +323,7 @@ private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
         AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf);
     final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
@@ -639,6 +639,7 @@ static class MergeIterator implements RecordIterator {
     private RowData currentRecord;
 
     MergeIterator(
+        Configuration finkConf,
         org.apache.hadoop.conf.Configuration hadoopConf,
         MergeOnReadInputSplit split,
         RowType tableRowType,
@@ -647,12 +648,11 @@ static class MergeIterator implements RecordIterator {
         Schema requiredSchema,
         int[] requiredPos,
         boolean emitDelete,
-        boolean withOperationField,
         int operationPos,
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, withOperationField);
+      this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf);
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;