This file was deleted.

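The deleted file itself is not rendered here. Judging from the rest of the diff — the abstract getLocationMapper removed below, and the "implements BucketIndexLocationMapper" clause dropped in the consistent-hashing file — the deleted file is plausibly the BucketIndexLocationMapper interface. A minimal reconstruction under that assumption, with the shape inferred from its usages in this diff rather than copied from the Hudi source:

package org.apache.hudi.index.bucket;

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;

import java.io.Serializable;

// Inferred contract: map a record key to the location of its bucket's file group.
public interface BucketIndexLocationMapper extends Serializable {

  /**
   * Return the location of the file group backing the key's bucket,
   * or empty if the bucket has no file group yet.
   */
  Option<HoodieRecordLocation> getRecordLocation(HoodieKey key);
}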
@@ -19,13 +19,9 @@
package org.apache.hudi.index.bucket;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
@@ -37,8 +33,6 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;

/**
* Hash indexing mechanism.
*/
@@ -65,30 +59,6 @@ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatu
return writeStatuses;
}

@Override
public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable)
throws HoodieIndexException {
// Get bucket location mapper for the given partitions
List<String> partitions = records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions);
BucketIndexLocationMapper mapper = getLocationMapper(hoodieTable, partitions);

return records.mapPartitions(iterator ->
new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
@Override
protected HoodieRecord<R> computeNext() {
// TODO maybe batch the operation to improve performance
HoodieRecord record = inputItr.next();
Option<HoodieRecordLocation> loc = mapper.getRecordLocation(record.getKey());
return tagAsNewRecordIfNeeded(record, loc);
}
},
false
);
}

@Override
public boolean requiresTagging(WriteOperationType operationType) {
switch (operationType) {
@@ -127,9 +97,4 @@ public boolean isImplicitWithStorage() {
public int getNumBuckets() {
return numBuckets;
}

/**
* Get a location mapper for the given table & partitionPath
*/
protected abstract BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath);
}
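The tagLocation body removed above reappears, per subclass, in the files below; the common pattern is to wrap the record iterator and tag records lazily, one per computeNext() call. A self-contained toy of that pattern, with java.util.Iterator standing in for Hudi's HoodieData and LazyIterableIterator, and a plain Function standing in for the mapper lookup plus tagAsNewRecordIfNeeded:

import java.util.Iterator;
import java.util.function.Function;

// Toy lazy tagging: nothing is materialized up front; each record is tagged
// only when the downstream consumer pulls it, mirroring computeNext() above.
final class LazyTaggingIterator<R> implements Iterator<R> {
  private final Iterator<R> input;
  private final Function<R, R> tagger;

  LazyTaggingIterator(Iterator<R> input, Function<R, R> tagger) {
    this.input = input;
    this.tagger = tagger;
  }

  @Override
  public boolean hasNext() {
    return input.hasNext();
  }

  @Override
  public R next() {
    return tagger.apply(input.next());
  }
}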
@@ -19,12 +19,14 @@
package org.apache.hudi.index.bucket;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -35,10 +37,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;

/**
* Consistent hashing bucket index implementation, with auto-adjust bucket number.
* NOTE: bucket resizing is triggered by clustering.
@@ -71,11 +76,28 @@ public boolean rollbackCommit(String instantTime) {
}

@Override
protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath) {
return new ConsistentBucketIndexLocationMapper(table, partitionPath);
public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable)
throws HoodieIndexException {
// Get bucket location mapper for the given partitions
List<String> partitions = records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions);
ConsistentBucketIndexLocationMapper mapper = new ConsistentBucketIndexLocationMapper(hoodieTable, partitions);

return records.mapPartitions(iterator ->
new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
@Override
protected HoodieRecord<R> computeNext() {
// TODO maybe batch the operation to improve performance
HoodieRecord record = inputItr.next();
Option<HoodieRecordLocation> loc = mapper.getRecordLocation(record.getKey());
return tagAsNewRecordIfNeeded(record, loc);
}
}, false);
}

public class ConsistentBucketIndexLocationMapper implements BucketIndexLocationMapper {
public class ConsistentBucketIndexLocationMapper implements Serializable {

/**
* Mapping from partitionPath -> bucket identifier
@@ -90,7 +112,6 @@ public ConsistentBucketIndexLocationMapper(HoodieTable table, List<String> parti
}));
}

@Override
public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
String partitionPath = key.getPartitionPath();
ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
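In the mapper above, getBucket(key, indexKeyFields) resolves the key against a ring of ConsistentHashingNodes; the ring lookup itself sits outside this hunk. A toy sketch of the standard ceiling-entry lookup such a ring performs — illustrative types only, not Hudi's ConsistentHashingNode API:

import java.util.Map;
import java.util.TreeMap;

// Toy consistent-hash ring: each node owns the arc of hash space ending at its
// position; a key maps to the first node at or clockwise after the key's hash.
final class ToyHashRing {
  private final TreeMap<Integer, String> nodes = new TreeMap<>();

  void addNode(int position, String fileIdPrefix) {
    nodes.put(position, fileIdPrefix);
  }

  String locate(int keyHash) {
    Map.Entry<Integer, String> owner = nodes.ceilingEntry(keyHash);
    return (owner != null ? owner : nodes.firstEntry()).getValue(); // wrap around
  }
}

Because only the arcs adjacent to a split or merged node move, resizing remaps a bounded share of keys, which is what lets bucket resizing ride along with clustering as the class comment above notes.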
@@ -18,29 +18,29 @@

package org.apache.hudi.index.bucket;

import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;

/**
* Simple bucket index implementation, with fixed bucket number.
*/
public class HoodieSimpleBucketIndex extends HoodieBucketIndex {

private static final Logger LOG = LoggerFactory.getLogger(HoodieSimpleBucketIndex.class);

public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
super(config);
}
@@ -79,27 +79,23 @@ public boolean canIndexLogFiles() {
}

@Override
protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath) {
return new SimpleBucketIndexLocationMapper(table, partitionPath);
}

public class SimpleBucketIndexLocationMapper implements BucketIndexLocationMapper {

/**
* Mapping from partitionPath -> bucketId -> fileInfo
*/
private final Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList;

public SimpleBucketIndexLocationMapper(HoodieTable table, List<String> partitions) {
partitionPathFileIDList = partitions.stream()
.collect(Collectors.toMap(p -> p, p -> loadBucketIdToFileIdMappingForPartition(table, p)));
}

@Override
public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
int bucketId = getBucketID(key);
Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = partitionPathFileIDList.get(key.getPartitionPath());
return Option.ofNullable(bucketIdToFileIdMapping.getOrDefault(bucketId, null));
}
public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable)
throws HoodieIndexException {
Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList = new HashMap<>();
return records.mapPartitions(iterator -> new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
@Override
protected HoodieRecord<R> computeNext() {
HoodieRecord record = inputItr.next();
int bucketId = getBucketID(record.getKey());
String partitionPath = record.getPartitionPath();
if (!partitionPathFileIDList.containsKey(partitionPath)) {
partitionPathFileIDList.put(partitionPath, loadBucketIdToFileIdMappingForPartition(hoodieTable, partitionPath));
}
HoodieRecordLocation loc = partitionPathFileIDList.get(partitionPath).getOrDefault(bucketId, null);
return tagAsNewRecordIfNeeded(record, Option.ofNullable(loc));
}
}, false);
}
}
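getBucketID(key) is not shown in this diff; with a fixed bucket count, the id is conventionally a non-negative hash of the index key fields modulo numBuckets. An illustrative one-liner — an assumption about the scheme, not Hudi's BucketIdentifier:

// Illustrative only: stable, non-negative bucket id for a fixed bucket count.
static int toyBucketId(String indexKeyValue, int numBuckets) {
  return (indexKeyValue.hashCode() & Integer.MAX_VALUE) % numBuckets;
}

Also worth noting in the rewritten tagLocation above: instead of eagerly loading bucket-to-file mappings for every distinct partition before tagging, partitionPathFileIDList now fills lazily, fetching a partition's mapping the first time a record from that partition appears in the iterator.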