Merged
Changes from 3 commits
@@ -166,7 +166,7 @@ protected void runPrecommitValidators(HoodieWriteMetadata<O> writeMetadata) {
}
throw new HoodieIOException("Precommit validation not implemented for all engines yet");
}

protected void commitOnAutoCommit(HoodieWriteMetadata result) {
// validate commit action before committing result
runPrecommitValidators(result);
@@ -249,7 +249,7 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
validateWriteResult(clusteringPlan, writeMetadata);
Reviewer comment (Contributor): Let's at least leave a comment (in this PR) that this line was dereferencing the RDD a second time, writing out the files twice.

// If we don't cache the write statuses above, validation will call isEmpty(), which might retrigger the execution again.
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
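The review comment above comes down to Spark's lazy evaluation: every action on an uncached RDD (isEmpty(), count(), collect()) re-runs the whole lineage, so validating the clustering result against an unpersisted write-status RDD would execute the write a second time and produce the output files twice. Below is a minimal, self-contained sketch of the persist-before-validate pattern; it is not Hudi code, and the CachingSketch class and String-typed statuses are made up purely for illustration.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import java.util.Arrays;

public class CachingSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("caching-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Stand-in for the clustering write: mapping over the input produces "write statuses".
      // Re-evaluating this RDD would repeat whatever side effects the map performs.
      JavaRDD<String> writeStatuses = jsc.parallelize(Arrays.asList("fileGroup-1", "fileGroup-2"))
          .map(fileGroup -> "status-" + fileGroup);

      // Persist before any validation so later actions reuse the cached result
      // instead of recomputing the lineage.
      writeStatuses.persist(StorageLevel.MEMORY_AND_DISK());

      boolean empty = writeStatuses.isEmpty();   // first action: materializes and caches
      long numStatuses = writeStatuses.count();  // second action: served from the cache
      System.out.println("empty=" + empty + ", statuses=" + numStatuses);
    }
  }
}

Without the persist() call, both isEmpty() and count() would recompute the map; in the clustering path that recomputation corresponds to rewriting the clustered files.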
@@ -18,6 +18,8 @@

package org.apache.hudi.client;

import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.TransactionUtils;
@@ -37,7 +39,9 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
@@ -356,6 +360,16 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
LOG.info("Starting clustering at " + clusteringInstant);
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
if (clusteringMetadata.getWriteStatuses().isEmpty()) {
HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant))
.map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
"Unable to read clustering plan for instant: " + clusteringInstant));
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringInstant
+ " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+ clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ " write statuses");
}
// TODO : Where is shouldComplete used ?
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
@@ -45,6 +45,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -106,6 +107,7 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -1456,6 +1458,41 @@ public void testSimpleClustering(boolean populateMetaFields, boolean preserveCom
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}

@Test
public void testAndValidateClusteringOutputFiles() throws IOException {
String partitionPath = "2015/03/16";
testInsertTwoBatches(true, partitionPath);

// Trigger clustering
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withAutoCommit(false)
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2).build());
try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build())) {
int numRecords = 200;
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(newCommitTime, numRecords);
client.startCommitWithTime(newCommitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
JavaRDD<WriteStatus> statuses = client.insert(insertRecordsRDD1, newCommitTime);
client.commit(newCommitTime, statuses);
List<WriteStatus> statusList = statuses.collect();
assertNoWriteErrors(statusList);

metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieInstant replaceCommitInstant = metaClient.getActiveTimeline().getCompletedReplaceTimeline().firstInstant().get();
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(replaceCommitInstant).get(), HoodieReplaceCommitMetadata.class);

List<String> filesFromReplaceCommit = new ArrayList<>();
replaceCommitMetadata.getPartitionToWriteStats().forEach((k,v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));

// Find all parquet files created as part of clustering and verify they match what's recorded in the replace commit metadata.
FileStatus[] fileStatuses = fs.listStatus(new Path(basePath + "/" + partitionPath));
List<String> clusteredFiles = Arrays.stream(fileStatuses).filter(entry -> entry.getPath().getName().contains(replaceCommitInstant.getTimestamp()))
.map(fileStatus -> partitionPath + "/" + fileStatus.getPath().getName()).collect(Collectors.toList());
assertEquals(clusteredFiles, filesFromReplaceCommit);
}
}

@Test
public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
@@ -1707,18 +1744,22 @@ private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}

private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
return testInsertTwoBatches(populateMetaFields, "2015/03/16");
}

/**
* This method returns the following three items:
* 1. List of all HoodieRecords written in the two batches of inserts.
* 2. Commit instants of the two batches.
* 3. List of new file group ids that were written in the two batches.
*/
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
populateMetaFields ? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);