@@ -33,9 +33,14 @@
public class WorkloadProfile implements Serializable {

/**
* Computed workload profile.
* Computed workload stats.
*/
protected final HashMap<String, WorkloadStat> partitionPathStatMap;
protected final HashMap<String, WorkloadStat> inputPartitionPathStatMap;

/**
* Execution/Output workload stats.
*/
protected final HashMap<String, WorkloadStat> outputPartitionPathStatMap;

/**
* Global workloadStat.
@@ -47,13 +52,21 @@ public class WorkloadProfile implements Serializable {
*/
private WriteOperationType operationType;

private final boolean hasOutputWorkLoadStats;

public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
this.partitionPathStatMap = profile.getLeft();
this(profile, false);
}

public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, boolean hasOutputWorkLoadStats) {
this.inputPartitionPathStatMap = profile.getLeft();
this.globalStat = profile.getRight();
this.outputPartitionPathStatMap = new HashMap<>();
this.hasOutputWorkLoadStats = hasOutputWorkLoadStats;
}

public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
this(profile);
public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType, boolean hasOutputWorkLoadStats) {
this(profile, hasOutputWorkLoadStats);
this.operationType = operationType;
}

@@ -62,15 +75,37 @@ public WorkloadStat getGlobalStat() {
}

public Set<String> getPartitionPaths() {
return partitionPathStatMap.keySet();
return inputPartitionPathStatMap.keySet();
}

public Set<String> getOutputPartitionPaths() {
return hasOutputWorkLoadStats ? outputPartitionPathStatMap.keySet() : inputPartitionPathStatMap.keySet();
}

public HashMap<String, WorkloadStat> getPartitionPathStatMap() {
return partitionPathStatMap;
public HashMap<String, WorkloadStat> getInputPartitionPathStatMap() {
return inputPartitionPathStatMap;
}

public HashMap<String, WorkloadStat> getOutputPartitionPathStatMap() {
return outputPartitionPathStatMap;
}

public boolean hasOutputWorkLoadStats() {
return hasOutputWorkLoadStats;
}

public void updateOutputPartitionPathStatMap(String partitionPath, WorkloadStat workloadStat) {
if (hasOutputWorkLoadStats) {
outputPartitionPathStatMap.put(partitionPath, workloadStat);
}
}

public WorkloadStat getWorkloadStat(String partitionPath) {
return partitionPathStatMap.get(partitionPath);
return inputPartitionPathStatMap.get(partitionPath);
}

public WorkloadStat getOutputWorkloadStat(String partitionPath) {
return hasOutputWorkLoadStats ? outputPartitionPathStatMap.get(partitionPath) : inputPartitionPathStatMap.get(partitionPath);
}

public WriteOperationType getOperationType() {
@@ -81,7 +116,8 @@ public WriteOperationType getOperationType() {
public String toString() {
final StringBuilder sb = new StringBuilder("WorkloadProfile {");
sb.append("globalStat=").append(globalStat).append(", ");
sb.append("partitionStat=").append(partitionPathStatMap).append(", ");
sb.append("InputPartitionStat=").append(inputPartitionPathStatMap).append(", ");
sb.append("OutputPartitionStat=").append(outputPartitionPathStatMap).append(", ");
sb.append("operationType=").append(operationType);
sb.append('}');
return sb.toString();
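The profile now tracks two maps: the input-side stats built from the incoming records, and an optional output-side map that the partitioner fills in once records are assigned to buckets; when output stats are disabled, the output accessors fall back to the input map. Below is a minimal usage sketch, not part of this PR — the package names (org.apache.hudi.table, org.apache.hudi.common.*), the sketch class name, and the sample partition path, file id, and instant time are illustrative assumptions:

import java.util.HashMap;

import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;

public class WorkloadProfileSketch {
  public static void main(String[] args) {
    // Input-side stats, e.g. what buildProfile(inputRecords) would produce.
    WorkloadStat partitionStat = new WorkloadStat();
    partitionStat.addInserts(100);
    partitionStat.addUpdates(new HoodieRecordLocation("20210101000000", "file-1"), 20);
    HashMap<String, WorkloadStat> inputStats = new HashMap<>();
    inputStats.put("2021/01/01", partitionStat);
    Pair<HashMap<String, WorkloadStat>, WorkloadStat> pair = Pair.of(inputStats, new WorkloadStat());

    // Old behaviour: no output stats, so the output accessors fall back to the input map.
    WorkloadProfile inputOnly = new WorkloadProfile(pair);
    System.out.println(inputOnly.getOutputWorkloadStat("2021/01/01")); // same as getWorkloadStat(...)

    // New behaviour: output stats enabled (e.g. when the index can index log files);
    // the partitioner fills them in via updateOutputPartitionPathStatMap(...).
    WorkloadProfile withOutput = new WorkloadProfile(pair, true);
    WorkloadStat assigned = new WorkloadStat();
    assigned.addInserts(new HoodieRecordLocation("20210101000000", "file-1"), 100);
    withOutput.updateOutputPartitionPathStatMap("2021/01/01", assigned);
    System.out.println(withOutput.getOutputWorkloadStat("2021/01/01"));
  }
}

Keeping the fallback inside getOutputPartitionPaths() and getOutputWorkloadStat() means callers that never enable output stats see exactly the old behaviour.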
@@ -33,16 +33,30 @@ public class WorkloadStat implements Serializable {

private long numUpdates = 0L;

private HashMap<String, Pair<String, Long>> insertLocationToCount;

private HashMap<String, Pair<String, Long>> updateLocationToCount;

public WorkloadStat() {
insertLocationToCount = new HashMap<>();
updateLocationToCount = new HashMap<>();
}

public long addInserts(long numInserts) {
return this.numInserts += numInserts;
}

public long addInserts(HoodieRecordLocation location, long numInserts) {
long accNumInserts = 0;
if (insertLocationToCount.containsKey(location.getFileId())) {
accNumInserts = insertLocationToCount.get(location.getFileId()).getRight();
}
insertLocationToCount.put(
location.getFileId(),
Pair.of(location.getInstantTime(), numInserts + accNumInserts));
return this.numInserts += numInserts;
}

public long addUpdates(HoodieRecordLocation location, long numUpdates) {
long accNumUpdates = 0;
if (updateLocationToCount.containsKey(location.getFileId())) {
@@ -66,6 +80,10 @@ public HashMap<String, Pair<String, Long>> getUpdateLocationToCount() {
return updateLocationToCount;
}

public HashMap<String, Pair<String, Long>> getInsertLocationToCount() {
return insertLocationToCount;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("WorkloadStat {");
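The new addInserts(HoodieRecordLocation, long) overload mirrors the existing addUpdates: per-file insert counts accumulate under the file id together with the location's instant time. A short illustration, with assumed packages, made-up ids, and only an indicative output format:

import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.table.WorkloadStat;

public class WorkloadStatSketch {
  public static void main(String[] args) {
    WorkloadStat stat = new WorkloadStat();
    HoodieRecordLocation smallFile = new HoodieRecordLocation("20210101000000", "file-1");

    // Two insert assignments routed to the same small file accumulate on its file id.
    stat.addInserts(smallFile, 50);
    stat.addInserts(smallFile, 30);

    // Expected to print something like {file-1=(20210101000000,80)} followed by 80.
    System.out.println(stat.getInsertLocationToCount());
    System.out.println(stat.getNumInserts());
  }
}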
@@ -68,6 +68,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I, K, O, R>
extends BaseActionExecutor<T, I, K, O, R> {
@@ -108,22 +109,32 @@ void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String insta
throws HoodieCommitException {
try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
profile.getPartitionPaths().forEach(path -> {
WorkloadStat partitionStat = profile.getWorkloadStat(path);
profile.getOutputPartitionPaths().forEach(path -> {
WorkloadStat partitionStat = profile.getOutputWorkloadStat(path);
HoodieWriteStat insertStat = new HoodieWriteStat();
insertStat.setNumInserts(partitionStat.getNumInserts());
insertStat.setFileId("");
insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
metadata.addWriteStat(path, insertStat);

partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(key);
// TODO : Write baseCommitTime is possible here ?
writeStat.setPrevCommit(value.getKey());
writeStat.setNumUpdateWrites(value.getValue());
metadata.addWriteStat(path, writeStat);
});
Map<String, Pair<String, Long>> updateLocationMap = partitionStat.getUpdateLocationToCount();
Map<String, Pair<String, Long>> insertLocationMap = partitionStat.getInsertLocationToCount();
Stream.concat(updateLocationMap.keySet().stream(), insertLocationMap.keySet().stream())
.distinct()
.forEach(fileId -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(fileId);
Pair<String, Long> updateLocation = updateLocationMap.get(fileId);
Pair<String, Long> insertLocation = insertLocationMap.get(fileId);
// TODO : Write baseCommitTime is possible here ?
writeStat.setPrevCommit(updateLocation != null ? updateLocation.getKey() : insertLocation.getKey());
if (updateLocation != null) {
writeStat.setNumUpdateWrites(updateLocation.getValue());
}
if (insertLocation != null) {
writeStat.setNumInserts(insertLocation.getValue());
}
metadata.addWriteStat(path, writeStat);
});
});
metadata.setOperationType(operationType);

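With per-file insert locations available, the inflight metadata now gets one HoodieWriteStat per file id carrying both update and insert counts, instead of a single anonymous insert stat per partition plus update-only stats. The stand-alone sketch below replays that merge on hand-built maps to show how prevCommit falls back to the insert location (NULL_COMMIT for a brand-new file group); the class name, partition path, file ids, and counts are invented for illustration:

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.collection.Pair;

public class InflightStatsSketch {
  public static void main(String[] args) {
    Map<String, Pair<String, Long>> updateLocationMap = new HashMap<>();
    Map<String, Pair<String, Long>> insertLocationMap = new HashMap<>();
    updateLocationMap.put("file-1", Pair.of("20210101000000", 20L));   // updates to an existing file group
    insertLocationMap.put("file-1", Pair.of("20210101000000", 5L));    // inserts packed into the same small file
    insertLocationMap.put("file-2", Pair.of(HoodieWriteStat.NULL_COMMIT, 100L)); // inserts to a new file group

    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
    Stream.concat(updateLocationMap.keySet().stream(), insertLocationMap.keySet().stream())
        .distinct()
        .forEach(fileId -> {
          HoodieWriteStat writeStat = new HoodieWriteStat();
          writeStat.setFileId(fileId);
          Pair<String, Long> updateLocation = updateLocationMap.get(fileId);
          Pair<String, Long> insertLocation = insertLocationMap.get(fileId);
          // prevCommit comes from the update location when present, otherwise from the
          // insert location (NULL_COMMIT for brand-new file groups).
          writeStat.setPrevCommit(updateLocation != null ? updateLocation.getKey() : insertLocation.getKey());
          if (updateLocation != null) {
            writeStat.setNumUpdateWrites(updateLocation.getValue());
          }
          if (insertLocation != null) {
            writeStat.setNumInserts(insertLocation.getValue());
          }
          metadata.addWriteStat("2021/01/01", writeStat);
        });

    System.out.println(metadata.getPartitionToWriteStats());
  }
}

Recording insert destinations in the inflight commit is what allows rollback to also append rollback blocks for inserts written to log files, as the rollback comment change further down notes.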
@@ -182,14 +182,30 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.build();
if (!scanner.iterator().hasNext()) {
scanner.close();
return new ArrayList<>();
}

Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());

// Consider the following scenario: if all log blocks in this file slice have been rolled back, the scanner is empty.
// In that case we still need to carry the base file forward; otherwise it would be missing from the following file slice.
if (!scanner.iterator().hasNext()) {
if (!oldDataFileOpt.isPresent()) {
scanner.close();
return new ArrayList<>();
} else {
// TODO: we may directly rename the original parquet file if there is no schema evolution/devolution
/*
TaskContextSupplier taskContextSupplier = hoodieCopyOnWriteTable.getTaskContextSupplier();
String newFileName = FSUtils.makeDataFileName(instantTime,
FSUtils.makeWriteToken(taskContextSupplier.getPartitionIdSupplier().get(), taskContextSupplier.getStageIdSupplier().get(), taskContextSupplier.getAttemptIdSupplier().get()),
operation.getFileId(), hoodieCopyOnWriteTable.getBaseFileExtension());
Path oldFilePath = new Path(oldDataFileOpt.get().getPath());
Path newFilePath = new Path(oldFilePath.getParent(), newFileName);
FileUtil.copy(fs, oldFilePath, fs, newFilePath, false, fs.getConf());
*/
}
}

// Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result;
// If the dataFile is present, perform updates else perform inserts into a new base file.
@@ -199,7 +199,7 @@ public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFil
partitionRollbackRequests.add(
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));

// append rollback blocks for updates
// append rollback blocks for updates and inserts as A.2 and B.2
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
partitionRollbackRequests
.addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table));
@@ -91,27 +91,27 @@ public BaseJavaCommitActionExecutor(HoodieEngineContext context,
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();

WorkloadProfile profile = null;
WorkloadProfile workloadProfile = null;
if (isWorkloadProfileNeeded()) {
profile = new WorkloadProfile(buildProfile(inputRecords));
LOG.info("Workload profile :" + profile);
workloadProfile = new WorkloadProfile(buildProfile(inputRecords), table.getIndex().canIndexLogFiles());
LOG.info("Input workload profile :" + workloadProfile);
}

final Partitioner partitioner = getPartitioner(workloadProfile);
try {
saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
} catch (Exception e) {
HoodieTableMetaClient metaClient = table.getMetaClient();
HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
try {
saveWorkloadProfileMetadataToInflight(profile, instantTime);
} catch (Exception e) {
HoodieTableMetaClient metaClient = table.getMetaClient();
HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
try {
if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
}
} catch (IOException ex) {
LOG.error("Check file exists failed");
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
}
} catch (IOException ex) {
LOG.error("Check file exists failed");
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
}
}

final Partitioner partitioner = getPartitioner(profile);
Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);

List<WriteStatus> writeStatuses = new LinkedList<>();
@@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.NumericUtils;
@@ -64,9 +65,9 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
*/
private int totalBuckets = 0;
/**
* Stat for the current workload. Helps in determining inserts, upserts etc.
* Stat for the input and output workload. Describes the workload before and after buckets are assigned.
*/
private WorkloadProfile profile;
private WorkloadProfile workloadProfile;
/**
* Helps decide which bucket an incoming update should go to.
*/
@@ -84,16 +85,16 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements

protected final HoodieWriteConfig config;

public JavaUpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table,
public JavaUpsertPartitioner(WorkloadProfile workloadProfile, HoodieEngineContext context, HoodieTable table,
HoodieWriteConfig config) {
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBucketInfos = new HashMap<>();
bucketInfoMap = new HashMap<>();
this.profile = profile;
this.workloadProfile = workloadProfile;
this.table = table;
this.config = config;
assignUpdates(profile);
assignInserts(profile, context);
assignUpdates(workloadProfile);
assignInserts(workloadProfile, context);

LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+ "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
@@ -102,11 +103,19 @@ public JavaUpsertPartitioner(WorkloadProfile profile, HoodieEngineContext contex

private void assignUpdates(WorkloadProfile profile) {
// each update location gets a partition
Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getInputPartitionPathStatMap().entrySet();
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionStat.getKey(), new WorkloadStat());
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
if (profile.hasOutputWorkLoadStats()) {
HoodieRecordLocation hoodieRecordLocation = new HoodieRecordLocation(updateLocEntry.getValue().getKey(), updateLocEntry.getKey());
outputWorkloadStats.addUpdates(hoodieRecordLocation, updateLocEntry.getValue().getValue());
}
}
if (profile.hasOutputWorkLoadStats()) {
profile.updateOutputPartitionPathStatMap(partitionStat.getKey(), outputWorkloadStats);
}
}
}
@@ -133,6 +142,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)

for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionPath, new WorkloadStat());
if (pStat.getNumInserts() > 0) {

List<SmallFile> smallFiles = partitionSmallFilesMap.getOrDefault(partitionPath, new ArrayList<>());
@@ -158,6 +168,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
}
if (profile.hasOutputWorkLoadStats()) {
outputWorkloadStats.addInserts(smallFile.location, recordsToAppend);
}
bucketNumbers.add(bucket);
recordsPerBucket.add(recordsToAppend);
totalUnassignedInserts -= recordsToAppend;
@@ -183,6 +196,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
}
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
bucketInfoMap.put(totalBuckets, bucketInfo);
if (profile.hasOutputWorkLoadStats()) {
outputWorkloadStats.addInserts(new HoodieRecordLocation(HoodieWriteStat.NULL_COMMIT, bucketInfo.getFileIdPrefix()), recordsPerBucket.get(recordsPerBucket.size() - 1));
}
totalBuckets++;
}
}
@@ -200,6 +216,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
}
if (profile.hasOutputWorkLoadStats()) {
profile.updateOutputPartitionPathStatMap(partitionPath, outputWorkloadStats);
}
}
}

@@ -271,7 +290,7 @@ public int getPartition(Object key) {
String partitionPath = keyLocation.getLeft().getPartitionPath();
List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath);
// pick the target bucket to use based on the weights.
final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
final long totalInserts = Math.max(1, workloadProfile.getWorkloadStat(partitionPath).getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation.getLeft().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;

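In the partitioner, every bucket decision is now mirrored into the output-side stats: update buckets echo the record's existing location, inserts packed into small files are booked against that file's location, and inserts that open a new file group are booked against the new file id prefix with NULL_COMMIT as the previous commit. A condensed, stand-alone imitation of that bookkeeping follows — package names, the sketch class name, and the partition path, file ids, and counts are all assumptions for illustration:

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;

public class OutputStatsAssignmentSketch {
  public static void main(String[] args) {
    // Input profile for one partition: 10 updates against file-1 and 120 not-yet-located inserts.
    WorkloadStat inputStat = new WorkloadStat();
    inputStat.addUpdates(new HoodieRecordLocation("20210101000000", "file-1"), 10);
    inputStat.addInserts(120);
    HashMap<String, WorkloadStat> input = new HashMap<>();
    input.put("2021/01/01", inputStat);
    WorkloadProfile profile = new WorkloadProfile(Pair.of(input, new WorkloadStat()), true);

    WorkloadStat outputStat = new WorkloadStat();

    // assignUpdates: each update location is echoed into the output stats.
    for (Map.Entry<String, Pair<String, Long>> e
        : profile.getWorkloadStat("2021/01/01").getUpdateLocationToCount().entrySet()) {
      outputStat.addUpdates(new HoodieRecordLocation(e.getValue().getKey(), e.getKey()), e.getValue().getValue());
    }

    // assignInserts: say 20 records fit into small file file-1 and 100 go to a new file group.
    outputStat.addInserts(new HoodieRecordLocation("20210101000000", "file-1"), 20);
    outputStat.addInserts(new HoodieRecordLocation(HoodieWriteStat.NULL_COMMIT, "newFileIdPfx-0"), 100);

    profile.updateOutputPartitionPathStatMap("2021/01/01", outputStat);
    System.out.println(profile.getOutputWorkloadStat("2021/01/01"));
  }
}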