@@ -71,20 +71,26 @@ public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> tabl
   protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
     List<FileSlice> currentGroup = new ArrayList<>();
-    int totalSizeSoFar = 0;
+    long totalSizeSoFar = 0;
     for (FileSlice currentSlice : fileSlices) {
       // assume each filegroup size is ~= parquet.max.file.size
       totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize();
       // check if max size is reached and create new group, if needed.
       if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
-        fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes())));
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());

[Review comment from satishkotha (Member, Author), Jan 28, 2021]
this is just for logging improvement

+        LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
+            + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
         currentGroup = new ArrayList<>();
         totalSizeSoFar = 0;
       }
       currentGroup.add(currentSlice);
     }
     if (!currentGroup.isEmpty()) {
-      fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes())));
+      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
+      LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+          + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
     }

     return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
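Note: the change above is a greedy size-based packing over file slices. Slice sizes accumulate into a long accumulator (the old int could overflow, since group byte totals can exceed Integer.MAX_VALUE), and each closed group records how many output file groups it should produce. A minimal standalone sketch of the same loop, assuming getNumberOfOutputFileGroups is a ceiling division of group size by target file size (not confirmed by this diff; Slice and Group are hypothetical stand-ins for FileSlice and HoodieClusteringGroup):

```java
import java.util.ArrayList;
import java.util.List;

class GroupingSketch {
  record Slice(long sizeBytes) {}
  record Group(List<Slice> slices, int numOutputFileGroups) {}

  static List<Group> buildGroups(List<Slice> slices, long maxBytesPerGroup, long targetFileBytes) {
    List<Group> groups = new ArrayList<>();
    List<Slice> current = new ArrayList<>();
    long totalSizeSoFar = 0L; // long, not int: byte totals can exceed Integer.MAX_VALUE
    for (Slice slice : slices) {
      totalSizeSoFar += slice.sizeBytes();
      // close the current group once it reaches the per-group byte limit
      if (totalSizeSoFar >= maxBytesPerGroup && !current.isEmpty()) {
        groups.add(new Group(current, numOutputGroups(totalSizeSoFar, targetFileBytes)));
        current = new ArrayList<>();
        totalSizeSoFar = 0;
      }
      current.add(slice); // as in the diff, the slice that tripped the limit starts the next group
    }
    if (!current.isEmpty()) {
      groups.add(new Group(current, numOutputGroups(totalSizeSoFar, targetFileBytes)));
    }
    return groups;
  }

  // assumed behavior of getNumberOfOutputFileGroups: ceiling division, at least 1
  static int numOutputGroups(long totalBytes, long targetFileBytes) {
    return (int) Math.max(1, (totalBytes + targetFileBytes - 1) / targetFileBytes);
  }
}
```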
======================== next file ========================
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.table.BulkInsertPartitioner;

@@ -66,10 +67,12 @@ public SparkSortAndSizeExecutionStrategy(HoodieSparkMergeOnReadTable<T> table,
   @Override
   public JavaRDD<WriteStatus> performClustering(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
                                                 final String instantTime, final Map<String, String> strategyParams, final Schema schema) {
+    LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
     Properties props = getWriteConfig().getProps();
     props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM, String.valueOf(numOutputGroups));
     // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
     props.put(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, Boolean.FALSE.toString());
+    props.put(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
     return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         false, getPartitioner(strategyParams, schema), true, numOutputGroups);
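Note: the props handling here is a derive-and-rebuild pattern: take the strategy's write config, override only the knobs the nested bulk insert needs (parallelism, auto-commit off, parquet max file size capped to the clustering target), and build a fresh HoodieWriteConfig. A hedged sketch of the same pattern; the literal key strings are assumptions about what those config constants resolve to, not verified against the Hudi source:

```java
import java.util.Properties;

final class ConfigOverrideSketch {
  // Derive a one-off property set for the nested bulk insert. The keys below
  // are assumed values of BULKINSERT_PARALLELISM, HOODIE_AUTO_COMMIT_PROP and
  // PARQUET_FILE_MAX_BYTES.
  static Properties deriveClusteringProps(Properties base, int numOutputGroups, long targetFileMaxBytes) {
    Properties props = new Properties();
    props.putAll(base); // work on a copy so the base config stays untouched
    props.put("hoodie.bulkinsert.shuffle.parallelism", String.valueOf(numOutputGroups));
    props.put("hoodie.auto.commit", "false"); // the strategy only writes files; the executor commits
    props.put("hoodie.parquet.max.file.size", String.valueOf(targetFileMaxBytes));
    return props;
  }
}
```

Capping the parquet max file size to the clustering target is what makes the rewritten files land near getClusteringTargetFileMaxBytes rather than the table's normal parquet size limit.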
======================== next file ========================
@@ -94,12 +94,13 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
         .map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams()))
         .map(CompletableFuture::join)
         .reduce((rdd1, rdd2) -> rdd1.union(rdd2)).orElse(engineContext.emptyRDD());
-    if (writeStatusRDD.isEmpty()) {
-      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime + " #groups: " + clusteringPlan.getInputGroups().size());
-    }

     HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = buildWriteMetadata(writeStatusRDD);
-    updateIndexAndCommitIfNeeded(writeStatusRDD, writeMetadata);
+    JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
+    writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
+    // validate clustering action before committing result
+    validateWriteResult(writeMetadata);
+    commitOnAutoCommit(writeMetadata);
     if (!writeMetadata.getCommitMetadata().isPresent()) {
       HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(),
           extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
@@ -108,6 +109,21 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
     return writeMetadata;
   }

+  /**
+   * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
+   * But we can extend this to add more validation. E.g. number of records read = number of records written etc.
+   *
+   * We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
+   */
+  private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
+    if (writeMetadata.getWriteStatuses().isEmpty()) {
+      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
+          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+          + " write statuses");
+    }
+  }
+
   /**
    * Submit job to execute clustering for the group.
    */
@@ -222,7 +238,6 @@ private HoodieWriteMetadata<JavaRDD<WriteStatus>> buildWriteMetadata(JavaRDD<Wri
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
     result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeStatusJavaRDD));
     result.setWriteStatuses(writeStatusJavaRDD);
-    result.setWriteStats(writeStatusJavaRDD.map(WriteStatus::getStat).collect());
     result.setCommitMetadata(Option.empty());
     result.setCommitted(false);
     return result;
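Note: the new validateWriteResult javadoc above leaves room for richer checks. A sketch of the records-read-equals-records-written idea it mentions; validateRecordCounts is hypothetical (not part of this PR), and getNumWrites()/getTotalWriteErrors() are the HoodieWriteStat accessors this sketch assumes:

```java
// Hypothetical extension along the lines the javadoc suggests; expectedRecords
// would have to come from the clustering plan's input metrics.
private void validateRecordCounts(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata, long expectedRecords) {
  long written = writeMetadata.getWriteStatuses()
      .map(WriteStatus::getStat)
      .map(stat -> stat.getNumWrites() - stat.getTotalWriteErrors())
      .reduce(Long::sum); // assumes at least one WriteStatus; validateWriteResult runs first
  if (written != expectedRecords) {
    throw new HoodieClusteringException("Clustering wrote " + written
        + " records for " + instantTime + ", expected " + expectedRecords);
  }
}
```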
======================== next file ========================
@@ -208,7 +208,7 @@ record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.ge
     return partitionedRDD.map(Tuple2::_2);
   }

-  protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
+  protected JavaRDD<WriteStatus> updateIndex(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
     // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
     // RDD actions that are performed after updating the index.
     writeStatusRDD = writeStatusRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));

@@ -218,6 +218,11 @@ protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD,
     result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
     result.setWriteStatuses(statuses);
     result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses));
+    return statuses;
+  }
+
+  protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
+    updateIndex(writeStatusRDD, result);
     commitOnAutoCommit(result);
   }
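Note: splitting updateIndex out of updateIndexAndCommitIfNeeded lets callers do work between index update and commit, and returning the persisted RDD means later actions reuse the cached write result instead of re-running the write DAG. A sketch of the caller-side pattern this enables (updateIndexThenCommit is a hypothetical wrapper, not code from this PR):

```java
// Sketch of a caller inside a Spark commit action executor.
protected void updateIndexThenCommit(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
  // statuses is the persisted handle, so the collect below hits the Spark
  // cache instead of recomputing the whole write.
  JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, result);
  result.setWriteStats(statuses.map(WriteStatus::getStat).collect());
  commitOnAutoCommit(result); // commit stays a separate, optional step
}
```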

======================== next file ========================
@@ -20,8 +20,8 @@ package org.apache.hudi.functional
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -243,17 +243,24 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     val f2 = Future {
       inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process one microbatch
-      val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
+      var currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
       assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))

       inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process second microbatch
-      waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
-      assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
-
-      // check have more than one file group
-      this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
-      assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
+      currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
+      // for inline clustering, clustering may be complete along with 2nd commit
+      if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline().countInstants() > 0) {

[Review comment from satishkotha (Member, Author)]
@lw309637554 It seems like this test has a race condition. PTAL and confirm my fix is reasonable.

[Reply from a Contributor]
it's reasonable

+        assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
+        // check have at least one file group
+        this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
+        assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
+      } else {
+        assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
+        // check have more than one file group
+        this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
+        assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
+      }

       // check clustering result
       checkClusteringResult(destPath)
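Note: waitTillAtleastNCommits is presumably a poll-with-timeout helper over the Hudi timeline; the race fixed above is that inline clustering may append a third (replace) commit at any point after the second microbatch, so the test now branches on whether the completed replace timeline has any instants. A generic Java sketch of that style of helper (the Scala test's real implementation may differ):

```java
import java.util.function.IntSupplier;

class PollSketch {
  // Sketch of a waitTillAtleastNCommits-style helper: poll a commit count
  // until it reaches n or the timeout expires, and return what was seen.
  static int waitTillAtLeastNCommits(IntSupplier countCommits, int n, int timeoutSecs, int pollSecs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutSecs * 1000L;
    int seen = countCommits.getAsInt();
    while (seen < n && System.currentTimeMillis() < deadline) {
      Thread.sleep(pollSecs * 1000L);
      seen = countCommits.getAsInt();
    }
    if (seen < n) {
      throw new IllegalStateException("Timed out waiting for " + n + " commits, saw " + seen);
    }
    return seen; // callers keep this, since async work (e.g. clustering) may add more commits later
  }
}
```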