@@ -1453,6 +1453,10 @@ public boolean isMetadataIndexColumnStatsForAllColumnsEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isMetadataColumnStatsIndexForAllColumnsEnabled();
}

public int getColumnStatsIndexParallelism() {
return metadataConfig.getColumnStatsIndexParallelism();
}

public int getBloomIndexKeysPerBucket() {
return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET);
}
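
A note on the new accessor above: getColumnStatsIndexParallelism() just surfaces the metadata-config knob to the write path, where it bounds how wide column-stats collection fans out. The sketch below is a hedged, self-contained illustration of how such a knob is typically consumed — collectStats() and the file-slice strings are hypothetical, not Hudi APIs:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ParallelismSketch {

  // Hypothetical stand-in for per-file-slice column-stats work.
  static String collectStats(String fileSlice) {
    return "stats(" + fileSlice + ")";
  }

  // Fan the work out at the configured parallelism, mirroring how a
  // getColumnStatsIndexParallelism()-style setting would be consumed.
  static List<String> collectAll(List<String> fileSlices, int parallelism) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, parallelism));
    try {
      List<Future<String>> futures = fileSlices.stream()
          .map(slice -> pool.submit(() -> collectStats(slice)))
          .collect(Collectors.toList());
      List<String> results = new ArrayList<>();
      for (Future<String> future : futures) {
        results.add(future.get());
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(collectAll(List.of("file-slice-1", "file-slice-2", "file-slice-3"), 2));
  }
}
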
@@ -18,12 +18,14 @@

package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -52,6 +54,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;

@@ -71,6 +74,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION;

/**
* IO Operation to append data onto an existing file.
*/
@@ -320,7 +325,48 @@ private void updateWriteStatus(HoodieDeltaWriteStat stat, AppendResult result) {
statuses.add(this.writeStatus);
}

private void processAppendResult(AppendResult result) {
/**
* Collect column statistics for the records written through this append handle.
*
* @param filePath - Log file that the records were appended to
* @param recordList - List of records appended to the log, for which column statistics are needed
* @param columnRangeMap - Output map accumulating the column statistics for the records
*/
private void getRecordsStats(final String filePath, List<IndexedRecord> recordList,
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap) {
recordList.forEach(record -> accumulateColumnRanges(record, writeSchemaWithMetaFields, filePath, columnRangeMap, config.isConsistentLogicalTimestampEnabled()));
}

/**
* Accumulate column range statistics for the requested record.
*
* @param record - Record to collect the column range statistics for
* @param schema - Schema of the record
* @param filePath - File that the record belongs to
* @param columnRangeMap - Output map accumulating the per-column range statistics
* @param consistentLogicalTimestampEnabled - Whether logical timestamp fields are stringified in a consistent format
*/
private static void accumulateColumnRanges(IndexedRecord record, Schema schema, String filePath,
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap, boolean consistentLogicalTimestampEnabled) {
if (!(record instanceof GenericRecord)) {
throw new HoodieIOException("Record is not a generic type to get column range metadata!");
}
schema.getFields().forEach(field -> {
final String fieldVal = HoodieAvroUtils.getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
final HoodieColumnRangeMetadata<Comparable> fieldRange = new HoodieColumnRangeMetadata<>(
filePath,
field.name(),
fieldVal,
fieldVal,
fieldVal == null ? 1 : 0, // null count
fieldVal == null ? 0 : 1, // value count
fieldSize,
fieldSize
);
columnRangeMap.merge(field.name(), fieldRange, COLUMN_RANGE_MERGE_FUNCTION);
});
}

private void processAppendResult(AppendResult result, List<IndexedRecord> recordList) {
HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) this.writeStatus.getStat();

if (stat.getPath() == null) {
@@ -339,6 +385,13 @@ private void processAppendResult(AppendResult result) {
updateWriteStatus(stat, result);
}

if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
? stat.getRecordsStats().get().getStats() : new HashMap<>();
getRecordsStats(stat.getPath(), recordList, columnRangeMap);
stat.setRecordsStats(new HoodieDeltaWriteStat.RecordsStats<>(columnRangeMap));
}

resetWriteCounts();
assert stat.getRuntimeStats() != null;
LOG.info(String.format("AppendHandle for partitionPath %s filePath %s, took %d ms.", partitionPath,
@@ -376,7 +429,7 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header)

if (blocks.size() > 0) {
AppendResult appendResult = writer.appendBlocks(blocks);
processAppendResult(appendResult);
processAppendResult(appendResult, recordList);
recordList.clear();
keysToDelete.clear();
}
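
A note on the accumulation in accumulateColumnRanges() above: each record's per-field range is folded into columnRangeMap via Map.merge with COLUMN_RANGE_MERGE_FUNCTION. The merge function itself is not shown in this diff; the self-contained sketch below uses a hypothetical ColumnRange type with the semantics the call sites imply — widen [min, max] and sum the null/value counts and sizes:

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for HoodieColumnRangeMetadata, to illustrate the fold.
final class ColumnRange<T extends Comparable<T>> {
  final T minValue;
  final T maxValue;
  final long nullCount;
  final long valueCount;
  final long totalSize;

  ColumnRange(T minValue, T maxValue, long nullCount, long valueCount, long totalSize) {
    this.minValue = minValue;
    this.maxValue = maxValue;
    this.nullCount = nullCount;
    this.valueCount = valueCount;
    this.totalSize = totalSize;
  }

  // Assumed merge semantics: widen the [min, max] range and sum the counters.
  static <T extends Comparable<T>> ColumnRange<T> merge(ColumnRange<T> a, ColumnRange<T> b) {
    T min = a.minValue == null ? b.minValue
        : b.minValue == null ? a.minValue
        : a.minValue.compareTo(b.minValue) <= 0 ? a.minValue : b.minValue;
    T max = a.maxValue == null ? b.maxValue
        : b.maxValue == null ? a.maxValue
        : a.maxValue.compareTo(b.maxValue) >= 0 ? a.maxValue : b.maxValue;
    return new ColumnRange<>(min, max, a.nullCount + b.nullCount,
        a.valueCount + b.valueCount, a.totalSize + b.totalSize);
  }

  @Override
  public String toString() {
    return "[" + minValue + ", " + maxValue + "] nulls=" + nullCount
        + " values=" + valueCount + " size=" + totalSize;
  }
}

public class ColumnRangeMergeSketch {
  public static void main(String[] args) {
    Map<String, ColumnRange<String>> columnRangeMap = new HashMap<>();
    // Three appended records: values "b" and "a" for column "name", then a null.
    columnRangeMap.merge("name", new ColumnRange<>("b", "b", 0, 1, 1), ColumnRange::merge);
    columnRangeMap.merge("name", new ColumnRange<>("a", "a", 0, 1, 1), ColumnRange::merge);
    columnRangeMap.merge("name", new ColumnRange<>(null, null, 1, 0, 0), ColumnRange::merge);
    System.out.println(columnRangeMap.get("name")); // [a, b] nulls=1 values=2 size=2
  }
}
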
@@ -19,14 +19,8 @@
package org.apache.hudi.io;

import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
@@ -39,8 +33,6 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@@ -53,56 +45,23 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O> exten

private final BloomFilter bloomFilter;
private final List<String> candidateRecordKeys;
private final boolean useMetadataTableIndex;
private Option<String> fileName = Option.empty();
private long totalKeysChecked;

public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair) {
this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
}

public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
boolean useMetadataTableIndex) {
super(config, hoodieTable, partitionPathFileIDPair);
this.candidateRecordKeys = new ArrayList<>();
this.totalKeysChecked = 0;
if (fileName.isPresent()) {
ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
"File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'");
this.fileName = fileName;
}
this.useMetadataTableIndex = useMetadataTableIndex;
this.bloomFilter = getBloomFilter();
}

private BloomFilter getBloomFilter() {
BloomFilter bloomFilter = null;
HoodieTimer timer = new HoodieTimer().startTimer();
try {
if (this.useMetadataTableIndex) {
ValidationUtils.checkArgument(this.fileName.isPresent(),
"File name not available to fetch bloom filter from the metadata table index.");
Option<ByteBuffer> bloomFilterByteBuffer =
hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get());
if (!bloomFilterByteBuffer.isPresent()) {
throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight());
}
bloomFilter =
new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
BloomFilterTypeCode.DYNAMIC_V0);
} else {
try (HoodieFileReader reader = createNewFileReader()) {
bloomFilter = reader.readBloomFilter();
}
}
try (HoodieFileReader reader = createNewFileReader()) {
LOG.debug(String.format("Read bloom filter from %s", partitionPathFileIDPair));
return reader.readBloomFilter();
} catch (IOException e) {
throw new HoodieIndexException(String.format("Error reading bloom filter from %s/%s - %s",
getPartitionPathFileIDPair().getLeft(), this.fileName, e));
throw new HoodieIndexException(String.format("Error reading bloom filter from %s", getPartitionPathFileIDPair()), e);
}
LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
return bloomFilter;
}
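
For context on the fields retained above: candidateRecordKeys and totalKeysChecked back the handle's prefiltering flow, where each incoming record key is tested against the file's bloom filter and only possible matches are kept as candidates for the exact lookup against the base file. A minimal sketch of that pattern, with Guava's BloomFilter standing in for Hudi's:

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BloomPrefilterSketch {
  public static void main(String[] args) {
    // Stand-in for the per-file bloom filter that getBloomFilter() reads.
    BloomFilter<CharSequence> bloomFilter =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000, 0.0001);
    bloomFilter.put("key-1");
    bloomFilter.put("key-2");

    // Prefilter: keep only keys the filter says *might* be in the file;
    // false positives are weeded out later by the exact key lookup.
    List<String> candidateRecordKeys = new ArrayList<>();
    long totalKeysChecked = 0;
    for (String recordKey : Arrays.asList("key-1", "key-3")) {
      totalKeysChecked++;
      if (bloomFilter.mightContain(recordKey)) {
        candidateRecordKeys.add(recordKey);
      }
    }
    System.out.println("checked=" + totalKeysChecked + " candidates=" + candidateRecordKeys);
  }
}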
