@@ -1492,6 +1492,10 @@ public boolean isMetadataIndexColumnStatsForAllColumnsEnabled() {
     return isMetadataTableEnabled() && getMetadataConfig().isMetadataColumnStatsIndexForAllColumnsEnabled();
   }
 
+  public int getColumnStatsIndexParallelism() {
+    return metadataConfig.getColumnStatsIndexParallelism();
+  }
+
   public int getBloomIndexKeysPerBucket() {
     return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET);
   }
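A quick illustration of how the new getter might be consumed when building the column stats index (a hedged sketch, not code from this PR: engineContext, latestFileSlices, and buildColumnStatsRecords are stand-ins for whatever the metadata writer actually uses):

    // Cap the engine job's parallelism by the configured column-stats-index parallelism.
    int parallelism = Math.min(latestFileSlices.size(), writeConfig.getColumnStatsIndexParallelism());
    List<HoodieRecord> columnStatsRecords =
        engineContext.flatMap(latestFileSlices, slice -> buildColumnStatsRecords(slice).stream(), parallelism);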
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -71,6 +72,9 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.accumulateColumnRanges;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.aggregateColumnStats;
+
 /**
  * IO Operation to append data onto an existing file.
  */
@@ -320,7 +324,7 @@ private void updateWriteStatus(HoodieDeltaWriteStat stat, AppendResult result) {
     statuses.add(this.writeStatus);
   }
 
-  private void processAppendResult(AppendResult result) {
+  private void processAppendResult(AppendResult result, List<IndexedRecord> recordList) {
     HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) this.writeStatus.getStat();
 
     if (stat.getPath() == null) {
@@ -339,6 +343,19 @@ private void processAppendResult(AppendResult result) {
       updateWriteStatus(stat, result);
     }
 
+    if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
+          ? stat.getRecordsStats().get().getStats() : new HashMap<>();

[Review comment (Contributor)]
@codope that's what I was referring to with my comments about the increased complexity around RecordsStats. Why not just have stat.getRecordsStats().get() instead?

Now, when reading this code, the reader actually needs to understand what this additional getStats() call is about and why it's needed, while without it the call site is crystal clear and doesn't require scanning through getRecordsStats to understand what's going on.
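A minimal sketch of the simplification the reviewer is suggesting, assuming getRecordsStats() were changed to return the column-range map directly (hypothetical; the current RecordsStats wrapper is what adds the extra getStats() hop):

    // Hypothetical: getRecordsStats() returns Option<Map<String, HoodieColumnRangeMetadata<Comparable>>>,
    // so the call site unwraps it in one step, mirroring the existing isPresent()/get() pattern.
    Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap =
        stat.getRecordsStats().isPresent() ? stat.getRecordsStats().get() : new HashMap<>();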

+      final String filePath = stat.getPath();
+      // initialize a map of column name to a map of stats name to stats value
+      Map<String, Map<String, Object>> columnToStats = new HashMap<>();
+      writeSchemaWithMetaFields.getFields().forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
+      // collect stats for all columns in one pass per record; iterating through every record eventually yields column stats for all fields
+      recordList.forEach(record -> aggregateColumnStats(record, writeSchemaWithMetaFields, columnToStats, config.isConsistentLogicalTimestampEnabled()));

[Review comment (Contributor)]
Can we, instead of placing the iteration and the aggregation into separate methods, consolidate them in aggregateColumnStats so that its signature actually is:

    Map<String, Map<...>> aggregateColumnStats(records, writeSchema, ...)
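A rough sketch of the consolidated helper being proposed (the exact signature is hypothetical, and accumulateRecordStats below stands in for the current single-record aggregation logic):

    // Hypothetical consolidation: the helper owns both the per-record iteration and the
    // per-field aggregation, and returns the finished map instead of mutating a caller-owned one.
    public static Map<String, Map<String, Object>> aggregateColumnStats(
        List<IndexedRecord> records, Schema writeSchema, boolean consistentLogicalTimestampEnabled) {
      Map<String, Map<String, Object>> columnToStats = new HashMap<>();
      writeSchema.getFields().forEach(field -> columnToStats.put(field.name(), new HashMap<>()));
      // fold each record's field values into the per-column stats maps
      records.forEach(record ->
          accumulateRecordStats(record, writeSchema, columnToStats, consistentLogicalTimestampEnabled));
      return columnToStats;
    }

The call site in processAppendResult would then collapse to a single assignment from aggregateColumnStats(recordList, writeSchemaWithMetaFields, config.isConsistentLogicalTimestampEnabled()).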

+      writeSchemaWithMetaFields.getFields().forEach(field -> accumulateColumnRanges(field, filePath, columnRangeMap, columnToStats));
+      stat.setRecordsStats(new HoodieDeltaWriteStat.RecordsStats<>(columnRangeMap));
+    }
+
     resetWriteCounts();
     assert stat.getRuntimeStats() != null;
     LOG.info(String.format("AppendHandle for partitionPath %s filePath %s, took %d ms.", partitionPath,
@@ -376,7 +393,7 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header)

     if (blocks.size() > 0) {
       AppendResult appendResult = writer.appendBlocks(blocks);
-      processAppendResult(appendResult);
+      processAppendResult(appendResult, recordList);
       recordList.clear();
       keysToDelete.clear();
     }
@@ -419,7 +436,7 @@ public List<WriteStatus> close() {
     // update final size, once for all log files
     // TODO we can actually deduce file size purely from AppendResult (based on offset and size
     // of the appended block)
-    for (WriteStatus status: statuses) {
+    for (WriteStatus status : statuses) {
       long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath()));
       status.getStat().setFileSizeInBytes(logFileSize);
     }
@@ -19,14 +19,9 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
-import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.HoodieTimer;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
@@ -39,8 +34,6 @@
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -53,53 +46,30 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O> exten

   private final BloomFilter bloomFilter;
   private final List<String> candidateRecordKeys;
-  private final boolean useMetadataTableIndex;
-  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
                                Pair<String, String> partitionPathFileIDPair) {
-    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
-  }
-
-  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                               Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
-                               boolean useMetadataTableIndex) {
     super(config, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    if (fileName.isPresent()) {
-      ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
-          "File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'");
-      this.fileName = fileName;
-    }
-    this.useMetadataTableIndex = useMetadataTableIndex;
     this.bloomFilter = getBloomFilter();
   }
 
   private BloomFilter getBloomFilter() {
     BloomFilter bloomFilter = null;
     HoodieTimer timer = new HoodieTimer().startTimer();
     try {
-      if (this.useMetadataTableIndex) {
-        ValidationUtils.checkArgument(this.fileName.isPresent(),
-            "File name not available to fetch bloom filter from the metadata table index.");
-        Option<ByteBuffer> bloomFilterByteBuffer =
-            hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get());
-        if (!bloomFilterByteBuffer.isPresent()) {
-          throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight());
-        }
-        bloomFilter =
-            new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
-                BloomFilterTypeCode.DYNAMIC_V0);
+      if (config.isMetadataBloomFilterIndexEnabled()) {
+        bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
+            .orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
       } else {
         try (HoodieFileReader reader = createNewFileReader()) {
           bloomFilter = reader.readBloomFilter();
         }
       }
     } catch (IOException e) {
-      throw new HoodieIndexException(String.format("Error reading bloom filter from %s/%s - %s",
-          getPartitionPathFileIDPair().getLeft(), this.fileName, e));
+      throw new HoodieIndexException(String.format("Error reading bloom filter from %s", getPartitionPathFileIDPair()), e);
     }
     LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
     return bloomFilter;
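To see the refactor from the caller's side, here is a hypothetical call site (the real callers live in the bloom index code and are not shown in this diff): the metadata-index decision is no longer threaded through the constructor, because the handle now reads it from the write config.

    // Hypothetical call site: the extra fileName/useMetadataTableIndex constructor
    // arguments are gone; Pair.of(partitionPath, fileId) identifies the file group.
    HoodieKeyLookupHandle<T, I, K, O> handle =
        new HoodieKeyLookupHandle<>(config, hoodieTable, Pair.of(partitionPath, fileId));
    // Internally, getBloomFilter() branches on config.isMetadataBloomFilterIndexEnabled():
    // the metadata table's bloom filter index when enabled, otherwise the base file
    // footer via HoodieFileReader.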