@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -32,32 +33,31 @@
import org.apache.avro.Schema;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class HoodieFileSliceReader<T> extends LogFileIterator<T> {
Contributor Author:
This class is going to be removed and all usages will be replaced by HoodieFileGroupReader, so this PR fixes the close behavior in a simple way.

private Option<Iterator<HoodieRecord>> baseFileIterator;
private HoodieMergedLogRecordScanner scanner;
private Schema schema;
private Properties props;
private final Option<HoodieFileReader> baseFileReader;
private final Option<ClosableIterator<HoodieRecord>> baseFileIterator;
private final Schema schema;
private final Properties props;

private TypedProperties payloadProps = new TypedProperties();
private Option<Pair<String, String>> simpleKeyGenFieldsOpt;
private Option<BaseKeyGenerator> keyGeneratorOpt;
private final TypedProperties payloadProps = new TypedProperties();
private final Option<Pair<String, String>> simpleKeyGenFieldsOpt;
private final Option<BaseKeyGenerator> keyGeneratorOpt;
Map<String, HoodieRecord> records;
HoodieRecordMerger merger;

public HoodieFileSliceReader(Option<HoodieFileReader> baseFileReader,
HoodieMergedLogRecordScanner scanner, Schema schema, String preCombineField, HoodieRecordMerger merger,
HoodieMergedLogRecordScanner scanner, Schema schema, String preCombineField, HoodieRecordMerger merger,
Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt, Option<BaseKeyGenerator> keyGeneratorOpt) throws IOException {
super(scanner);
this.baseFileReader = baseFileReader;
if (baseFileReader.isPresent()) {
this.baseFileIterator = Option.of(baseFileReader.get().getRecordIterator(schema));
} else {
this.baseFileIterator = Option.empty();
}
this.scanner = scanner;
this.schema = schema;
this.merger = merger;
if (preCombineField != null) {
@@ -66,7 +66,7 @@ public HoodieFileSliceReader(Option<HoodieFileReader> baseFileReader,
this.props = props;
this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
this.keyGeneratorOpt = keyGeneratorOpt;
this.records = scanner.getRecords();
this.records = this.scanner.getRecords();
}

private boolean hasNextInternal() {
@@ -99,4 +99,14 @@ protected boolean doHasNext() {
return hasNextInternal();
}

@Override
public void close() {
super.close();
if (baseFileIterator.isPresent()) {
baseFileIterator.get().close();
}
if (baseFileReader.isPresent()) {
baseFileReader.get().close();
}
}
}
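With close() in place, HoodieFileSliceReader can be managed with try-with-resources, the same pattern the secondary-index changes below adopt. A minimal caller-side sketch (assumed usage, not part of this PR; the arguments are placeholders built the same way as in the diff below):

private static void consumeFileSlice(Option<HoodieFileReader> baseFileReader,
                                     HoodieMergedLogRecordScanner scanner,
                                     Schema tableSchema,
                                     String preCombineField,
                                     HoodieRecordMerger recordMerger,
                                     Properties props) throws IOException {
  // try-with-resources guarantees close() runs even if iteration fails:
  // the override above calls super.close(), then closes the base file
  // record iterator and the base file reader when they are present.
  try (HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader(
      baseFileReader, scanner, tableSchema, preCombineField, recordMerger,
      props, Option.empty(), Option.empty())) {
    while (fileSliceReader.hasNext()) {
      HoodieRecord record = (HoodieRecord) fileSliceReader.next();
      // ... process the merged record for the file slice
    }
  }
}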
@@ -176,6 +176,26 @@ private static Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMetaCli
Option<StoragePath> dataFilePath,
HoodieIndexDefinition indexDefinition,
String instantTime) throws Exception {
Map<String, String> recordKeyToSecondaryKey = new HashMap<>();
try (HoodieFileSliceReader fileSliceReader =
getFileSliceReader(metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, instantTime)) {
// Collect the records from the iterator in a map by record key to secondary key
while (fileSliceReader.hasNext()) {
HoodieRecord record = (HoodieRecord) fileSliceReader.next();
String secondaryKey = getSecondaryKey(record, tableSchema, indexDefinition);
if (secondaryKey != null) {
// no delete records here
recordKeyToSecondaryKey.put(record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD), secondaryKey);
}
}
}
return recordKeyToSecondaryKey;
}

private static HoodieFileSliceReader getFileSliceReader(
HoodieTableMetaClient metaClient, EngineType engineType,
List<String> logFilePaths, Schema tableSchema, String partition,
Option<StoragePath> dataFilePath, String instantTime) throws IOException {
final String basePath = metaClient.getBasePath().toString();
final StorageConfiguration<?> storageConf = metaClient.getStorageConf();

@@ -207,19 +227,8 @@ private static Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMetaCli
if (dataFilePath.isPresent()) {
baseFileReader = Option.of(HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()).getFileReader(getReaderConfigs(storageConf), dataFilePath.get()));
Contributor:
This reader is never getting closed, which will lead to leaks.

Contributor Author:
Yes. This is fixed now.

}
HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger,
return new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger,
metaClient.getTableConfig().getProps(), Option.empty(), Option.empty());
// Collect the records from the iterator in a map by record key to secondary key
Map<String, String> recordKeyToSecondaryKey = new HashMap<>();
while (fileSliceReader.hasNext()) {
HoodieRecord record = (HoodieRecord) fileSliceReader.next();
String secondaryKey = getSecondaryKey(record, tableSchema, indexDefinition);
if (secondaryKey != null) {
// no delete records here
recordKeyToSecondaryKey.put(record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD), secondaryKey);
}
}
return recordKeyToSecondaryKey;
}

private static String getSecondaryKey(HoodieRecord record, Schema tableSchema, HoodieIndexDefinition indexDefinition) {
@@ -277,47 +286,14 @@ private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(Hood
Option<StoragePath> dataFilePath,
HoodieIndexDefinition indexDefinition,
String instantTime) throws Exception {
final String basePath = metaClient.getBasePath().toString();
final StorageConfiguration<?> storageConf = metaClient.getStorageConf();

HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(
basePath,
engineType,
Collections.emptyList(),
metaClient.getTableConfig().getRecordMergeStrategyId());

HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(metaClient.getStorage())
.withBasePath(metaClient.getBasePath())
.withLogFilePaths(logFilePaths)
.withReaderSchema(tableSchema)
.withLatestInstantTime(instantTime)
.withReverseReader(false)
.withMaxMemorySizeInBytes(storageConf.getLong(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
.withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue())
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
.withPartition(partition)
.withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false))
.withDiskMapType(storageConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue()))
.withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
.withRecordMerger(recordMerger)
.withTableMetaClient(metaClient)
.build();

Option<HoodieFileReader> baseFileReader = Option.empty();
if (dataFilePath.isPresent()) {
baseFileReader = Option.of(HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()).getFileReader(getReaderConfigs(storageConf), dataFilePath.get()));
}
HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger,
metaClient.getTableConfig().getProps(),
Option.empty(), Option.empty());
ClosableIterator<HoodieRecord> fileSliceIterator = ClosableIterator.wrap(fileSliceReader);
return new ClosableIterator<HoodieRecord>() {
Contributor Author:
This closable iterator and the file slice reader are going to be closed properly once #13178 lands.

private final HoodieFileSliceReader<HoodieRecord> fileSliceReader = getFileSliceReader(
metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, instantTime);
private HoodieRecord nextValidRecord;

@Override
public void close() {
fileSliceIterator.close();
fileSliceReader.close();
}

@Override
@@ -334,8 +310,8 @@ public boolean hasNext() {
// NOTE: Delete record should not happen when initializing the secondary index i.e. when called from readSecondaryKeysFromFileSlices,
// because from that call, we get the merged records as of some committed instant. So, delete records must have been filtered out.
// Loop to find the next valid record or exhaust the iterator.
while (fileSliceIterator.hasNext()) {
HoodieRecord record = fileSliceIterator.next();
while (fileSliceReader.hasNext()) {
HoodieRecord record = fileSliceReader.next();
String secondaryKey = getSecondaryKey(record);
if (secondaryKey != null) {
nextValidRecord = createSecondaryIndexRecord(
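Until #13178 lands, the returned iterator still relies on its caller to invoke close(); when the caller does, the override above delegates to HoodieFileSliceReader.close(), which releases the base file reader, its record iterator, and the log record scanner. A caller-side sketch (assumed, not part of this PR; the leading parameters of createSecondaryIndexGenerator are assumed to mirror those of getRecordKeyToSecondaryKey above, and the method's checked Exception is propagated by the enclosing code):

// Closing the iterator closes the wrapped HoodieFileSliceReader via the
// close() override above, so no base file reader or scanner is leaked.
try (ClosableIterator<HoodieRecord> secondaryIndexRecords = createSecondaryIndexGenerator(
    metaClient, engineType, logFilePaths, tableSchema, partition,
    dataFilePath, indexDefinition, instantTime)) {
  while (secondaryIndexRecords.hasNext()) {
    HoodieRecord secondaryIndexRecord = secondaryIndexRecords.next();
    // ... hand the secondary index record to the metadata table writer
  }
}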