diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java index 7700e95d1d707..8e6160b095483 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java @@ -33,9 +33,14 @@ public class WorkloadProfile implements Serializable { /** - * Computed workload profile. + * Computed workload stats. */ - protected final HashMap partitionPathStatMap; + protected final HashMap inputPartitionPathStatMap; + + /** + * Execution/Output workload stats + */ + protected final HashMap outputPartitionPathStatMap; /** * Global workloadStat. @@ -47,13 +52,21 @@ public class WorkloadProfile implements Serializable { */ private WriteOperationType operationType; + private final boolean hasOutputWorkLoadStats; + public WorkloadProfile(Pair, WorkloadStat> profile) { - this.partitionPathStatMap = profile.getLeft(); + this(profile, false); + } + + public WorkloadProfile(Pair, WorkloadStat> profile, boolean hasOutputWorkLoadStats) { + this.inputPartitionPathStatMap = profile.getLeft(); this.globalStat = profile.getRight(); + this.outputPartitionPathStatMap = new HashMap<>(); + this.hasOutputWorkLoadStats = hasOutputWorkLoadStats; } - public WorkloadProfile(Pair, WorkloadStat> profile, WriteOperationType operationType) { - this(profile); + public WorkloadProfile(Pair, WorkloadStat> profile, WriteOperationType operationType, boolean hasOutputWorkLoadStats) { + this(profile, hasOutputWorkLoadStats); this.operationType = operationType; } @@ -62,15 +75,37 @@ public WorkloadStat getGlobalStat() { } public Set getPartitionPaths() { - return partitionPathStatMap.keySet(); + return inputPartitionPathStatMap.keySet(); + } + + public Set getOutputPartitionPaths() { + return hasOutputWorkLoadStats ? outputPartitionPathStatMap.keySet() : inputPartitionPathStatMap.keySet(); } - public HashMap getPartitionPathStatMap() { - return partitionPathStatMap; + public HashMap getInputPartitionPathStatMap() { + return inputPartitionPathStatMap; + } + + public HashMap getOutputPartitionPathStatMap() { + return outputPartitionPathStatMap; + } + + public boolean hasOutputWorkLoadStats() { + return hasOutputWorkLoadStats; + } + + public void updateOutputPartitionPathStatMap(String partitionPath, WorkloadStat workloadStat) { + if (hasOutputWorkLoadStats) { + outputPartitionPathStatMap.put(partitionPath, workloadStat); + } } public WorkloadStat getWorkloadStat(String partitionPath) { - return partitionPathStatMap.get(partitionPath); + return inputPartitionPathStatMap.get(partitionPath); + } + + public WorkloadStat getOutputWorkloadStat(String partitionPath) { + return hasOutputWorkLoadStats ? 
outputPartitionPathStatMap.get(partitionPath) : inputPartitionPathStatMap.get(partitionPath); } public WriteOperationType getOperationType() { @@ -81,7 +116,8 @@ public WriteOperationType getOperationType() { public String toString() { final StringBuilder sb = new StringBuilder("WorkloadProfile {"); sb.append("globalStat=").append(globalStat).append(", "); - sb.append("partitionStat=").append(partitionPathStatMap).append(", "); + sb.append("InputPartitionStat=").append(inputPartitionPathStatMap).append(", "); + sb.append("OutputPartitionStat=").append(outputPartitionPathStatMap).append(", "); sb.append("operationType=").append(operationType); sb.append('}'); return sb.toString(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java index c3371bab092db..327a5a3ae7980 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java @@ -33,9 +33,12 @@ public class WorkloadStat implements Serializable { private long numUpdates = 0L; + private HashMap> insertLocationToCount; + private HashMap> updateLocationToCount; public WorkloadStat() { + insertLocationToCount = new HashMap<>(); updateLocationToCount = new HashMap<>(); } @@ -43,6 +46,17 @@ public long addInserts(long numInserts) { return this.numInserts += numInserts; } + public long addInserts(HoodieRecordLocation location, long numInserts) { + long accNumInserts = 0; + if (insertLocationToCount.containsKey(location.getFileId())) { + accNumInserts = insertLocationToCount.get(location.getFileId()).getRight(); + } + insertLocationToCount.put( + location.getFileId(), + Pair.of(location.getInstantTime(), numInserts + accNumInserts)); + return this.numInserts += numInserts; + } + public long addUpdates(HoodieRecordLocation location, long numUpdates) { long accNumUpdates = 0; if (updateLocationToCount.containsKey(location.getFileId())) { @@ -66,6 +80,10 @@ public HashMap> getUpdateLocationToCount() { return updateLocationToCount; } + public HashMap> getInsertLocationToCount() { + return insertLocationToCount; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("WorkloadStat {"); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index a9cb03a2bb57c..b8d5948c1f453 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -68,6 +68,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; public abstract class BaseCommitActionExecutor extends BaseActionExecutor { @@ -108,22 +109,32 @@ void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String insta throws HoodieCommitException { try { HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - profile.getPartitionPaths().forEach(path -> { - WorkloadStat partitionStat = profile.getWorkloadStat(path); + profile.getOutputPartitionPaths().forEach(path -> { + WorkloadStat partitionStat = profile.getOutputWorkloadStat(path); HoodieWriteStat insertStat = new HoodieWriteStat(); 
insertStat.setNumInserts(partitionStat.getNumInserts()); insertStat.setFileId(""); insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); metadata.addWriteStat(path, insertStat); - - partitionStat.getUpdateLocationToCount().forEach((key, value) -> { - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId(key); - // TODO : Write baseCommitTime is possible here ? - writeStat.setPrevCommit(value.getKey()); - writeStat.setNumUpdateWrites(value.getValue()); - metadata.addWriteStat(path, writeStat); - }); + Map> updateLocationMap = partitionStat.getUpdateLocationToCount(); + Map> insertLocationMap = partitionStat.getInsertLocationToCount(); + Stream.concat(updateLocationMap.keySet().stream(), insertLocationMap.keySet().stream()) + .distinct() + .forEach(fileId -> { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(fileId); + Pair updateLocation = updateLocationMap.get(fileId); + Pair insertLocation = insertLocationMap.get(fileId); + // TODO : Write baseCommitTime is possible here ? + writeStat.setPrevCommit(updateLocation != null ? updateLocation.getKey() : insertLocation.getKey()); + if (updateLocation != null) { + writeStat.setNumUpdateWrites(updateLocation.getValue()); + } + if (insertLocation != null) { + writeStat.setNumInserts(insertLocation.getValue()); + } + metadata.addWriteStat(path, writeStat); + }); }); metadata.setOperationType(operationType); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index a3b7a7fabd0d3..e238d40683b64 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -182,14 +182,30 @@ public List compact(HoodieCompactionHandler compactionHandler, .withOperationField(config.allowOperationMetadataField()) .withPartition(operation.getPartitionPath()) .build(); - if (!scanner.iterator().hasNext()) { - scanner.close(); - return new ArrayList<>(); - } Option oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath()); + // Considering following scenario: if all log blocks in this fileSlice is rollback, it returns an empty scanner. + // But in this case, we need to give it a base file. Otherwise, it will lose base file in following fileSlice. + if (!scanner.iterator().hasNext()) { + if (!oldDataFileOpt.isPresent()) { + scanner.close(); + return new ArrayList<>(); + } else { + // TODO: we may directly rename original parquet file if there is not evolution/devolution of schema + /* + TaskContextSupplier taskContextSupplier = hoodieCopyOnWriteTable.getTaskContextSupplier(); + String newFileName = FSUtils.makeDataFileName(instantTime, + FSUtils.makeWriteToken(taskContextSupplier.getPartitionIdSupplier().get(), taskContextSupplier.getStageIdSupplier().get(), taskContextSupplier.getAttemptIdSupplier().get()), + operation.getFileId(), hoodieCopyOnWriteTable.getBaseFileExtension()); + Path oldFilePath = new Path(oldDataFileOpt.get().getPath()); + Path newFilePath = new Path(oldFilePath.getParent(), newFileName); + FileUtil.copy(fs,oldFilePath, fs, newFilePath, false, fs.getConf()); + */ + } + } + // Compacting is very similar to applying updates to existing file Iterator> result; // If the dataFile is present, perform updates else perform inserts into a new base file. 
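The changes above split WorkloadProfile into input stats (built from the incoming records) and output stats (filled in by the partitioner once buckets are assigned; when hasOutputWorkLoadStats is false, the output accessors simply fall back to the input map). saveWorkloadProfileMetadataToInflight then merges each partition's per-file insert and update counts into one HoodieWriteStat per file group. Below is a minimal, self-contained sketch of that bookkeeping using plain JDK types; Stat and PerFileStat are illustrative stand-ins for Hudi's WorkloadStat and HoodieWriteStat, not the actual classes.

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class OutputWorkloadStatSketch {

  /** Stand-in for a per-partition WorkloadStat: counts keyed by fileId. */
  static class Stat {
    final Map<String, Long> insertsByFileId = new HashMap<>();
    final Map<String, Long> updatesByFileId = new HashMap<>();

    void addInserts(String fileId, long n) {
      // accumulate per file group, like addInserts(HoodieRecordLocation, numInserts)
      insertsByFileId.merge(fileId, n, Long::sum);
    }

    void addUpdates(String fileId, long n) {
      updatesByFileId.merge(fileId, n, Long::sum);
    }
  }

  /** Stand-in for HoodieWriteStat, limited to the fields used here. */
  static class PerFileStat {
    final String fileId;
    long numInserts;
    long numUpdateWrites;
    PerFileStat(String fileId) { this.fileId = fileId; }
    @Override public String toString() {
      return fileId + " -> inserts=" + numInserts + ", updates=" + numUpdateWrites;
    }
  }

  /** Mirrors the Stream.concat(...).distinct() merge in saveWorkloadProfileMetadataToInflight. */
  static Map<String, PerFileStat> toPerFileStats(Stat outputStat) {
    Map<String, PerFileStat> result = new HashMap<>();
    Stream.concat(outputStat.insertsByFileId.keySet().stream(),
                  outputStat.updatesByFileId.keySet().stream())
        .distinct()
        .forEach(fileId -> {
          PerFileStat stat = new PerFileStat(fileId);
          stat.numInserts = outputStat.insertsByFileId.getOrDefault(fileId, 0L);
          stat.numUpdateWrites = outputStat.updatesByFileId.getOrDefault(fileId, 0L);
          result.put(fileId, stat);
        });
    return result;
  }

  public static void main(String[] args) {
    // The partitioner routed 2 inserts and 1 update into an existing small file,
    // and 2 inserts into a brand-new file group.
    Stat outputStat = new Stat();
    outputStat.addInserts("file-1", 2);
    outputStat.addUpdates("file-1", 1);
    outputStat.addInserts("file-2", 2);

    // One per-file stat per file group, carrying both insert and update counts.
    toPerFileStats(outputStat).values().forEach(System.out::println);
  }
}

Because the merge iterates the distinct union of fileIds, a file group that absorbs both new inserts (via small-file handling) and updates ends up with a single stat carrying both counts, which is what the new test asserts for the first partition.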
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index 6c17ee2369ff4..2bc9b59b0d1f1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -199,7 +199,7 @@ public static List generateRollbackRequestsUsingFil partitionRollbackRequests.add( ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - // append rollback blocks for updates + // append rollback blocks for updates and inserts as A.2 and B.2 if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { partitionRollbackRequests .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table)); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index 613cce28259e1..dc6994d315f02 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -91,27 +91,27 @@ public BaseJavaCommitActionExecutor(HoodieEngineContext context, public HoodieWriteMetadata> execute(List> inputRecords) { HoodieWriteMetadata> result = new HoodieWriteMetadata<>(); - WorkloadProfile profile = null; + WorkloadProfile workloadProfile = null; if (isWorkloadProfileNeeded()) { - profile = new WorkloadProfile(buildProfile(inputRecords)); - LOG.info("Workload profile :" + profile); + workloadProfile = new WorkloadProfile(buildProfile(inputRecords), table.getIndex().canIndexLogFiles()); + LOG.info("Input workload profile :" + workloadProfile); + } + + final Partitioner partitioner = getPartitioner(workloadProfile); + try { + saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime); + } catch (Exception e) { + HoodieTableMetaClient metaClient = table.getMetaClient(); + HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime); try { - saveWorkloadProfileMetadataToInflight(profile, instantTime); - } catch (Exception e) { - HoodieTableMetaClient metaClient = table.getMetaClient(); - HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime); - try { - if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) { - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e); - } - } catch (IOException ex) { - LOG.error("Check file exists failed"); - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex); + if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) { + throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e); } + } catch (IOException ex) { + LOG.error("Check file exists failed"); + throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex); } 
} - - final Partitioner partitioner = getPartitioner(profile); Map>> partitionedRecords = partition(inputRecords, partitioner); List writeStatuses = new LinkedList<>(); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java index f8aaba52934e2..deaf934cf5d03 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.NumericUtils; @@ -64,9 +65,9 @@ public class JavaUpsertPartitioner> implements */ private int totalBuckets = 0; /** - * Stat for the current workload. Helps in determining inserts, upserts etc. + * Stat for the input and output workload. Describe the workload before and after being assigned buckets. */ - private WorkloadProfile profile; + private WorkloadProfile workloadProfile; /** * Helps decide which bucket an incoming update should go to. */ @@ -84,16 +85,16 @@ public class JavaUpsertPartitioner> implements protected final HoodieWriteConfig config; - public JavaUpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table, + public JavaUpsertPartitioner(WorkloadProfile workloadProfile, HoodieEngineContext context, HoodieTable table, HoodieWriteConfig config) { updateLocationToBucket = new HashMap<>(); partitionPathToInsertBucketInfos = new HashMap<>(); bucketInfoMap = new HashMap<>(); - this.profile = profile; + this.workloadProfile = workloadProfile; this.table = table; this.config = config; - assignUpdates(profile); - assignInserts(profile, context); + assignUpdates(workloadProfile); + assignInserts(workloadProfile, context); LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n" + "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n" @@ -102,11 +103,19 @@ public JavaUpsertPartitioner(WorkloadProfile profile, HoodieEngineContext contex private void assignUpdates(WorkloadProfile profile) { // each update location gets a partition - Set> partitionStatEntries = profile.getPartitionPathStatMap().entrySet(); + Set> partitionStatEntries = profile.getInputPartitionPathStatMap().entrySet(); for (Map.Entry partitionStat : partitionStatEntries) { + WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionStat.getKey(), new WorkloadStat()); for (Map.Entry> updateLocEntry : partitionStat.getValue().getUpdateLocationToCount().entrySet()) { addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey()); + if (profile.hasOutputWorkLoadStats()) { + HoodieRecordLocation hoodieRecordLocation = new HoodieRecordLocation(updateLocEntry.getValue().getKey(), updateLocEntry.getKey()); + outputWorkloadStats.addUpdates(hoodieRecordLocation, updateLocEntry.getValue().getValue()); + } + } + if (profile.hasOutputWorkLoadStats()) { + profile.updateOutputPartitionPathStatMap(partitionStat.getKey(), outputWorkloadStats); } } } @@ -133,6 +142,7 @@ private 
void assignInserts(WorkloadProfile profile, HoodieEngineContext context) for (String partitionPath : partitionPaths) { WorkloadStat pStat = profile.getWorkloadStat(partitionPath); + WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionPath, new WorkloadStat()); if (pStat.getNumInserts() > 0) { List smallFiles = partitionSmallFilesMap.getOrDefault(partitionPath, new ArrayList<>()); @@ -158,6 +168,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); } + if (profile.hasOutputWorkLoadStats()) { + outputWorkloadStats.addInserts(smallFile.location, recordsToAppend); + } bucketNumbers.add(bucket); recordsPerBucket.add(recordsToAppend); totalUnassignedInserts -= recordsToAppend; @@ -183,6 +196,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) } BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); + if (profile.hasOutputWorkLoadStats()) { + outputWorkloadStats.addInserts(new HoodieRecordLocation(HoodieWriteStat.NULL_COMMIT, bucketInfo.getFileIdPrefix()), recordsPerBucket.get(recordsPerBucket.size() - 1)); + } totalBuckets++; } } @@ -200,6 +216,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets); partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets); } + if (profile.hasOutputWorkLoadStats()) { + profile.updateOutputPartitionPathStatMap(partitionPath, outputWorkloadStats); + } } } @@ -271,7 +290,7 @@ public int getPartition(Object key) { String partitionPath = keyLocation.getLeft().getPartitionPath(); List targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath); // pick the target bucket to use based on the weights. 
- final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts()); + final long totalInserts = Math.max(1, workloadProfile.getWorkloadStat(partitionPath).getNumInserts()); final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation.getLeft().getRecordKey()); final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index a9710a0d0d3e1..dd504476b2aca 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -156,19 +156,22 @@ public HoodieWriteMetadata> execute(JavaRDD LOG.info("RDD PreppedRecords was persisted at: " + inputRecordsRDD.getStorageLevel()); } - WorkloadProfile profile = null; + WorkloadProfile workloadProfile = null; if (isWorkloadProfileNeeded()) { context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile"); - profile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType); - LOG.info("Workload profile :" + profile); - saveWorkloadProfileMetadataToInflight(profile, instantTime); + workloadProfile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType, table.getIndex().canIndexLogFiles()); + LOG.info("Input workload profile :" + workloadProfile); + } + + // partition using the insert partitioner + final Partitioner partitioner = getPartitioner(workloadProfile); + if (isWorkloadProfileNeeded()) { + saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime); } // handle records update with clustering JavaRDD> inputRecordsRDDWithClusteringUpdate = clusteringHandleUpdate(inputRecordsRDD); - // partition using the insert partitioner - final Partitioner partitioner = getPartitioner(profile); context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data"); JavaRDD> partitionedRecords = partition(inputRecordsRDDWithClusteringUpdate, partitioner); JavaRDD writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java index a757235045108..65a45e1c6a047 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java @@ -90,7 +90,7 @@ public SparkBucketIndexPartitioner(WorkloadProfile profile, private void assignUpdates(WorkloadProfile profile) { updatePartitionPathFileIds = new HashMap<>(); // each update location gets a partition - Set> partitionStatEntries = profile.getPartitionPathStatMap() + Set> partitionStatEntries = profile.getInputPartitionPathStatMap() .entrySet(); for (Entry partitionStat : partitionStatEntries) { if (!updatePartitionPathFileIds.containsKey(partitionStat.getKey())) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 98f2539c69f38..c54c526253f0b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.NumericUtils; @@ -100,11 +101,19 @@ public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, H private void assignUpdates(WorkloadProfile profile) { // each update location gets a partition - Set> partitionStatEntries = profile.getPartitionPathStatMap().entrySet(); + Set> partitionStatEntries = profile.getInputPartitionPathStatMap().entrySet(); for (Map.Entry partitionStat : partitionStatEntries) { + WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionStat.getKey(), new WorkloadStat()); for (Map.Entry> updateLocEntry : partitionStat.getValue().getUpdateLocationToCount().entrySet()) { addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey()); + if (profile.hasOutputWorkLoadStats()) { + HoodieRecordLocation hoodieRecordLocation = new HoodieRecordLocation(updateLocEntry.getValue().getKey(), updateLocEntry.getKey()); + outputWorkloadStats.addUpdates(hoodieRecordLocation, updateLocEntry.getValue().getValue()); + } + } + if (profile.hasOutputWorkLoadStats()) { + profile.updateOutputPartitionPathStatMap(partitionStat.getKey(), outputWorkloadStats); } } } @@ -161,6 +170,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) for (String partitionPath : partitionPaths) { WorkloadStat pStat = profile.getWorkloadStat(partitionPath); + WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionPath, new WorkloadStat()); if (pStat.getNumInserts() > 0) { List smallFiles = @@ -189,6 +199,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); } + if (profile.hasOutputWorkLoadStats()) { + outputWorkloadStats.addInserts(smallFile.location, recordsToAppend); + } bucketNumbers.add(bucket); recordsPerBucket.add(recordsToAppend); totalUnassignedInserts -= recordsToAppend; @@ -218,6 +231,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) } BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); + if (profile.hasOutputWorkLoadStats()) { + outputWorkloadStats.addInserts(new HoodieRecordLocation(HoodieWriteStat.NULL_COMMIT, bucketInfo.getFileIdPrefix()), recordsPerBucket.get(recordsPerBucket.size() - 1)); + } totalBuckets++; } } @@ -235,6 +251,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets); partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets); } + if 
(profile.hasOutputWorkLoadStats()) { + profile.updateOutputPartitionPathStatMap(partitionPath, outputWorkloadStats); + } } } @@ -302,6 +321,11 @@ public int numPartitions() { return totalBuckets; } + @Override + public int getNumPartitions() { + return totalBuckets; + } + @Override public int getPartition(Object key) { Tuple2> keyLocation = diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java index 86dd3b361f903..d73759de8d5bc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java @@ -20,17 +20,33 @@ import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata; import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.WriteMarkersFactory; +import org.apache.hudi.testutils.MetadataMergeWriteStatus; +import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -40,9 +56,11 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; @@ -139,6 +157,131 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "002").doesMarkerDirExist()); } + @Test + public void testRollbackForCanIndexLogFile() throws IOException { + cleanupResources(); + setUpDFS(); + //1. 
prepare data and assert data result + //just generate one partitions + dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH}); + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .withWriteStatusClass(MetadataMergeWriteStatus.class) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build()) + .forTable("test-trip-table") + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server + .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()).withRollbackUsingMarkers(false).withAutoCommit(false).build(); + + //1. prepare data + HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH}, basePath); + SparkRDDWriteClient client = getHoodieWriteClient(cfg); + // Write 1 (only inserts) + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInsertsForPartition(newCommitTime, 2, DEFAULT_FIRST_PARTITION_PATH); + JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD statuses = client.upsert(writeRecords, newCommitTime); + org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses.collect()); + client.commit(newCommitTime, statuses); + + // check fileSlice + HoodieTable table = this.getHoodieTable(metaClient, cfg); + SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient()); + List firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList()); + assertEquals(1, firstPartitionCommit2FileGroups.size()); + assertEquals(1, (int) firstPartitionCommit2FileGroups.get(0).getAllFileSlices().count()); + assertFalse(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().findFirst().get().getBaseFile().isPresent()); + assertEquals(1, firstPartitionCommit2FileGroups.get(0).getAllFileSlices().findFirst().get().getLogFiles().count()); + String generatedFileID = firstPartitionCommit2FileGroups.get(0).getFileGroupId().getFileId(); + + // check hoodieCommitMeta + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + table.getMetaClient().getCommitTimeline() + .getInstantDetails(new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "001")) + .get(), + HoodieCommitMetadata.class); + List firstPartitionWriteStat = commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH); + assertEquals(2, firstPartitionWriteStat.size()); + // we have an empty writeStat for all partition + assert firstPartitionWriteStat.stream().anyMatch(wStat -> StringUtils.isNullOrEmpty(wStat.getFileId())); + // we have one non-empty writeStat which must contains update or insert + assertEquals(1, firstPartitionWriteStat.stream().filter(wStat -> 
!StringUtils.isNullOrEmpty(wStat.getFileId())).count()); + firstPartitionWriteStat.stream().filter(wStat -> !StringUtils.isNullOrEmpty(wStat.getFileId())).forEach(wStat -> { + assert wStat.getNumInserts() > 0; + }); + + // Write 2 (inserts) + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + List updateRecords = Collections.singletonList(dataGen.generateUpdateRecord(records.get(0).getKey(), newCommitTime)); + List insertRecordsInSamePartition = dataGen.generateInsertsForPartition(newCommitTime, 2, DEFAULT_FIRST_PARTITION_PATH); + List insertRecordsInOtherPartition = dataGen.generateInsertsForPartition(newCommitTime, 2, DEFAULT_SECOND_PARTITION_PATH); + List recordsToBeWrite = Stream.concat(Stream.concat(updateRecords.stream(), insertRecordsInSamePartition.stream()), insertRecordsInOtherPartition.stream()) + .collect(Collectors.toList()); + writeRecords = jsc.parallelize(recordsToBeWrite, 1); + statuses = client.upsert(writeRecords, newCommitTime); + client.commit(newCommitTime, statuses); + table = this.getHoodieTable(metaClient, cfg); + commitMetadata = HoodieCommitMetadata.fromBytes( + table.getMetaClient().getCommitTimeline() + .getInstantDetails(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime)) + .get(), + HoodieCommitMetadata.class); + assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_FIRST_PARTITION_PATH); + assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_SECOND_PARTITION_PATH); + List hoodieWriteStatOptionList = commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH); + // Both update and insert record should enter same existing fileGroup due to small file handling + assertEquals(1, hoodieWriteStatOptionList.size()); + assertEquals(generatedFileID, hoodieWriteStatOptionList.get(0).getFileId()); + // check insert and update numbers + assertEquals(2, hoodieWriteStatOptionList.get(0).getNumInserts()); + assertEquals(1, hoodieWriteStatOptionList.get(0).getNumUpdateWrites()); + + List secondHoodieWriteStatOptionList = commitMetadata.getPartitionToWriteStats().get(DEFAULT_SECOND_PARTITION_PATH); + // All insert should enter one fileGroup + assertEquals(1, secondHoodieWriteStatOptionList.size()); + String fileIdInPartitionTwo = secondHoodieWriteStatOptionList.get(0).getFileId(); + assertEquals(2, hoodieWriteStatOptionList.get(0).getNumInserts()); + + // Rollback + HoodieInstant rollBackInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"); + BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor = + new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false, + cfg.shouldRollbackUsingMarkers()); + mergeOnReadRollbackPlanActionExecutor.execute().get(); + MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor( + context, + cfg, + table, + "003", + rollBackInstant, + true, + false); + + //3. assert the rollback stat + Map rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata(); + assertEquals(2, rollbackMetadata.size()); + + //4. 
assert filegroup after rollback, and compare to the rollbackstat + // assert the first partition data and log file size + HoodieRollbackPartitionMetadata partitionMetadata = rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH); + assertTrue(partitionMetadata.getSuccessDeleteFiles().isEmpty()); + assertTrue(partitionMetadata.getFailedDeleteFiles().isEmpty()); + assertEquals(1, partitionMetadata.getRollbackLogFiles().size()); + + // assert the second partition data and log file size + partitionMetadata = rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH); + assertEquals(1, partitionMetadata.getSuccessDeleteFiles().size()); + assertTrue(partitionMetadata.getFailedDeleteFiles().isEmpty()); + assertTrue(partitionMetadata.getRollbackLogFiles().isEmpty()); + assertEquals(1, partitionMetadata.getSuccessDeleteFiles().size()); + } + @Test public void testFailForCompletedInstants() { Assertions.assertThrows(IllegalArgumentException.class, () -> { @@ -169,4 +312,13 @@ public void testRollbackWhenFirstCommitFail() throws Exception { client.rollback("001"); } } + + private void setUpDFS() throws IOException { + initDFS(); + initSparkContexts(); + //just generate two partitions + dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}); + initFileSystem(); + initDFSMetaClient(); + } }
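With inserts now recorded per file group, the commit metadata written from the output workload stats lets downstream code (for example the listing-based rollback path, which appends rollback blocks for updates and inserts) identify which file groups of a log-indexed table were appended to. A rough sketch of reading that back, using only the HoodieCommitMetadata, HoodieWriteStat and StringUtils calls that already appear in this patch; the helper class and method names are hypothetical.

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.StringUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative helper (not part of the patch): list the file groups in a partition
 * that received inserts according to the commit metadata built from the output
 * workload stats. Per-partition summary stats carry an empty fileId and are
 * skipped, matching the filter used in testRollbackForCanIndexLogFile.
 */
public class CommitMetadataInspector {

  public static List<String> fileGroupsWithInserts(HoodieCommitMetadata metadata, String partitionPath) {
    List<String> fileIds = new ArrayList<>();
    List<HoodieWriteStat> stats = metadata.getPartitionToWriteStats().get(partitionPath);
    if (stats == null) {
      return fileIds;
    }
    for (HoodieWriteStat stat : stats) {
      // Skip the summary stat (empty fileId, prevCommit = NULL_COMMIT).
      if (!StringUtils.isNullOrEmpty(stat.getFileId()) && stat.getNumInserts() > 0) {
        fileIds.add(stat.getFileId());
      }
    }
    return fileIds;
  }
}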