From 7f38c09425e02aab5da82a122a955272b1792f71 Mon Sep 17 00:00:00 2001
From: Bo
Date: Wed, 25 May 2022 10:28:17 +0800
Subject: [PATCH 1/5] [HUDI-4151] flink split_reader supports rocksdb

---
 .../util/collection/RocksDbDiskMap.java       | 43 +++++++++++++------
 .../apache/hudi/table/format/FormatUtils.java | 39 ++++++++++-------
 .../format/mor/MergeOnReadInputFormat.java    |  8 ++--
 3 files changed, 58 insertions(+), 32 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
index 21211a5700e51..b2508be4021de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
@@ -33,6 +33,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Spliterators;
+import java.util.UUID;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -41,21 +42,25 @@
  * All of the data is stored using the RocksDB implementation.
  */
 public final class RocksDbDiskMap extends DiskMap {
+  private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
+
   // ColumnFamily allows partitioning data within RockDB, which allows
   // independent configuration and faster deletes across partitions
   // https://github.com/facebook/rocksdb/wiki/Column-Families
   // For this use case, we use a single static column family/ partition
   //
-  private static final String ROCKSDB_COL_FAMILY = "rocksdb-diskmap";
+  private static final String ROCKSDB_BASE_PATH = "rocksdb-diskmap";
 
-  private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
   // Stores the key and corresponding value's latest metadata spilled to disk
   private final Set keySet;
-  private RocksDBDAO rocksDb;
+  private static RocksDBDAO rocksDb = null;
+  private String rocksdbColFamily;
 
   public RocksDbDiskMap(String rocksDbStoragePath) throws IOException {
     super(rocksDbStoragePath, ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
     this.keySet = new HashSet<>();
+    this.rocksdbColFamily = "rocksdb-diskmap" + UUID.randomUUID().toString();
+    getRocksDb().addColumnFamily(rocksdbColFamily);
   }
 
   @Override
@@ -83,12 +88,12 @@ public R get(Object key) {
     if (!containsKey(key)) {
       return null;
     }
-    return getRocksDb().get(ROCKSDB_COL_FAMILY, (T) key);
+    return getRocksDb().get(rocksdbColFamily, (T) key);
   }
 
   @Override
   public R put(T key, R value) {
-    getRocksDb().put(ROCKSDB_COL_FAMILY, key, value);
+    getRocksDb().put(rocksdbColFamily, key, value);
     keySet.add(key);
     return value;
   }
@@ -98,14 +103,14 @@ public R remove(Object key) {
     R value = get(key);
     if (value != null) {
       keySet.remove((T) key);
-      getRocksDb().delete(ROCKSDB_COL_FAMILY, (T) key);
+      getRocksDb().delete(rocksdbColFamily, (T) key);
     }
     return value;
   }
 
   @Override
   public void putAll(Map keyValues) {
-    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, ROCKSDB_COL_FAMILY, key, value)));
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, rocksdbColFamily, key, value)));
     keySet.addAll(keyValues.keySet());
   }
 
@@ -138,7 +143,7 @@ public Set> entrySet() {
    */
   @Override
   public Iterator iterator() {
-    return getRocksDb().iterator(ROCKSDB_COL_FAMILY);
+    return getRocksDb().iterator(rocksdbColFamily);
   }
 
   @Override
@@ -155,18 +160,30 @@ public long sizeOfFileOnDiskInBytes() {
   public void close() {
     keySet.clear();
     if (null != rocksDb) {
-      rocksDb.close();
+      rocksDb.dropColumnFamily(rocksdbColFamily);
     }
-    rocksDb = null;
     super.close();
   }
 
   private RocksDBDAO getRocksDb() {
     if (null == rocksDb) {
-      synchronized (this) {
+      synchronized (ROCKSDB_BASE_PATH) {
         if (null == rocksDb) {
-          rocksDb = new RocksDBDAO(ROCKSDB_COL_FAMILY, diskMapPath);
-          rocksDb.addColumnFamily(ROCKSDB_COL_FAMILY);
+          rocksDb = new RocksDBDAO(ROCKSDB_BASE_PATH, diskMapPath);
+          Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+              if (null != rocksDb) {
+                synchronized (ROCKSDB_BASE_PATH) {
+                  if (null != rocksDb) {
+                    rocksDb.close();
+                    rocksDb = null;
+                    LOG.info("closed " + ROCKSDB_BASE_PATH + " rocksdb");
+                  }
+                }
+              }
+            }
+          });
         }
       }
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 478f94cb71f73..443c29d8a5ec3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -26,10 +27,12 @@
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
@@ -120,9 +123,12 @@ private static Object getVal(IndexedRecord record, int pos) {
   public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
-      Configuration config,
-      boolean withOperationField) {
-    FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+      org.apache.flink.configuration.Configuration flinkConf,
+      Configuration hadoopConf) {
+    FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
+    final ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.valueOf(
+        flinkConf.getString(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
+            ExternalSpillableMap.DiskMapType.BITCASK.name()).toUpperCase(Locale.ROOT));
     return HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
@@ -131,27 +137,29 @@ public static HoodieMergedLogRecordScanner logScanner(
         .withLatestInstantTime(split.getLatestCommit())
         .withReadBlocksLazily(
             string2Boolean(
-                config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+                flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
                     HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
         .withReverseReader(false)
         .withBufferSize(
-            config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+            flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
                 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-        .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
+        .withMaxMemorySizeInBytes(diskMapType == ExternalSpillableMap.DiskMapType.ROCKS_DB ? 0 : split.getMaxCompactionMemoryInBytes())
+        .withDiskMapType(diskMapType)
         .withSpillableMapBasePath(
-            config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+            flinkConf.getString(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
                 HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
         .withInstantRange(split.getInstantRange())
-        .withOperationField(withOperationField)
+        .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
         .build();
   }
 
   private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
-      Configuration config,
+      org.apache.flink.configuration.Configuration flinkConf,
+      Configuration hadoopConf,
       HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
-    FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+    FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
     return HoodieUnMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
@@ -160,11 +168,11 @@ private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
         .withLatestInstantTime(split.getLatestCommit())
         .withReadBlocksLazily(
             string2Boolean(
-                config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+                flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
                     HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
         .withReverseReader(false)
         .withBufferSize(
-            config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+            flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
                 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
         .withInstantRange(split.getInstantRange())
         .withLogRecordScannerCallback(callback)
@@ -198,7 +206,7 @@ public BoundedMemoryRecords(
           Functions.noop());
       // Consumer of this record reader
       this.iterator = this.executor.getQueue().iterator();
-      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
+      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, flinkConf, hadoopConf,
           record -> executor.getQueue().insertRecord(record));
       // Start reading and buffering
       this.executor.startProducers();
@@ -232,6 +240,7 @@ public static HoodieMergedLogRecordScanner logScanner(
       HoodieWriteConfig writeConfig,
       Configuration hadoopConf) {
     String basePath = writeConfig.getBasePath();
+    final ExternalSpillableMap.DiskMapType diskMapType = writeConfig.getCommonConfig().getSpillableDiskMapType();
     return HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(FSUtils.getFs(basePath, hadoopConf))
         .withBasePath(basePath)
@@ -241,9 +250,9 @@ public static HoodieMergedLogRecordScanner logScanner(
         .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
         .withReverseReader(false)
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
-        .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
+        .withMaxMemorySizeInBytes(diskMapType == ExternalSpillableMap.DiskMapType.ROCKS_DB ? 0 : writeConfig.getMaxMemoryPerPartitionMerge())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
-        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 4f2de3648ed56..7fc4d96e96d70 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -192,6 +192,7 @@ public void open(MergeOnReadInputSplit split) throws IOException {
           getLogFileIterator(split));
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
       this.iterator = new MergeIterator(
+          conf,
           hadoopConf,
           split,
           this.tableState.getRowType(),
@@ -200,7 +201,6 @@ public void open(MergeOnReadInputSplit split) throws IOException {
           new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
           this.requiredPos,
           this.emitDelete,
-          this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED),
           this.tableState.getOperationPos(),
           getFullSchemaReader(split.getBasePath().get()));
     } else {
@@ -323,7 +323,7 @@ private ClosableIterator getLogFileIterator(MergeOnReadInputSplit split
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
         AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf);
     final Iterator logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
@@ -639,6 +639,7 @@ static class MergeIterator implements RecordIterator {
     private RowData currentRecord;
 
     MergeIterator(
+        Configuration conf,
         org.apache.hadoop.conf.Configuration hadoopConf,
         MergeOnReadInputSplit split,
         RowType tableRowType,
@@ -647,12 +648,11 @@ static class MergeIterator implements RecordIterator {
         Schema requiredSchema,
         int[] requiredPos,
         boolean emitDelete,
-        boolean withOperationField,
        int operationPos,
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, withOperationField);
+      this.scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf);
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;

From aa7df7c99896831a050c515a793f55cf5c716789 Mon Sep 17 00:00:00 2001
From: Bo
Date: Wed, 25 May 2022 19:24:52 +0800
Subject: [PATCH 2/5] [HUDI-4151] flink split_reader supports rocksdb

---
 .../util/collection/RocksDbDiskMap.java | 43 ++++++-------------
 1 file changed, 13 insertions(+), 30 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
index b2508be4021de..21211a5700e51 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
@@ -33,7 +33,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Spliterators;
-import java.util.UUID;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -42,25 +41,21 @@
  * All of the data is stored using the RocksDB implementation.
  */
 public final class RocksDbDiskMap extends DiskMap {
-  private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
-
   // ColumnFamily allows partitioning data within RockDB, which allows
   // independent configuration and faster deletes across partitions
   // https://github.com/facebook/rocksdb/wiki/Column-Families
   // For this use case, we use a single static column family/ partition
   //
-  private static final String ROCKSDB_BASE_PATH = "rocksdb-diskmap";
+  private static final String ROCKSDB_COL_FAMILY = "rocksdb-diskmap";
 
+  private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
   // Stores the key and corresponding value's latest metadata spilled to disk
   private final Set keySet;
-  private static RocksDBDAO rocksDb = null;
-  private String rocksdbColFamily;
+  private RocksDBDAO rocksDb;
 
   public RocksDbDiskMap(String rocksDbStoragePath) throws IOException {
     super(rocksDbStoragePath, ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
     this.keySet = new HashSet<>();
-    this.rocksdbColFamily = "rocksdb-diskmap" + UUID.randomUUID().toString();
-    getRocksDb().addColumnFamily(rocksdbColFamily);
   }
 
   @Override
@@ -88,12 +83,12 @@ public R get(Object key) {
     if (!containsKey(key)) {
       return null;
     }
-    return getRocksDb().get(rocksdbColFamily, (T) key);
+    return getRocksDb().get(ROCKSDB_COL_FAMILY, (T) key);
   }
 
   @Override
   public R put(T key, R value) {
-    getRocksDb().put(rocksdbColFamily, key, value);
+    getRocksDb().put(ROCKSDB_COL_FAMILY, key, value);
     keySet.add(key);
     return value;
   }
@@ -103,14 +98,14 @@ public R remove(Object key) {
     R value = get(key);
     if (value != null) {
       keySet.remove((T) key);
-      getRocksDb().delete(rocksdbColFamily, (T) key);
+      getRocksDb().delete(ROCKSDB_COL_FAMILY, (T) key);
     }
     return value;
   }
 
   @Override
   public void putAll(Map keyValues) {
-    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, rocksdbColFamily, key, value)));
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, ROCKSDB_COL_FAMILY, key, value)));
     keySet.addAll(keyValues.keySet());
   }
 
@@ -143,7 +138,7 @@ public Set> entrySet() {
    */
   @Override
   public Iterator iterator() {
-    return getRocksDb().iterator(rocksdbColFamily);
+    return getRocksDb().iterator(ROCKSDB_COL_FAMILY);
  }
 
   @Override
@@ -160,30 +155,18 @@ public long sizeOfFileOnDiskInBytes() {
   public void close() {
     keySet.clear();
     if (null != rocksDb) {
-      rocksDb.dropColumnFamily(rocksdbColFamily);
+      rocksDb.close();
     }
+    rocksDb = null;
     super.close();
   }
 
   private RocksDBDAO getRocksDb() {
     if (null == rocksDb) {
-      synchronized (ROCKSDB_BASE_PATH) {
+      synchronized (this) {
         if (null == rocksDb) {
-          rocksDb = new RocksDBDAO(ROCKSDB_BASE_PATH, diskMapPath);
-          Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-              if (null != rocksDb) {
-                synchronized (ROCKSDB_BASE_PATH) {
-                  if (null != rocksDb) {
-                    rocksDb.close();
-                    rocksDb = null;
-                    LOG.info("closed " + ROCKSDB_BASE_PATH + " rocksdb");
-                  }
-                }
-              }
-            }
-          });
+          rocksDb = new RocksDBDAO(ROCKSDB_COL_FAMILY, diskMapPath);
+          rocksDb.addColumnFamily(ROCKSDB_COL_FAMILY);
         }
       }
     }

From 36161bf2dea866c5aecc745af9109e66b2af0726 Mon Sep 17 00:00:00 2001
From: Bo
Date: Wed, 25 May 2022 19:51:15 +0800
Subject: [PATCH 3/5] [HUDI-4151] flink split_reader supports rocksdb

---
 .../apache/hudi/table/format/FormatUtils.java | 22 ++++++-------------
 .../format/mor/MergeOnReadInputFormat.java    | 13 +++++++----
 2 files changed, 16 insertions(+), 19 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 443c29d8a5ec3..3c28618381c44 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -123,33 +123,25 @@ private static Object getVal(IndexedRecord record, int pos) {
   public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
-      org.apache.flink.configuration.Configuration flinkConf,
+      HoodieWriteConfig writeConfig,
       Configuration hadoopConf) {
     FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
-    final ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.valueOf(
-        flinkConf.getString(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
-            ExternalSpillableMap.DiskMapType.BITCASK.name()).toUpperCase(Locale.ROOT));
+    final ExternalSpillableMap.DiskMapType diskMapType = writeConfig.getCommonConfig().getSpillableDiskMapType();
     return HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
         .withLogFilePaths(split.getLogPaths().get())
         .withReaderSchema(logSchema)
         .withLatestInstantTime(split.getLatestCommit())
-        .withReadBlocksLazily(
-            string2Boolean(
-                flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-                    HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
         .withReverseReader(false)
-        .withBufferSize(
-            flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-                HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(diskMapType == ExternalSpillableMap.DiskMapType.ROCKS_DB ? 0 : split.getMaxCompactionMemoryInBytes())
         .withDiskMapType(diskMapType)
-        .withSpillableMapBasePath(
-            flinkConf.getString(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-                HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withInstantRange(split.getInstantRange())
-        .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
+        .withOperationField(writeConfig.getProps().getBoolean(FlinkOptions.CHANGELOG_ENABLED.key(),
+            FlinkOptions.CHANGELOG_ENABLED.defaultValue()))
         .build();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 7fc4d96e96d70..01a90fd15c960 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -37,6 +38,7 @@
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
@@ -85,6 +87,8 @@ public class MergeOnReadInputFormat
 
   private final Configuration conf;
 
+  private transient HoodieWriteConfig writeConfig;
+
   private transient org.apache.hadoop.conf.Configuration hadoopConf;
 
   private final MergeOnReadTableState tableState;
@@ -168,6 +172,7 @@ public void open(MergeOnReadInputSplit split) throws IOException {
     this.currentReadCount = 0L;
     this.closed = false;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
+    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
     if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
       if (split.getInstantRange() != null) {
         // base file only with commit time filtering
@@ -192,7 +197,7 @@ public void open(MergeOnReadInputSplit split) throws IOException {
           getLogFileIterator(split));
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
       this.iterator = new MergeIterator(
-          conf,
+          writeConfig,
           hadoopConf,
           split,
           this.tableState.getRowType(),
@@ -323,7 +328,7 @@ private ClosableIterator getLogFileIterator(MergeOnReadInputSplit split
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
         AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf);
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, writeConfig, hadoopConf);
     final Iterator logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
@@ -639,7 +644,7 @@ static class MergeIterator implements RecordIterator {
     private RowData currentRecord;
 
     MergeIterator(
-        Configuration conf,
+        HoodieWriteConfig writeConfig,
        org.apache.hadoop.conf.Configuration hadoopConf,
         MergeOnReadInputSplit split,
         RowType tableRowType,
@@ -652,7 +657,7 @@ static class MergeIterator implements RecordIterator {
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf);
+      this.scanner = FormatUtils.logScanner(split, tableSchema, writeConfig, hadoopConf);
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;

From d84585bf3018a6cb9795059cfa24a8f61affa023 Mon Sep 17 00:00:00 2001
From: Bo
Date: Thu, 26 May 2022 09:06:06 +0800
Subject: [PATCH 4/5] [HUDI-4151] flink split_reader supports rocksdb

---
 .../org/apache/hudi/table/format/FormatUtils.java | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 3c28618381c44..2280ec97523b9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.format;
 
-import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -27,7 +26,6 @@
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
@@ -126,7 +124,6 @@ public static HoodieMergedLogRecordScanner logScanner(
       HoodieWriteConfig writeConfig,
       Configuration hadoopConf) {
     FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
-    final ExternalSpillableMap.DiskMapType diskMapType = writeConfig.getCommonConfig().getSpillableDiskMapType();
     return HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
@@ -136,8 +133,8 @@ public static HoodieMergedLogRecordScanner logScanner(
         .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
         .withReverseReader(false)
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
-        .withMaxMemorySizeInBytes(diskMapType == ExternalSpillableMap.DiskMapType.ROCKS_DB ? 0 : split.getMaxCompactionMemoryInBytes())
-        .withDiskMapType(diskMapType)
+        .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withInstantRange(split.getInstantRange())
         .withOperationField(writeConfig.getProps().getBoolean(FlinkOptions.CHANGELOG_ENABLED.key(),
@@ -232,7 +229,6 @@ public static HoodieMergedLogRecordScanner logScanner(
       HoodieWriteConfig writeConfig,
       Configuration hadoopConf) {
     String basePath = writeConfig.getBasePath();
-    final ExternalSpillableMap.DiskMapType diskMapType = writeConfig.getCommonConfig().getSpillableDiskMapType();
     return HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(FSUtils.getFs(basePath, hadoopConf))
         .withBasePath(basePath)
@@ -242,9 +238,9 @@ public static HoodieMergedLogRecordScanner logScanner(
         .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
         .withReverseReader(false)
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
-        .withMaxMemorySizeInBytes(diskMapType == ExternalSpillableMap.DiskMapType.ROCKS_DB ? 0 : writeConfig.getMaxMemoryPerPartitionMerge())
+        .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
-        .withDiskMapType(diskMapType)
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
         .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();
   }

From 89ce5aae7139def8393ccd20dac2c07ea868d265 Mon Sep 17 00:00:00 2001
From: Bo
Date: Fri, 27 May 2022 16:26:31 +0800
Subject: [PATCH 5/5] [HUDI-4151] flink split_reader supports rocksdb

---
 .../org/apache/hudi/table/format/FormatUtils.java     |  6 +++---
 .../table/format/mor/MergeOnReadInputFormat.java      | 13 ++++---------
 2 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 2280ec97523b9..eb058597f8059 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -121,8 +121,9 @@ private static Object getVal(IndexedRecord record, int pos) {
   public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
-      HoodieWriteConfig writeConfig,
+      org.apache.flink.configuration.Configuration flinkConf,
       Configuration hadoopConf) {
+    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf);
     FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
     return HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
@@ -137,8 +138,7 @@ public static HoodieMergedLogRecordScanner logScanner(
         .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withInstantRange(split.getInstantRange())
-        .withOperationField(writeConfig.getProps().getBoolean(FlinkOptions.CHANGELOG_ENABLED.key(),
-            FlinkOptions.CHANGELOG_ENABLED.defaultValue()))
+        .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
         .build();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 01a90fd15c960..8eaa9d0b886f4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -25,7 +25,6 @@
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -38,7 +37,6 @@
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.RowDataToAvroConverters;
-import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
@@ -87,8 +85,6 @@ public class MergeOnReadInputFormat
 
   private final Configuration conf;
 
-  private transient HoodieWriteConfig writeConfig;
-
   private transient org.apache.hadoop.conf.Configuration hadoopConf;
 
   private final MergeOnReadTableState tableState;
@@ -172,7 +168,6 @@ public void open(MergeOnReadInputSplit split) throws IOException {
     this.currentReadCount = 0L;
     this.closed = false;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
-    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
     if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
       if (split.getInstantRange() != null) {
         // base file only with commit time filtering
@@ -197,7 +192,7 @@ public void open(MergeOnReadInputSplit split) throws IOException {
           getLogFileIterator(split));
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
       this.iterator = new MergeIterator(
-          writeConfig,
+          conf,
           hadoopConf,
           split,
           this.tableState.getRowType(),
@@ -328,7 +323,7 @@ private ClosableIterator getLogFileIterator(MergeOnReadInputSplit split
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
         AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, writeConfig, hadoopConf);
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf);
     final Iterator logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
@@ -644,7 +639,7 @@ static class MergeIterator implements RecordIterator {
     private RowData currentRecord;
 
     MergeIterator(
-        HoodieWriteConfig writeConfig,
+        Configuration finkConf,
         org.apache.hadoop.conf.Configuration hadoopConf,
         MergeOnReadInputSplit split,
         RowType tableRowType,
@@ -657,7 +652,7 @@ static class MergeIterator implements RecordIterator {
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.scanner = FormatUtils.logScanner(split, tableSchema, writeConfig, hadoopConf);
+      this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf);
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;
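
Usage sketch (reviewer note, not part of the patch series): after patch 5, FormatUtils.logScanner builds a HoodieWriteConfig from the Flink job configuration via StreamerUtil.getHoodieClientConfig, so the split reader picks up the spillable disk-map type from the regular Hudi config instead of always using the BitCask map. Assuming the Flink connector forwards hoodie.* keys from the table definition into that write config, a MOR read job could opt into the RocksDB-backed spillable map roughly as below; the table name, schema, and path are placeholders.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RocksDbSpillableReadSketch {
      public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'hoodie.common.spillable.diskmap.type' is the key behind
        // HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE, which the merged log record
        // scanner now resolves when reading merge-on-read splits.
        // Assumption: hoodie.* options in the WITH clause reach the write config.
        tableEnv.executeSql(
            "CREATE TABLE hudi_mor_source (\n"
                + "  uuid STRING,\n"
                + "  name STRING,\n"
                + "  ts TIMESTAMP(3),\n"
                + "  PRIMARY KEY (uuid) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'hudi',\n"
                + "  'path' = 'file:///tmp/hudi_mor_table',\n"
                + "  'table.type' = 'MERGE_ON_READ',\n"
                + "  'hoodie.common.spillable.diskmap.type' = 'ROCKS_DB'\n"
                + ")");

        // With ROCKS_DB selected, merged log records spill to RocksDB rather than
        // the BitCask-based disk map once the in-memory budget is exhausted.
        tableEnv.executeSql("SELECT * FROM hudi_mor_source").print();
      }
    }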