Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@
package org.apache.hudi.io;

import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
Expand All @@ -39,8 +34,6 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -53,56 +46,24 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O> exten

private final BloomFilter bloomFilter;
private final List<String> candidateRecordKeys;
private final boolean useMetadataTableIndex;
private Option<String> fileName = Option.empty();
private long totalKeysChecked;

public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair) {
this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
}

public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
boolean useMetadataTableIndex) {
super(config, hoodieTable, partitionPathFileIDPair);
this.candidateRecordKeys = new ArrayList<>();
this.totalKeysChecked = 0;
if (fileName.isPresent()) {
ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
"File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'");
this.fileName = fileName;
}
this.useMetadataTableIndex = useMetadataTableIndex;
this.bloomFilter = getBloomFilter();
}

private BloomFilter getBloomFilter() {
BloomFilter bloomFilter = null;
HoodieTimer timer = new HoodieTimer().startTimer();
try {
if (this.useMetadataTableIndex) {
ValidationUtils.checkArgument(this.fileName.isPresent(),
"File name not available to fetch bloom filter from the metadata table index.");
Option<ByteBuffer> bloomFilterByteBuffer =
hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get());
if (!bloomFilterByteBuffer.isPresent()) {
throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight());
}
bloomFilter =
new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
BloomFilterTypeCode.DYNAMIC_V0);
} else {
try (HoodieFileReader reader = createNewFileReader()) {
bloomFilter = reader.readBloomFilter();
}
}
try (HoodieFileReader reader = createNewFileReader()) {
LOG.debug(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
return reader.readBloomFilter();
} catch (IOException e) {
throw new HoodieIndexException(String.format("Error reading bloom filter from %s/%s - %s",
getPartitionPathFileIDPair().getLeft(), this.fileName, e));
throw new HoodieIndexException(String.format("Error reading bloom filter from %s", getPartitionPathFileIDPair(), e));
}
LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
return bloomFilter;
}

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext eng
Option<String> inflightInstantTimestamp) {
try {
if (enabled) {
bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp);
initializeIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp);
}
} catch (IOException e) {
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
Expand All @@ -37,8 +36,6 @@
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -113,7 +110,7 @@ protected List<HoodieKeyLookupResult> computeNext() {
}

List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet());
Map<Pair<String, String>, ByteBuffer> fileToBloomFilterMap =
Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);

final AtomicInteger totalKeys = new AtomicInteger(0);
Expand All @@ -126,11 +123,7 @@ protected List<HoodieKeyLookupResult> computeNext() {
if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair);
}
final ByteBuffer fileBloomFilterByteBuffer = fileToBloomFilterMap.get(partitionPathFileNamePair);

HoodieDynamicBoundedBloomFilter fileBloomFilter =
new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(fileBloomFilterByteBuffer).toString(),
BloomFilterTypeCode.DYNAMIC_V0);
final BloomFilter fileBloomFilter = fileToBloomFilterMap.get(partitionPathFileNamePair);

List<String> candidateRecordKeys = new ArrayList<>();
hoodieKeyList.forEach(hoodieKey -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext eng
});

if (enabled) {
bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp);
initializeIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp);
}
} catch (IOException e) {
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand All @@ -46,6 +48,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -157,15 +160,15 @@ public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitions
}

@Override
public Option<ByteBuffer> getBloomFilter(final String partitionName, final String fileName)
public Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName)
throws HoodieMetadataException {
if (!isBloomFilterIndexEnabled) {
LOG.error("Metadata bloom filter index is disabled!");
return Option.empty();
}

final Pair<String, String> partitionFileName = Pair.of(partitionName, fileName);
Map<Pair<String, String>, ByteBuffer> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName));
Map<Pair<String, String>, BloomFilter> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName));
if (bloomFilters.isEmpty()) {
LOG.error("Meta index: missing bloom filter for partition: " + partitionName + ", file: " + fileName);
return Option.empty();
Expand All @@ -176,7 +179,7 @@ public Option<ByteBuffer> getBloomFilter(final String partitionName, final Strin
}

@Override
public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
public Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
throws HoodieMetadataException {
if (!isBloomFilterIndexEnabled) {
LOG.error("Metadata bloom filter index is disabled!");
Expand All @@ -203,15 +206,19 @@ public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<Str
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
(timer.endTimer() / partitionIDFileIDStrings.size())));

Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new HashMap<>();
Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new HashMap<>();
for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) {
if (entry.getRight().isPresent()) {
final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
entry.getRight().get().getData().getBloomFilterMetadata();
if (bloomFilterMetadata.isPresent()) {
if (!bloomFilterMetadata.get().getIsDeleted()) {
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), bloomFilterMetadata.get().getBloomFilter());
final ByteBuffer bloomFilterByteBuffer = bloomFilterMetadata.get().getBloomFilter();
final String bloomFilterType = bloomFilterMetadata.get().getType();
final BloomFilter bloomFilter = BloomFilterFactory.fromString(
StandardCharsets.UTF_8.decode(bloomFilterByteBuffer).toString(), bloomFilterType);
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), bloomFilter);
}
} else {
LOG.error("Meta index bloom filter missing for: " + fileToKeyMap.get(entry.getLeft()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.metadata;

import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
Expand All @@ -33,7 +34,6 @@
import org.apache.hudi.exception.HoodieMetadataException;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -143,13 +143,13 @@ public void reset() {
// no-op
}

public Option<ByteBuffer> getBloomFilter(final String partitionName, final String fileName)
public Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName)
throws HoodieMetadataException {
throw new HoodieMetadataException("Unsupported operation: getBloomFilter for " + fileName);
}

@Override
public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
public Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
throws HoodieMetadataException {
throw new HoodieMetadataException("Unsupported operation: getBloomFilters!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieKey;
Expand Down Expand Up @@ -241,6 +240,7 @@ public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(Str
public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final String partitionName,
final String baseFileName,
final String timestamp,
final String bloomFilterType,
final ByteBuffer bloomFilter,
final boolean isDeleted) {
ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
Expand All @@ -250,10 +250,8 @@ public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecor
.concat(new FileIndexID(baseFileName).asBase64EncodedString());
HoodieKey key = new HoodieKey(bloomFilterIndexKey, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());

// TODO: HUDI-3203 Get the bloom filter type from the file
HoodieMetadataBloomFilter metadataBloomFilter =
new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
timestamp, bloomFilter, isDeleted);
new HoodieMetadataBloomFilter(bloomFilterType, timestamp, bloomFilter, isDeleted);
HoodieMetadataPayload metadataPayload = new HoodieMetadataPayload(key.getRecordKey(),
HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
return new HoodieRecord<>(key, metadataPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.metadata;

import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand All @@ -31,7 +32,6 @@

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -113,20 +113,20 @@ static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetad
*
* @param partitionName - Partition name
* @param fileName - File name for which bloom filter needs to be retrieved
* @return BloomFilter byte buffer if available, otherwise empty
* @return BloomFilter if available, otherwise empty
* @throws HoodieMetadataException
*/
Option<ByteBuffer> getBloomFilter(final String partitionName, final String fileName)
Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName)
throws HoodieMetadataException;

/**
* Get bloom filters for files from the metadata table index.
*
* @param partitionNameFileNameList - List of partition and file name pair for which bloom filters need to be retrieved
* @return Map of partition file name pair to its bloom filter byte buffer
* @return Map of partition file name pair to its bloom filter
* @throws HoodieMetadataException
*/
Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
throws HoodieMetadataException;

/**
Expand Down
Loading