diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
index 19e319224fc6e..698e041f1d8e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileReader;
@@ -32,32 +33,31 @@
 import org.apache.avro.Schema;

 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;

 public class HoodieFileSliceReader extends LogFileIterator {
-  private Option<Iterator<HoodieRecord>> baseFileIterator;
-  private HoodieMergedLogRecordScanner scanner;
-  private Schema schema;
-  private Properties props;
+  private final Option<HoodieFileReader> baseFileReader;
+  private final Option<ClosableIterator<HoodieRecord>> baseFileIterator;
+  private final Schema schema;
+  private final Properties props;

-  private TypedProperties payloadProps = new TypedProperties();
-  private Option<Pair<String, String>> simpleKeyGenFieldsOpt;
-  private Option<BaseKeyGenerator> keyGeneratorOpt;
+  private final TypedProperties payloadProps = new TypedProperties();
+  private final Option<Pair<String, String>> simpleKeyGenFieldsOpt;
+  private final Option<BaseKeyGenerator> keyGeneratorOpt;
   Map<String, HoodieRecord> records;
   HoodieRecordMerger merger;

   public HoodieFileSliceReader(Option<HoodieFileReader> baseFileReader,
-      HoodieMergedLogRecordScanner scanner, Schema schema, String preCombineField, HoodieRecordMerger merger,
+      HoodieMergedLogRecordScanner scanner, Schema schema, String preCombineField, HoodieRecordMerger merger,
       Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt, Option<BaseKeyGenerator> keyGeneratorOpt) throws IOException {
     super(scanner);
+    this.baseFileReader = baseFileReader;
     if (baseFileReader.isPresent()) {
       this.baseFileIterator = Option.of(baseFileReader.get().getRecordIterator(schema));
     } else {
       this.baseFileIterator = Option.empty();
     }
-    this.scanner = scanner;
     this.schema = schema;
     this.merger = merger;
     if (preCombineField != null) {
@@ -66,7 +66,7 @@ public HoodieFileSliceReader(Option<HoodieFileReader> baseFileReader,
     this.props = props;
     this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
     this.keyGeneratorOpt = keyGeneratorOpt;
-    this.records = scanner.getRecords();
+    this.records = this.scanner.getRecords();
   }

   private boolean hasNextInternal() {
@@ -99,4 +99,14 @@ protected boolean doHasNext() {
     return hasNextInternal();
   }

+  @Override
+  public void close() {
+    super.close();
+    if (baseFileIterator.isPresent()) {
+      baseFileIterator.get().close();
+    }
+    if (baseFileReader.isPresent()) {
+      baseFileReader.get().close();
+    }
+  }
 }
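Note on the change above: HoodieFileSliceReader previously had no way to release the base file it reads from, so callers leaked the reader's file handle. The diff keeps a reference to the HoodieFileReader alongside the ClosableIterator derived from it, and overrides close() to release both after closing the scanner via super.close(). Below is a minimal, self-contained sketch of that ownership pattern; FileReader and SliceReaderSketch are stand-in types, not Hudi's actual APIs.

    // Sketch only: an iterator that owns the reader it was built from must
    // close both the record iterator and the reader, or the handle leaks.
    import java.util.Iterator;
    import java.util.Optional;

    public class SliceReaderSketch implements Iterator<String>, AutoCloseable {

      // Stand-in for HoodieFileReader: the resource owning the file handle.
      interface FileReader extends AutoCloseable {
        Iterator<String> recordIterator();
        @Override void close(); // unchecked close, as in the diff
      }

      private final Optional<FileReader> baseFileReader;
      private final Optional<Iterator<String>> baseFileIterator;

      public SliceReaderSketch(Optional<FileReader> baseFileReader) {
        // Keep the reader itself, not just its iterator, so close() can reach
        // it later -- the same reason the diff adds the baseFileReader field.
        this.baseFileReader = baseFileReader;
        this.baseFileIterator = baseFileReader.map(FileReader::recordIterator);
      }

      @Override public boolean hasNext() {
        return baseFileIterator.map(Iterator::hasNext).orElse(false);
      }

      @Override public String next() {
        return baseFileIterator.get().next();
      }

      @Override public void close() {
        // Mirrors the added close(): release the source if one was present.
        baseFileReader.ifPresent(FileReader::close);
      }
    }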
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
index 380c29b52baa1..c9d7130121974 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
@@ -176,6 +176,26 @@ private static Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMetaCli
       Option<StoragePath> dataFilePath,
       HoodieIndexDefinition indexDefinition,
       String instantTime) throws Exception {
+    Map<String, String> recordKeyToSecondaryKey = new HashMap<>();
+    try (HoodieFileSliceReader fileSliceReader =
+        getFileSliceReader(metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, instantTime)) {
+      // Collect the records from the iterator in a map by record key to secondary key
+      while (fileSliceReader.hasNext()) {
+        HoodieRecord record = (HoodieRecord) fileSliceReader.next();
+        String secondaryKey = getSecondaryKey(record, tableSchema, indexDefinition);
+        if (secondaryKey != null) {
+          // no delete records here
+          recordKeyToSecondaryKey.put(record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD), secondaryKey);
+        }
+      }
+    }
+    return recordKeyToSecondaryKey;
+  }
+
+  private static HoodieFileSliceReader getFileSliceReader(
+      HoodieTableMetaClient metaClient, EngineType engineType,
+      List<String> logFilePaths, Schema tableSchema, String partition,
+      Option<StoragePath> dataFilePath, String instantTime) throws IOException {
     final String basePath = metaClient.getBasePath().toString();
     final StorageConfiguration<?> storageConf = metaClient.getStorageConf();

@@ -207,19 +227,8 @@ private static Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMetaCli
     if (dataFilePath.isPresent()) {
       baseFileReader = Option.of(HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()).getFileReader(getReaderConfigs(storageConf), dataFilePath.get()));
     }
-    HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger,
+    return new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger,
         metaClient.getTableConfig().getProps(), Option.empty(), Option.empty());
-    // Collect the records from the iterator in a map by record key to secondary key
-    Map<String, String> recordKeyToSecondaryKey = new HashMap<>();
-    while (fileSliceReader.hasNext()) {
-      HoodieRecord record = (HoodieRecord) fileSliceReader.next();
-      String secondaryKey = getSecondaryKey(record, tableSchema, indexDefinition);
-      if (secondaryKey != null) {
-        // no delete records here
-        recordKeyToSecondaryKey.put(record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD), secondaryKey);
-      }
-    }
-    return recordKeyToSecondaryKey;
   }

   private static String getSecondaryKey(HoodieRecord record, Schema tableSchema, HoodieIndexDefinition indexDefinition) {
@@ -277,47 +286,14 @@ private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(Hood
       Option<StoragePath> dataFilePath,
       HoodieIndexDefinition indexDefinition,
       String instantTime) throws Exception {
-    final String basePath = metaClient.getBasePath().toString();
-    final StorageConfiguration<?> storageConf = metaClient.getStorageConf();
-
-    HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(
-        basePath,
-        engineType,
-        Collections.emptyList(),
-        metaClient.getTableConfig().getRecordMergeStrategyId());
-
-    HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(metaClient.getStorage())
-        .withBasePath(metaClient.getBasePath())
-        .withLogFilePaths(logFilePaths)
-        .withReaderSchema(tableSchema)
-        .withLatestInstantTime(instantTime)
-        .withReverseReader(false)
-        .withMaxMemorySizeInBytes(storageConf.getLong(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
-        .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue())
-        .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
-        .withPartition(partition)
-        .withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false))
-        .withDiskMapType(storageConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue()))
-        .withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-        .withRecordMerger(recordMerger)
-        .withTableMetaClient(metaClient)
-        .build();
-
-    Option<HoodieFileReader> baseFileReader = Option.empty();
-    if (dataFilePath.isPresent()) {
-      baseFileReader = Option.of(HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()).getFileReader(getReaderConfigs(storageConf), dataFilePath.get()));
-    }
-    HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger,
-        metaClient.getTableConfig().getProps(),
-        Option.empty(), Option.empty());
-    ClosableIterator<HoodieRecord> fileSliceIterator = ClosableIterator.wrap(fileSliceReader);
     return new ClosableIterator<HoodieRecord>() {
+      private final HoodieFileSliceReader fileSliceReader = getFileSliceReader(
+          metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, instantTime);
       private HoodieRecord nextValidRecord;

       @Override
       public void close() {
-        fileSliceIterator.close();
+        fileSliceReader.close();
       }

       @Override
@@ -334,8 +310,8 @@ public boolean hasNext() {
           // NOTE: Delete record should not happen when initializing the secondary index i.e. when called from readSecondaryKeysFromFileSlices,
           // because from that call, we get the merged records as of some committed instant. So, delete records must have been filtered out.
           // Loop to find the next valid record or exhaust the iterator.
-          while (fileSliceIterator.hasNext()) {
-            HoodieRecord record = fileSliceIterator.next();
+          while (fileSliceReader.hasNext()) {
+            HoodieRecord record = fileSliceReader.next();
             String secondaryKey = getSecondaryKey(record);
             if (secondaryKey != null) {
               nextValidRecord = createSecondaryIndexRecord(
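Note on the two call sites: with HoodieFileSliceReader now closeable, getRecordKeyToSecondaryKey manages it with try-with-resources, and createSecondaryIndexGenerator builds it as a field of the anonymous ClosableIterator it returns, so the caller that closes the iterator also closes the underlying reader. Factoring the shared scanner/reader setup into getFileSliceReader also removes the duplicated construction code the two methods used to carry. Below is a self-contained sketch of the lazy-construction pattern with hypothetical names; nothing in it is a Hudi API.

    // Sketch only: ties an expensive resource's lifetime to the iterator handed out.
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;
    import java.io.UncheckedIOException;
    import java.util.Iterator;

    public class LazyIteratorSketch {

      interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override void close(); // unchecked close, like Hudi's ClosableIterator
      }

      static CloseableIterator<String> lineIterator(String text) {
        return new CloseableIterator<String>() {
          // Built inside the anonymous iterator, as the diff now builds the
          // file slice reader, so nothing leaks from the enclosing method.
          private final BufferedReader reader = new BufferedReader(new StringReader(text));
          private String nextLine = advance();

          private String advance() {
            try {
              return reader.readLine();
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          }

          @Override public boolean hasNext() {
            return nextLine != null;
          }

          @Override public String next() {
            String current = nextLine;
            nextLine = advance();
            return current;
          }

          @Override public void close() {
            try {
              reader.close();
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          }
        };
      }

      public static void main(String[] args) {
        // Caller pattern matching the reworked getRecordKeyToSecondaryKey:
        // try-with-resources guarantees close() even on early exit.
        try (CloseableIterator<String> lines = lineIterator("a\nb\nc")) {
          while (lines.hasNext()) {
            System.out.println(lines.next());
          }
        }
      }
    }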