@@ -26,7 +26,6 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndex;
@@ -59,21 +58,15 @@
*/
public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {

// we need to limit the join such that it stays within 1.5GB per Spark partition. (SPARK-1476)
private static final int SPARK_MAXIMUM_BYTES_PER_PARTITION = 1500 * 1024 * 1024;
// this is how much a triplet of (partitionPath, fileId, recordKey) costs.
private static final int BYTES_PER_PARTITION_FILE_KEY_TRIPLET = 300;
private static final Logger LOG = LogManager.getLogger(HoodieBloomIndex.class);
private static int MAX_ITEMS_PER_SHUFFLE_PARTITION =
SPARK_MAXIMUM_BYTES_PER_PARTITION / BYTES_PER_PARTITION_FILE_KEY_TRIPLET;

public HoodieBloomIndex(HoodieWriteConfig config) {
super(config);
}

@Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) {

// Step 0: cache the input record RDD
if (config.getBloomIndexUseCaching()) {
@@ -112,13 +105,13 @@ public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
* Returns an RDD mapping each HoodieKey to the partitionPath/fileID which contains it, or Option.Empty if the key is
* not found.
*
* @param hoodieKeys keys to lookup
* @param jsc spark context
* @param hoodieTable hoodie table object
*/
@Override
public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
JavaPairRDD<String, String> partitionRecordKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));

@@ -159,8 +152,10 @@ private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
// that contains it.
Map<String, Long> comparisonsPerFileGroup =
computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
+ config.getBloomIndexParallelism() + "}");
return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
comparisonsPerFileGroup);
}
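// A minimal sketch (not part of this patch; chooseJoinParallelism is a hypothetical helper): the
// simplified logic above just takes the max of the input RDD's partition count and the configured
// bloom index parallelism, e.g. chooseJoinParallelism(100, 300) == 300 and chooseJoinParallelism(500, 300) == 500.
static int chooseJoinParallelism(int inputPartitions, int configuredBloomIndexParallelism) {
  // mirrors Math.max(partitionRecordKeyPairRDD.partitions().size(), config.getBloomIndexParallelism())
  return Math.max(inputPartitions, configuredBloomIndexParallelism);
}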
@@ -169,8 +164,8 @@ private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
* Compute the estimated number of bloom filter comparisons to be performed on each file group.
*/
private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition,
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {

Map<String, Long> fileToComparisons;
if (config.getBloomIndexPruneByRanges()) {
@@ -190,53 +185,12 @@ private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long>
return fileToComparisons;
}

/**
* Compute the minimum parallelism needed to play well with the Spark 2GB limitation. The index lookup can be skewed
* in three dimensions : #files, #partitions, #records
* <p>
* To be able to smoothly handle skews, we need to compute how to split each partition into subpartitions. We do it
* here, in a way that keeps the amount of each Spark join partition to < 2GB.
* <p>
* If {@link HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} is specified as a NON-zero number, then that is used
* explicitly.
*/
int computeSafeParallelism(Map<String, Long> recordsPerPartition, Map<String, Long> comparisonsPerFileGroup) {
long totalComparisons = comparisonsPerFileGroup.values().stream().mapToLong(Long::longValue).sum();
long totalFiles = comparisonsPerFileGroup.size();
long totalRecords = recordsPerPartition.values().stream().mapToLong(Long::longValue).sum();
int parallelism = (int) (totalComparisons / MAX_ITEMS_PER_SHUFFLE_PARTITION + 1);
LOG.info(String.format(
"TotalRecords %d, TotalFiles %d, TotalAffectedPartitions %d, TotalComparisons %d, SafeParallelism %d",
totalRecords, totalFiles, recordsPerPartition.size(), totalComparisons, parallelism));
return parallelism;
}
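// Worked example (illustrative, not in the source): with the constants above,
// MAX_ITEMS_PER_SHUFFLE_PARTITION = 1500 * 1024 * 1024 / 300 = 5_242_880 triplets per shuffle
// partition, so e.g. 100 million estimated comparisons give a safe parallelism of
// 100_000_000 / 5_242_880 + 1 = 19 + 1 = 20 partitions.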

/**
* It's crucial to pick the right parallelism.
* <p>
* totalSubPartitions : this is the limit deemed safe for Spark. inputParallelism : typically the number of input
* file splits
* <p>
* We pick the max such that we are always safe, but go higher if, say, there are a lot of input files (otherwise,
* we would fall back to the number of partitions in the input and end up with slow performance).
*/
private int determineParallelism(int inputParallelism, int totalSubPartitions) {
// If bloom index parallelism is set, use it to check against the input parallelism and
// take the max
int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
int joinParallelism = Math.max(totalSubPartitions, indexParallelism);
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
+ config.getBloomIndexParallelism() + "}, TotalSubParts: ${" + totalSubPartitions + "}, "
+ "Join Parallelism set to : " + joinParallelism);
return joinParallelism;
}

/**
* Load all involved files as <Partition, filename> pair RDD.
*/
@VisibleForTesting
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
final HoodieTable hoodieTable) {

// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList =
@@ -304,7 +258,7 @@ public boolean isImplicitWithStorage() {
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For tables where the keys have a definite insert order (e.g., timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning.
*
* <p>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
@@ -392,7 +346,7 @@ protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(

@Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) {
return writeStatusRDD;
}
}
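// Usage sketch (illustrative; `config`, `jsc`, `hoodieTable` and `hoodieKeys` are assumed to be
// constructed elsewhere, and MyPayload stands in for a concrete HoodieRecordPayload implementation):
//
//   HoodieBloomIndex<MyPayload> index = new HoodieBloomIndex<>(config);
//   JavaPairRDD<HoodieKey, Option<Pair<String, String>>> locations =
//       index.fetchRecordLocation(hoodieKeys, jsc, hoodieTable);
//   // keep only keys that were found; the value pair is (partitionPath, fileId)
//   long foundCount = locations.filter(entry -> entry._2().isPresent()).count();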