Merged

Changes from all commits (16 commits):
624790d  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Dec 17, 2021)
eaa2c85  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Dec 23, 2021)
feee40d  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 4, 2022)
f112f90  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 5, 2022)
7a44b1c  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 6, 2022)
ab7a5ee  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 10, 2022)
760c75f  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 14, 2022)
e5c8f82  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 14, 2022)
3045b76  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 17, 2022)
2c61c59  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 18, 2022)
01cd808  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 20, 2022)
da4cf69  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 26, 2022)
0db738f  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 27, 2022)
5f5ba30  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 27, 2022)
bbe0f56  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Jan 29, 2022)
e489045  [HUDI-1295] Metadata Index - Bloom filter and Column stats index to s… (manojpec, Feb 3, 2022)
File: HoodieWriteConfig.java
@@ -1437,6 +1437,14 @@ public boolean useBloomIndexBucketizedChecking() {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
}

public boolean isMetadataBloomFilterIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isBloomFilterIndexEnabled();
}

public boolean isMetadataIndexColumnStatsForAllColumnsEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isMetadataColumnStatsIndexForAllColumnsEnabled();
}

public int getBloomIndexKeysPerBucket() {
return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET);
}
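For reference, both new guards fold in the metadata-table check, so call sites can branch on a single condition. A minimal sketch (hypothetical helper, not part of this PR):

// Hypothetical call-site sketch: isMetadataBloomFilterIndexEnabled() already
// requires isMetadataTableEnabled(), so one check is enough to pick the source.
static String bloomFilterSource(HoodieWriteConfig writeConfig) {
  if (writeConfig.isMetadataBloomFilterIndexEnabled()) {
    return "metadata table bloom filter index";
  }
  return "base file footer";
}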
File: HoodieIndexUtils.java
@@ -18,17 +18,30 @@

package org.apache.hudi.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import static java.util.stream.Collectors.toList;

@@ -37,6 +50,8 @@
*/
public class HoodieIndexUtils {

private static final Logger LOG = LogManager.getLogger(HoodieIndexUtils.class);

/**
* Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
*
@@ -101,4 +116,34 @@ record = new HoodieRecord<>(inputRecord);
}
return record;
}

/**
* Given a list of row keys and one file, return only row keys existing in that file.
*
* @param filePath - File to filter keys from
* @param candidateRecordKeys - Candidate keys to filter
* @return List of candidate keys that are available in the file
*/
public static List<String> filterKeysFromFile(Path filePath, List<String> candidateRecordKeys,
Configuration configuration) throws HoodieIndexException {
ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
List<String> foundRecordKeys = new ArrayList<>();
try {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
if (LOG.isDebugEnabled()) {
LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
}
}
} catch (Exception e) {
throw new HoodieIndexException("Error checking candidate keys against file.", e);
}
return foundRecordKeys;
}
}
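A usage sketch for the new helper (path and keys are made up): bloom filters can report false positives but never false negatives, so this file-level re-check is what removes the spurious candidates.

// Hypothetical inputs; filterKeysFromFile returns only the keys actually present in the file.
Configuration hadoopConf = new Configuration();
Path baseFilePath = new Path("/data/hudi/2022/01/01/f1_0-1-0_20220101000000.parquet");
List<String> candidateKeys = Arrays.asList("key-001", "key-002", "key-003");
List<String> presentKeys = HoodieIndexUtils.filterKeysFromFile(baseFilePath, candidateKeys, hadoopConf);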
File: HoodieBaseBloomIndexCheckFunction.java
@@ -25,7 +25,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.io.HoodieKeyLookupHandle;
-import org.apache.hudi.io.HoodieKeyLookupHandle.KeyLookupResult;
+import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.table.HoodieTable;

import java.util.function.Function;
@@ -37,7 +37,7 @@
* Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files.
*/
public class HoodieBaseBloomIndexCheckFunction
-    implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<KeyLookupResult>>> {
+    implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {

private final HoodieTable hoodieTable;

@@ -49,11 +49,11 @@ public HoodieBaseBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteCon
}

@Override
-  public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
+  public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
}

-  class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<KeyLookupResult>> {
+  class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<HoodieKeyLookupResult>> {

private HoodieKeyLookupHandle keyLookupHandle;

@@ -66,8 +66,8 @@ protected void start() {
}

@Override
-    protected List<KeyLookupResult> computeNext() {
-      List<KeyLookupResult> ret = new ArrayList<>();
+    protected List<HoodieKeyLookupResult> computeNext() {
+      List<HoodieKeyLookupResult> ret = new ArrayList<>();
try {
// process one file in each go.
while (inputItr.hasNext()) {
@@ -83,7 +83,7 @@ protected List<KeyLookupResult> computeNext() {
}

// if continue on current file
-        if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
+        if (keyLookupHandle.getPartitionPathFileIDPair().equals(partitionPathFilePair)) {
keyLookupHandle.addKey(recordKey);
} else {
// do the actual checking of file & break out
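Downstream callers drive this function with (fileId, HoodieKey) pairs sorted by file id, so the lookup handle is reused across consecutive keys of the same file and each base file is opened once. A sketch with made-up inputs (hoodieTable and config assumed in scope):

// Keys grouped by file id; a new lookup handle is created only when the file changes.
List<Pair<String, HoodieKey>> fileKeyPairs = Arrays.asList(
    Pair.of("file-1", new HoodieKey("key-a", "2022/01/01")),
    Pair.of("file-1", new HoodieKey("key-b", "2022/01/01")),
    Pair.of("file-2", new HoodieKey("key-c", "2022/01/02")));
Iterator<List<HoodieKeyLookupResult>> results =
    new HoodieBaseBloomIndexCheckFunction(hoodieTable, config).apply(fileKeyPairs.iterator());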
File: HoodieBloomIndex.java
@@ -19,11 +19,13 @@

package org.apache.hudi.index.bloom;

import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -33,6 +35,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
@@ -46,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
@@ -111,13 +115,19 @@ record -> new ImmutablePair<>(record.getPartitionPath(), record.getRecordKey()))
private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
HoodiePairData<String, String> partitionRecordKeyPairs, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
-    // Obtain records per partition, in the incoming records
+    // Step 1: Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition = partitionRecordKeyPairs.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

-    List<Pair<String, BloomIndexFileInfo>> fileInfoList =
-        loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
+    // Step 2: Load all involved files as <Partition, filename> pairs
+    List<Pair<String, BloomIndexFileInfo>> fileInfoList;
+    if (config.getBloomIndexPruneByRanges()) {
+      fileInfoList = (config.getMetadataConfig().isColumnStatsIndexEnabled()
+          ? loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable)
+          : loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable));
+    } else {
+      fileInfoList = getFileInfoForLatestBaseFiles(affectedPartitionPathList, context, hoodieTable);
+    }
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));

@@ -133,30 +143,84 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
/**
* Load all involved files as <Partition, filename> pair List.
*/
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());

-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
-        }
-      }, Math.max(partitionPathFileIDList.size(), 1));
-    } else {
-      return partitionPathFileIDList.stream()
-          .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
-    }
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get BloomIndexFileInfo for all the latest base files for the requested partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
+
+    final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      // Partition and file name pairs
+      List<Pair<String, String>> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+          hoodieTable).stream().map(baseFile -> Pair.of(partitionName, baseFile.getFileName()))
+          .sorted()
+          .collect(toList());
+      if (partitionFileNameList.isEmpty()) {
+        return Stream.empty();
+      }
+      try {
+        Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap = hoodieTable
+            .getMetadataTable().getColumnStats(partitionFileNameList, keyField);
+        List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
+        for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
+          result.add(Pair.of(entry.getKey().getLeft(),
+              new BloomIndexFileInfo(
+                  FSUtils.getFileId(entry.getKey().getRight()),
+                  entry.getValue().getMinValue(),
+                  entry.getValue().getMaxValue()
+              )));
+        }
+        return result.stream();
+      } catch (MetadataNotFoundException me) {
+        throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me);
+      }
+    }, Math.max(partitions.size(), 1));
+  }

@Override
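Whichever source the ranges come from (base file footers via HoodieRangeInfoHandle, or the metadata table's column stats on the record key field), they feed the same pruning rule. A sketch of that rule, assuming BloomIndexFileInfo exposes hasKeyRanges() and isKeyInRange() as in current Hudi:

// A key can only live in files whose [minRecordKey, maxRecordKey] range contains it;
// files without range info must always be checked.
static boolean mustCheckFile(BloomIndexFileInfo fileInfo, String recordKey) {
  return !fileInfo.hasKeyRanges() || fileInfo.isKeyInRange(recordKey);
}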
File: HoodieGlobalBloomIndex.java
@@ -55,11 +55,11 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelp
* Load all involved files as <Partition, filename> pairs from all partitions in the table.
*/
@Override
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
-                                                           final HoodieTable hoodieTable) {
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(List<String> partitions, final HoodieEngineContext context,
+                                                                   final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
-    return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable);
+    return super.loadColumnRangesFromFiles(allPartitionPaths, context, hoodieTable);
}

/**
File: ListBasedHoodieBloomIndexHelper.java
@@ -28,7 +28,7 @@
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.HoodieKeyLookupHandle;
+import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.table.HoodieTable;

import java.util.ArrayList;
@@ -63,9 +63,8 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecor
HoodieList.getList(fileComparisonPairs).stream()
.sorted(Comparator.comparing(ImmutablePair::getLeft)).collect(toList());

-    List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();
-
-    Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(
+    List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();
+    Iterator<List<HoodieKeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(
hoodieTable, config).apply(fileComparisonPairList.iterator());
while (iterator.hasNext()) {
keyLookupResults.addAll(iterator.next());
@@ -77,7 +76,7 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecor
lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator()
).mapToPair(pair -> {
-      HoodieKeyLookupHandle.KeyLookupResult lookupResult = pair.getLeft();
+      HoodieKeyLookupResult lookupResult = pair.getLeft();
String recordKey = pair.getRight();
return new ImmutablePair<>(
new HoodieKey(recordKey, lookupResult.getPartitionPath()),
File: HoodieIOHandle.java
@@ -19,6 +19,8 @@
package org.apache.hudi.io;

import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

@@ -31,8 +33,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload, I, K, O> {
protected final FileSystem fs;
protected final HoodieTable<T, I, K, O> hoodieTable;

-  HoodieIOHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable) {
-    this.instantTime = instantTime;
+  HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime, HoodieTable<T, I, K, O> hoodieTable) {
+    this.instantTime = instantTime.orElse(StringUtils.EMPTY_STRING);
this.config = config;
this.hoodieTable = hoodieTable;
this.fs = getFileSystem();
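With the constructor now taking Option<String>, handles that have no associated instant (previously passing null, as HoodieKeyLocationFetchHandle did below) state that explicitly. A hedged subclass sketch (the class name and the getFileSystem() body are assumptions, not from this PR):

// Hypothetical read-only handle: no instant time, so pass Option.empty();
// instantTime then falls back to the empty string inside HoodieIOHandle.
public class ExampleReadHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieIOHandle<T, I, K, O> {
  ExampleReadHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable) {
    super(config, Option.empty(), hoodieTable);
  }

  @Override
  public FileSystem getFileSystem() {
    return hoodieTable.getMetaClient().getFs();
  }
}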
File: HoodieKeyLocationFetchHandle.java
@@ -47,7 +47,7 @@ public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O

public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, HoodieBaseFile> partitionPathBaseFilePair, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
+    super(config, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
this.partitionPathBaseFilePair = partitionPathBaseFilePair;
this.keyGeneratorOpt = keyGeneratorOpt;
}