diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 8d42a31a5eaa1..9d89c345db065 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -860,6 +860,16 @@ public void rollbackFailedBootstrap() { } } + /** + * Some writers use SparkAllowUpdateStrategy and treat the replacecommit plan as a revocable plan. + * In those cases, their ConflictResolutionStrategy implementation should run conflict resolution + * even for clustering operations. + * @return boolean + */ + protected boolean isPreCommitRequired() { + return this.config.getWriteConflictResolutionStrategy().isPreCommitRequired(); + } + private Option delegateToTableServiceManager(TableServiceType tableServiceType, HoodieTable table) { if (!config.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.compaction)) { return Option.empty(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java index a83fee77eb7eb..2a393bc75c707 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieMetadataWrapper; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -31,6 +32,8 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -118,6 +121,8 @@ private void init(HoodieInstant instant) { if (instant.isCompleted()) { this.mutatedPartitionAndFileIds = getPartitionAndFileIdWithoutSuffixFromSpecificRecord( this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats()); + Map> partitionToReplaceFileIds = this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToReplaceFileIds(); + this.mutatedPartitionAndFileIds.addAll(CommitUtils.flattenPartitionToReplaceFileIds(partitionToReplaceFileIds)); this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getOperationType()); } else { // we need to have different handling for requested and inflight replacecommit because @@ -158,6 +163,10 @@ private void init(HoodieInstant instant) { case LOG_COMPACTION_ACTION: this.mutatedPartitionAndFileIds = CommitUtils.getPartitionAndFileIdWithoutSuffix(this.metadataWrapper.getCommitMetadata().getPartitionToWriteStats()); this.operationType = this.metadataWrapper.getCommitMetadata().getOperationType(); + if (this.operationType.equals(WriteOperationType.CLUSTER) || WriteOperationType.isOverwrite(this.operationType)) { + HoodieReplaceCommitMetadata replaceCommitMetadata = 
(HoodieReplaceCommitMetadata) this.metadataWrapper.getCommitMetadata(); + mutatedPartitionAndFileIds.addAll(CommitUtils.flattenPartitionToReplaceFileIds(replaceCommitMetadata.getPartitionToReplaceFileIds())); + } break; default: throw new IllegalArgumentException("Unsupported Action Type " + getInstantActionType()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java index d1e988adb59ae..1deba86fe1df4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java @@ -21,7 +21,7 @@ import org.apache.hudi.ApiMaturityLevel; import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieWriteConflictException; @@ -40,7 +40,7 @@ public interface ConflictResolutionStrategy { * Stream of instants to check conflicts against. * @return */ - Stream getCandidateInstants(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option lastSuccessfulInstant); + Stream getCandidateInstants(HoodieTableMetaClient metaClient, HoodieInstant currentInstant, Option lastSuccessfulInstant); /** * Implementations of this method will determine whether a conflict exists between 2 commits. @@ -61,4 +61,11 @@ public interface ConflictResolutionStrategy { Option resolveConflict(HoodieTable table, ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) throws HoodieWriteConflictException; + /** + * Write clients use their preCommit API to run conflict resolution. + * This method determines whether to execute preCommit for table services like clustering. + * @return boolean + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + boolean isPreCommitRequired(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/IngestionPrimaryWriterBasedConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/IngestionPrimaryWriterBasedConflictResolutionStrategy.java new file mode 100644 index 0000000000000..1c71be9f70bda --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/IngestionPrimaryWriterBasedConflictResolutionStrategy.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; + +/** + * This class extends the base implementation of conflict resolution strategy. + * It gives preference to ingestion writers over table services. + */ +public class IngestionPrimaryWriterBasedConflictResolutionStrategy + extends SimpleConcurrentFileWritesConflictResolutionStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(IngestionPrimaryWriterBasedConflictResolutionStrategy.class); + + /** + * For table service commits like replacecommit and compaction, this method also returns inflight ingestion commits. + */ + @Override + public Stream getCandidateInstants(HoodieTableMetaClient metaClient, HoodieInstant currentInstant, + Option lastSuccessfulInstant) { + HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline(); + if ((REPLACE_COMMIT_ACTION.equals(currentInstant.getAction()) + && ClusteringUtils.isClusteringCommit(metaClient, currentInstant)) + || COMPACTION_ACTION.equals(currentInstant.getAction())) { + return getCandidateInstantsForTableServicesCommits(activeTimeline, currentInstant); + } else { + return getCandidateInstantsForNonTableServicesCommits(activeTimeline, currentInstant); + } + } + + private Stream getCandidateInstantsForNonTableServicesCommits(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant) { + + // To find out which instants are conflicting, we apply the following logic + // Get all the completed instants timeline only for commits that have happened + // since the last successful write, based on the transition times. + // We need to check for write conflicts since they may have mutated the same files + // that are being newly created by the current write. + List completedCommitsInstants = activeTimeline + .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, REPLACE_COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION)) + .filterCompletedInstants() + .findInstantsModifiedAfterByStateTransitionTime(currentInstant.getTimestamp()) + .getInstantsOrderedByStateTransitionTime() + .collect(Collectors.toList()); + LOG.info(String.format("Instants that may have conflict with %s are %s", currentInstant, completedCommitsInstants)); + return completedCommitsInstants.stream(); + } + + /** + * To find which instants are conflicting, we apply the following logic + * Get both completed instants and inflight ingestion commits that have happened since the last successful write. 
+ * We need to check for write conflicts since they may have mutated the same files + * that are being newly created by the current write. + */ + private Stream getCandidateInstantsForTableServicesCommits(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant) { + // Fetch list of completed commits. + Stream completedCommitsStream = + activeTimeline + .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, REPLACE_COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION)) + .filterCompletedInstants() + .findInstantsModifiedAfterByStateTransitionTime(currentInstant.getTimestamp()) + .getInstantsAsStream(); + + // Fetch list of ingestion inflight commits. + Stream inflightIngestionCommitsStream = + activeTimeline + .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)) + .filterInflights() + .getInstantsAsStream(); + + // Merge and sort the instants and return. + List instantsToConsider = Stream.concat(completedCommitsStream, inflightIngestionCommitsStream) + .sorted(Comparator.comparing(o -> o.getStateTransitionTime())) + .collect(Collectors.toList()); + LOG.info(String.format("Instants that may have conflict with %s are %s", currentInstant, instantsToConsider)); + return instantsToConsider.stream(); + } + + @Override + public boolean isPreCommitRequired() { + return true; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java index ca42c9ed05e6a..ce16e14af22b4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -49,9 +50,9 @@ public class SimpleConcurrentFileWritesConflictResolutionStrategy private static final Logger LOG = LoggerFactory.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class); @Override - public Stream getCandidateInstants(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, - Option lastSuccessfulInstant) { - + public Stream getCandidateInstants(HoodieTableMetaClient metaClient, HoodieInstant currentInstant, + Option lastSuccessfulInstant) { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); // To find which instants are conflicting, we apply the following logic // 1. Get completed instants timeline only for commits that have happened since the last successful write. // 2. 
Get any scheduled or completed compaction or clustering operations that have started and/or finished @@ -110,4 +111,9 @@ public Option resolveConflict(HoodieTable table, throw new HoodieWriteConflictException(new ConcurrentModificationException("Cannot resolve conflicts for overlapping writes")); } + @Override + public boolean isPreCommitRequired() { + return false; + } + } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 15f0665187706..d162fe28a62b0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -72,9 +72,13 @@ public static Option resolveWriteConflictIfAny( Stream completedInstantsDuringCurrentWriteOperation = getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants); ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy(); - Stream instantStream = Stream.concat(resolutionStrategy.getCandidateInstants(reloadActiveTimeline - ? table.getMetaClient().reloadActiveTimeline() : table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant), + if (reloadActiveTimeline) { + table.getMetaClient().reloadActiveTimeline(); + } + Stream instantStream = Stream.concat(resolutionStrategy.getCandidateInstants( + table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant), completedInstantsDuringCurrentWriteOperation); + final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata())); instantStream.forEach(instant -> { try { @@ -105,6 +109,10 @@ public static Option>> getLastCompletedT HoodieTableMetaClient metaClient) { Option hoodieInstantOption = metaClient.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants().lastInstant(); + return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption); + } + + private static Option>> getHoodieInstantAndMetaDataPair(HoodieTableMetaClient metaClient, Option hoodieInstantOption) { try { if (hoodieInstantOption.isPresent()) { HoodieCommitMetadata commitMetadata = TimelineUtils.getCommitMetadata(hoodieInstantOption.get(), metaClient.getActiveTimeline()); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java index df5b03ec7dd45..796950b9b7160 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java @@ -58,7 +58,7 @@ public void testNoConcurrentWrites() throws Exception { Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy(); - Stream candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), 
lastSuccessfulInstant); + Stream candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant); Assertions.assertEquals(0, candidateInstants.count()); } @@ -75,7 +75,7 @@ public void testConcurrentWrites() throws Exception { newInstantTime = HoodieTestTable.makeNewCommitTime(); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy(); - Stream candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), lastSuccessfulInstant); + Stream candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant); Assertions.assertEquals(0, candidateInstants.count()); } @@ -95,8 +95,8 @@ public void testConcurrentWritesWithInterleavingSuccessfulCommit() throws Except Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); - timeline = timeline.reload(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // writer 1 conflicts with writer 2 Assertions.assertEquals(1, candidateInstants.size()); @@ -127,8 +127,8 @@ public void testConcurrentWritesWithDifferentPartition() throws Exception { Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH); - timeline = timeline.reload(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // there should be 1 candidate instant diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java new file mode 100644 index 0000000000000..c11a29aa4f60c --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; +import org.apache.hudi.avro.model.HoodieSliceInfo; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.FileCreateUtils; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.Option; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestConflictResolutionStrategyUtil { + + public static void createCommit(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + commitMetadata.addMetadata("test", "test"); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId("file-1"); + commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + commitMetadata.setOperationType(WriteOperationType.INSERT); + HoodieTestTable.of(metaClient) + .addCommit(instantTime, Option.of(commitMetadata)) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + + public static HoodieCommitMetadata createCommitMetadata(String instantTime, String writeFileName) { + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + commitMetadata.addMetadata("test", "test"); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(writeFileName); + commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + commitMetadata.setOperationType(WriteOperationType.INSERT); + return commitMetadata; + } + + public static HoodieCommitMetadata createCommitMetadata(String instantTime) { + return createCommitMetadata(instantTime, "file-1"); + } + + public static void createInflightCommit(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-" + instantTime + "-1"; + String fileId2 = "file-" + instantTime + "-2"; + HoodieTestTable.of(metaClient) + .addInflightCommit(instantTime) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + + public static void createCompactionRequested(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan(); + 
compactionPlan.setVersion(TimelineLayoutVersion.CURR_VERSION); + HoodieCompactionOperation operation = new HoodieCompactionOperation(); + operation.setFileId(fileId1); + operation.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + operation.setDataFilePath("/file-1"); + operation.setDeltaFilePaths(Arrays.asList("/file-1")); + compactionPlan.setOperations(Arrays.asList(operation)); + HoodieTestTable.of(metaClient) + .addRequestedCompaction(instantTime, compactionPlan); + } + + public static void createCompaction(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + commitMetadata.addMetadata("test", "test"); + commitMetadata.setOperationType(WriteOperationType.COMPACT); + commitMetadata.setCompacted(true); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId("file-1"); + commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + HoodieTestTable.of(metaClient) + .addCommit(instantTime, Option.of(commitMetadata)) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + + public static void createReplaceRequested(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + // create replace instant to mark fileId1 as deleted + HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); + requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name()); + HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan(); + HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup(); + HoodieSliceInfo sliceInfo = new HoodieSliceInfo(); + sliceInfo.setFileId(fileId1); + sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + clusteringGroup.setSlices(Arrays.asList(sliceInfo)); + clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup)); + requestedReplaceMetadata.setClusteringPlan(clusteringPlan); + requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); + HoodieTestTable.of(metaClient) + .addRequestedReplace(instantTime, Option.of(requestedReplaceMetadata)) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + + public static void createReplaceInflight(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata(); + inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId("file-1"); + inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + HoodieTestTable.of(metaClient) + .addInflightReplace(instantTime, Option.of(inflightReplaceMetadata)) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + + public static void createReplace(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + // create replace instant to mark fileId1 as deleted + HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); + Map> partitionFileIds 
= new HashMap<>(); + partitionFileIds.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, Arrays.asList(fileId2)); + replaceMetadata.setPartitionToReplaceFileIds(partitionFileIds); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId("file-1"); + replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + replaceMetadata.setOperationType(writeOperationType); + // create replace instant to mark fileId1 as deleted + HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); + requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name()); + HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan(); + HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup(); + HoodieSliceInfo sliceInfo = new HoodieSliceInfo(); + sliceInfo.setFileId(fileId1); + sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + clusteringGroup.setSlices(Arrays.asList(sliceInfo)); + clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup)); + requestedReplaceMetadata.setClusteringPlan(clusteringPlan); + requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); + HoodieTestTable.of(metaClient) + .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.empty(), replaceMetadata) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + + public static void createPendingReplace(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + // create replace instant to mark fileId2 as deleted + HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); + requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name()); + HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan(); + HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup(); + HoodieSliceInfo sliceInfo = new HoodieSliceInfo(); + sliceInfo.setFileId(fileId2); + sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + clusteringGroup.setSlices(Arrays.asList(sliceInfo)); + clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup)); + requestedReplaceMetadata.setClusteringPlan(clusteringPlan); + requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); + HoodieTestTable.of(metaClient) + .addPendingReplace(instantTime, Option.of(requestedReplaceMetadata), Option.empty()) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + + public static void createCompleteReplace(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + // create replace instant to mark fileId2 as deleted + HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); + Map> partitionFileIds = new HashMap<>(); + partitionFileIds.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, Arrays.asList(fileId2)); + replaceMetadata.setPartitionToReplaceFileIds(partitionFileIds); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId("file-2"); + replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + replaceMetadata.setOperationType(writeOperationType); + 
FileCreateUtils.createReplaceCommit(metaClient.getBasePath(), instantTime, replaceMetadata); + } + + public static void createPendingCompaction(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-2"; + HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan(); + compactionPlan.setVersion(TimelineLayoutVersion.CURR_VERSION); + HoodieCompactionOperation operation = new HoodieCompactionOperation(); + operation.setFileId(fileId1); + operation.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + operation.setDataFilePath("/file-2"); + operation.setDeltaFilePaths(Arrays.asList("/file-2")); + compactionPlan.setOperations(Arrays.asList(operation)); + HoodieTestTable.of(metaClient) + .addRequestedCompaction(instantTime, compactionPlan); + FileCreateUtils.createPendingInflightCompaction(metaClient.getBasePath(), instantTime); + } + + public static void createCompleteCompaction(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + commitMetadata.addMetadata("test", "test"); + commitMetadata.setOperationType(WriteOperationType.COMPACT); + commitMetadata.setCompacted(true); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId("file-2"); + commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + HoodieTestTable.of(metaClient) + .addCommit(instantTime, Option.of(commitMetadata)) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + + public static void createRequestedCommit(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + HoodieTestTable.of(metaClient) + .addInflightCommit(instantTime); + } + + public static void createCompleteCommit(String instantTime, HoodieTableMetaClient metaClient) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + commitMetadata.addMetadata("test", "test"); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId("file-2"); + commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + commitMetadata.setOperationType(WriteOperationType.INSERT); + HoodieTestTable.of(metaClient) + .addCommit(instantTime, Option.of(commitMetadata)) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + + public static void createReplaceInflight(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws Exception { + Option inflightReplaceMetadata = Option.empty(); + if (WriteOperationType.INSERT_OVERWRITE.equals(writeOperationType)) { + inflightReplaceMetadata = Option.of(createReplaceCommitMetadata(WriteOperationType.INSERT_OVERWRITE)); + } + HoodieTestTable.of(metaClient) + .addInflightReplace(instantTime, inflightReplaceMetadata); + } + + private static HoodieReplaceCommitMetadata createReplaceCommitMetadata(WriteOperationType writeOperationType) { + String fileId1 = "file-1"; + HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); + Map> partitionFileIds = new HashMap<>(); + partitionFileIds.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, Arrays.asList(fileId1)); + replaceMetadata.setPartitionToReplaceFileIds(partitionFileIds); + HoodieWriteStat writeStat = new 
HoodieWriteStat(); + writeStat.setFileId("file-2"); + replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + replaceMetadata.setOperationType(writeOperationType); + return replaceMetadata; + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestIngestionPrimaryWriterBasedConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestIngestionPrimaryWriterBasedConflictResolutionStrategy.java new file mode 100644 index 0000000000000..966da46690e2f --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestIngestionPrimaryWriterBasedConflictResolutionStrategy.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction; + +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieWriteConflictException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommit; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommitMetadata; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompaction; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompactionRequested; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createInflightCommit; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplaceInflight; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplaceRequested; + +public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends HoodieCommonTestHarness { + + @BeforeEach + public void init() throws IOException { + initMetaClient(); + } + + @Test + public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exception { + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); + 
HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + // consider commits before this are all successful + Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + // writer 1 starts + String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createInflightCommit(currentWriterInstant, metaClient); + // compaction 1 gets scheduled + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + createCompactionRequested(newInstantTime, metaClient); + + Option currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); + IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + // writer 1 does not have a conflict with scheduled compaction plan 1, + // since a scheduled compaction plan is given lower priority compared to an ingestion commit. + Assertions.assertEquals(0, candidateInstants.size()); + } + + @Test + public void testConcurrentWritesWithInterleavingSuccessfulCompaction() throws Exception { + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + // consider commits before this are all successful + Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + // writer 1 starts + String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createInflightCommit(currentWriterInstant, metaClient); + // compaction 1 gets scheduled and finishes + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + // TODO: Remove sleep stmt once the modified times issue is fixed. + // Sleep for at least 1 second between consecutive commits so that two commits' modification times do not fall on the same millisecond. 
+ Thread.sleep(1000); + createCompaction(newInstantTime, metaClient); + + Option currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); + IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy(); + HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + // writer 1 conflicts with compaction 1 + Assertions.assertEquals(1, candidateInstants.size()); + Assertions.assertEquals(newInstantTime, candidateInstants.get(0).getTimestamp()); + ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); + ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); + Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + try { + strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + Assertions.fail("Cannot reach here, should have thrown a conflict"); + } catch (HoodieWriteConflictException e) { + // expected + } + } + + @Test + public void testConcurrentWritesWithInterleavingCompaction() throws Exception { + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + // consider commits before this are all successful + Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + // writer 1 starts + String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createInflightCommit(currentWriterInstant, metaClient); + // compaction 1 gets scheduled + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + createCompactionRequested(newInstantTime, metaClient); + + Option currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newInstantTime)); + IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy(); + // TODO: Create a method to create compactCommitMetadata + // HoodieCommitMetadata currentMetadata = createCommitMetadata(newInstantTime); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + // writer 1 conflicts with compaction 1 + Assertions.assertEquals(1, candidateInstants.size()); + Assertions.assertEquals(currentWriterInstant, candidateInstants.get(0).getTimestamp()); + // TODO: Once compactCommitMetadata is created, use it to verify the resolveConflict method. + } + + /** + * This method verifies whether a conflict exists between an already completed compaction commit and the currently running ingestion commit. 
+ */ + @Test + public void testConcurrentWriteAndCompactionScheduledEarlier() throws Exception { + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); + // consider commits before this are all successful + // compaction 1 gets scheduled and completes + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + createCompaction(newInstantTime, metaClient); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + // writer 1 starts + String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createInflightCommit(currentWriterInstant, metaClient); + + Option currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); + IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + // writer 1 should not conflict with an earlier scheduled compaction 1 with the same file ids + Assertions.assertEquals(0, candidateInstants.size()); + } + + /** + * This method confirms that an ingestion commit, when completing, only looks at completed commits. + */ + @Test + public void testConcurrentWritesWithInterleavingScheduledCluster() throws Exception { + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + // consider commits before this are all successful + Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + // writer 1 starts + String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createInflightCommit(currentWriterInstant, metaClient); + // clustering 1 gets scheduled + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + createReplaceRequested(newInstantTime, metaClient); + createReplaceInflight(newInstantTime, WriteOperationType.CLUSTER, metaClient); + + Option currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); + IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + // Since we give preference to ingestion over clustering, there won't be a conflict with replacecommit. + Assertions.assertEquals(0, candidateInstants.size()); + } + + /** + * This method confirms that an ingestion commit fails due to an already present replacecommit. + * Here the replacecommit is allowed to complete. Ideally a replacecommit cannot be committed while an ingestion commit is inflight. + * However, the following case can occur: while the ingestion commit transitions from requested to inflight, + * the replacecommit can be completed. 
+ */ + @Test + public void testConcurrentWritesWithInterleavingSuccessfulCluster() throws Exception { + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + // consider commits before this are all successful + Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + // writer 1 starts + String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createInflightCommit(currentWriterInstant, metaClient); + // TODO: Remove sleep stmt once the modified times issue is fixed. + // Sleep for at least 1 second between consecutive commits so that two commits' modification times do not fall on the same millisecond. + Thread.sleep(1000); + // clustering writer starts and completes before the ingestion commit. + String replaceWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createReplace(replaceWriterInstant, WriteOperationType.CLUSTER, metaClient); + + Option currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); + IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy(); + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy + .getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant) + .collect(Collectors.toList()); + Assertions.assertEquals(1, candidateInstants.size()); + Assertions.assertEquals(replaceWriterInstant, candidateInstants.get(0).getTimestamp()); + HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); + ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); + ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); + Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + try { + strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + Assertions.fail("Cannot reach here, should have thrown a conflict"); + } catch (HoodieWriteConflictException e) { + // expected + } + } + + @Test + public void testConcurrentWritesWithInterleavingSuccessfulReplace() throws Exception { + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + // consider commits before this are all successful + Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + // writer 1 starts + String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createInflightCommit(currentWriterInstant, metaClient); + // TODO: Remove sleep stmt once the modified times issue is fixed. + // Sleep for at least 1 second between consecutive commits so that two commits' modification times do not fall on the same millisecond. 
+ Thread.sleep(1000); + // replace 1 gets scheduled and finished + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + createReplace(newInstantTime, WriteOperationType.INSERT_OVERWRITE, metaClient); + + Option currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); + IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy(); + HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + // writer 1 conflicts with replace 1 + Assertions.assertEquals(1, candidateInstants.size()); + ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); + ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); + Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + try { + strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + Assertions.fail("Cannot reach here, should have thrown a conflict"); + } catch (HoodieWriteConflictException e) { + // expected + } + } + +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java index e220c63f3e86a..cc18433b96336 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -18,25 +18,14 @@ package org.apache.hudi.client.transaction; -import org.apache.hudi.avro.model.HoodieClusteringGroup; -import org.apache.hudi.avro.model.HoodieClusteringPlan; -import org.apache.hudi.avro.model.HoodieCompactionOperation; -import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; -import org.apache.hudi.avro.model.HoodieSliceInfo; import org.apache.hudi.client.utils.TransactionUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; -import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; -import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieWriteConflictException; @@ -46,14 +35,26 @@ import org.junit.jupiter.api.Test; import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; 
import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommit; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommitMetadata; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompaction; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompactionRequested; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompleteCommit; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompleteCompaction; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompleteReplace; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createInflightCommit; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCompaction; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingReplace; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplaceInflight; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplaceRequested; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRequestedCommit; + public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends HoodieCommonTestHarness { @BeforeEach @@ -64,7 +65,7 @@ public void init() throws IOException { @Test public void testNoConcurrentWrites() throws Exception { String newInstantTime = HoodieTestTable.makeNewCommitTime(); - createCommit(newInstantTime); + createCommit(newInstantTime, metaClient); // consider commits before this are all successful Option lastSuccessfulInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant(); @@ -72,45 +73,45 @@ public void testNoConcurrentWrites() throws Exception { Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); - Stream candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), lastSuccessfulInstant); + Stream candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant); Assertions.assertTrue(candidateInstants.count() == 0); } @Test public void testConcurrentWrites() throws Exception { String newInstantTime = HoodieTestTable.makeNewCommitTime(); - createCommit(newInstantTime); + createCommit(newInstantTime, metaClient); // consider commits before this are all successful // writer 1 - createInflightCommit(HoodieTestTable.makeNewCommitTime()); + createInflightCommit(HoodieTestTable.makeNewCommitTime(), metaClient); // writer 2 - createInflightCommit(HoodieTestTable.makeNewCommitTime()); + createInflightCommit(HoodieTestTable.makeNewCommitTime(), metaClient); Option lastSuccessfulInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant(); newInstantTime = HoodieTestTable.makeNewCommitTime(); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, 
newInstantTime)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); - Stream candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), lastSuccessfulInstant); + Stream candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant); Assertions.assertTrue(candidateInstants.count() == 0); } @Test public void testConcurrentWritesWithInterleavingSuccessfulCommit() throws Exception { - createCommit(HoodieActiveTimeline.createNewInstantTime()); + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); // consider commits before this are all successful Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); // writer 1 starts String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(currentWriterInstant); + createInflightCommit(currentWriterInstant, metaClient); // writer 2 starts and finishes String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); - createCommit(newInstantTime); + createCommit(newInstantTime, metaClient); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); - timeline = timeline.reload(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // writer 1 conflicts with writer 2 Assertions.assertTrue(candidateInstants.size() == 1); @@ -127,24 +128,24 @@ public void testConcurrentWritesWithInterleavingSuccessfulCommit() throws Except @Test public void testConcurrentWritesWithReplaceInflightCommit() throws Exception { - createReplaceInflight(HoodieActiveTimeline.createNewInstantTime()); + createReplaceInflight(HoodieActiveTimeline.createNewInstantTime(), metaClient); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); Option lastSuccessfulInstant = Option.empty(); // writer 1 starts String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(currentWriterInstant); + createInflightCommit(currentWriterInstant, metaClient); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); // writer 2 starts and finishes String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); - createReplaceInflight(newInstantTime); + createReplaceInflight(newInstantTime, metaClient); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); - timeline = timeline.reload(); + metaClient.reloadActiveTimeline(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // writer 1 conflicts 
with writer 2 @@ -162,22 +163,22 @@ public void testConcurrentWritesWithReplaceInflightCommit() throws Exception { @Test public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exception { - createCommit(HoodieActiveTimeline.createNewInstantTime()); + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); // consider commits before this are all successful Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); // writer 1 starts String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(currentWriterInstant); + createInflightCommit(currentWriterInstant, metaClient); // compaction 1 gets scheduled String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); - createCompactionRequested(newInstantTime); + createCompactionRequested(newInstantTime, metaClient); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); - timeline = timeline.reload(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // writer 1 conflicts with scheduled compaction plan 1 Assertions.assertTrue(candidateInstants.size() == 1); @@ -194,22 +195,22 @@ public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exc @Test public void testConcurrentWritesWithInterleavingSuccessfulCompaction() throws Exception { - createCommit(HoodieActiveTimeline.createNewInstantTime()); + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); // consider commits before this are all successful Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); // writer 1 starts String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(currentWriterInstant); + createInflightCommit(currentWriterInstant, metaClient); // compaction 1 gets scheduled and finishes String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); - createCompaction(newInstantTime); + createCompaction(newInstantTime, metaClient); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); - timeline = timeline.reload(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // writer 1 conflicts with compaction 1 Assertions.assertTrue(candidateInstants.size() == 1); @@ -224,24 +225,27 @@ public void testConcurrentWritesWithInterleavingSuccessfulCompaction() throws Ex } } + /** + * 
This method verifies whether a conflict exists between an already completed compaction commit and the currently running ingestion commit. + */ @Test public void testConcurrentWriteAndCompactionScheduledEarlier() throws Exception { - createCommit(HoodieActiveTimeline.createNewInstantTime()); + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); // compaction 1 gets scheduled String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); - createCompaction(newInstantTime); + createCompaction(newInstantTime, metaClient); // consider commits before this are all successful HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); // writer 1 starts String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(currentWriterInstant); + createInflightCommit(currentWriterInstant, metaClient); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); - timeline = timeline.reload(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // writer 1 should not conflict with an earlier scheduled compaction 1 with the same file ids Assertions.assertTrue(candidateInstants.size() == 0); @@ -249,22 +253,22 @@ public void testConcurrentWriteAndCompactionScheduledEarlier() throws Exception @Test public void testConcurrentWritesWithInterleavingScheduledCluster() throws Exception { - createCommit(HoodieActiveTimeline.createNewInstantTime()); + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); // consider commits before this are all successful Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); // writer 1 starts String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(currentWriterInstant); + createInflightCommit(currentWriterInstant, metaClient); // clustering 1 gets scheduled String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); - createReplaceRequested(newInstantTime); + createReplaceRequested(newInstantTime, metaClient); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); - timeline = timeline.reload(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // writer 1 conflicts with scheduled compaction plan 1 Assertions.assertTrue(candidateInstants.size() == 1); @@ -281,22 +285,22 @@ public void 
testConcurrentWritesWithInterleavingScheduledCluster() throws Except @Test public void testConcurrentWritesWithInterleavingSuccessfulCluster() throws Exception { - createCommit(HoodieActiveTimeline.createNewInstantTime()); + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); // consider commits before this are all successful Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); // writer 1 starts String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(currentWriterInstant); + createInflightCommit(currentWriterInstant, metaClient); // cluster 1 gets scheduled and finishes String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); - createReplace(newInstantTime, WriteOperationType.CLUSTER); + createReplace(newInstantTime, WriteOperationType.CLUSTER, metaClient); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); - timeline = timeline.reload(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // writer 1 conflicts with cluster 1 Assertions.assertTrue(candidateInstants.size() == 1); @@ -313,22 +317,22 @@ public void testConcurrentWritesWithInterleavingSuccessfulCluster() throws Excep @Test public void testConcurrentWritesWithInterleavingSuccessfulReplace() throws Exception { - createCommit(HoodieActiveTimeline.createNewInstantTime()); + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); // consider commits before this are all successful Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); // writer 1 starts String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(currentWriterInstant); + createInflightCommit(currentWriterInstant, metaClient); // replace 1 gets scheduled and finished String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); - createReplace(newInstantTime, WriteOperationType.INSERT_OVERWRITE); + createReplace(newInstantTime, WriteOperationType.INSERT_OVERWRITE, metaClient); Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); - timeline = timeline.reload(); - List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( Collectors.toList()); // writer 1 conflicts with replace 1 Assertions.assertTrue(candidateInstants.size() == 1); @@ -343,170 +347,38 @@ public void 
testConcurrentWritesWithInterleavingSuccessfulReplace() throws Excep } } - private void createCommit(String instantTime) throws Exception { - String fileId1 = "file-1"; - String fileId2 = "file-2"; - - HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - commitMetadata.addMetadata("test", "test"); - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId("file-1"); - commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); - commitMetadata.setOperationType(WriteOperationType.INSERT); - HoodieTestTable.of(metaClient) - .addCommit(instantTime, Option.of(commitMetadata)) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); - } - - private HoodieCommitMetadata createCommitMetadata(String instantTime, String writeFileName) { - HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - commitMetadata.addMetadata("test", "test"); - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId(writeFileName); - commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); - commitMetadata.setOperationType(WriteOperationType.INSERT); - return commitMetadata; - } - - private HoodieCommitMetadata createCommitMetadata(String instantTime) { - return createCommitMetadata(instantTime, "file-1"); - } - - private void createInflightCommit(String instantTime) throws Exception { - String fileId1 = "file-" + instantTime + "-1"; - String fileId2 = "file-" + instantTime + "-2"; - HoodieTestTable.of(metaClient) - .addInflightCommit(instantTime) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); - } - - private void createCompactionRequested(String instantTime) throws Exception { - String fileId1 = "file-1"; - HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan(); - compactionPlan.setVersion(TimelineLayoutVersion.CURR_VERSION); - HoodieCompactionOperation operation = new HoodieCompactionOperation(); - operation.setFileId(fileId1); - operation.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); - operation.setDataFilePath("/file-1"); - operation.setDeltaFilePaths(Arrays.asList("/file-1")); - compactionPlan.setOperations(Arrays.asList(operation)); - HoodieTestTable.of(metaClient) - .addRequestedCompaction(instantTime, compactionPlan); - } - - private void createCompaction(String instantTime) throws Exception { - String fileId1 = "file-1"; - String fileId2 = "file-2"; - - HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - commitMetadata.addMetadata("test", "test"); - commitMetadata.setOperationType(WriteOperationType.COMPACT); - commitMetadata.setCompacted(true); - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId("file-1"); - commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); - HoodieTestTable.of(metaClient) - .addCommit(instantTime, Option.of(commitMetadata)) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); - } - - private void createReplaceRequested(String instantTime) throws Exception { - String fileId1 = "file-1"; - String fileId2 = "file-2"; - - // create replace instant to mark fileId1 as deleted - HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); - requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name()); - HoodieClusteringPlan clusteringPlan = new 
HoodieClusteringPlan(); - HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup(); - HoodieSliceInfo sliceInfo = new HoodieSliceInfo(); - sliceInfo.setFileId(fileId1); - sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); - clusteringGroup.setSlices(Arrays.asList(sliceInfo)); - clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup)); - requestedReplaceMetadata.setClusteringPlan(clusteringPlan); - requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); - HoodieTestTable.of(metaClient) - .addRequestedReplace(instantTime, Option.of(requestedReplaceMetadata)) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); - } - - private void createReplaceInflight(String instantTime) throws Exception { - String fileId1 = "file-1"; - String fileId2 = "file-2"; - - HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata(); - inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE); - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId("file-1"); - inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); - HoodieTestTable.of(metaClient) - .addInflightReplace(instantTime, Option.of(inflightReplaceMetadata)) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); - } - - private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception { - String fileId1 = "file-1"; - String fileId2 = "file-2"; - - // create replace instant to mark fileId1 as deleted - HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); - Map> partitionFileIds = new HashMap<>(); - partitionFileIds.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, Arrays.asList(fileId2)); - replaceMetadata.setPartitionToReplaceFileIds(partitionFileIds); - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId("file-1"); - replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); - replaceMetadata.setOperationType(writeOperationType); - // create replace instant to mark fileId1 as deleted - HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); - requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name()); - HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan(); - HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup(); - HoodieSliceInfo sliceInfo = new HoodieSliceInfo(); - sliceInfo.setFileId(fileId1); - sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); - clusteringGroup.setSlices(Arrays.asList(sliceInfo)); - clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup)); - requestedReplaceMetadata.setClusteringPlan(clusteringPlan); - requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); - HoodieTestTable.of(metaClient) - .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.empty(), replaceMetadata) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); - } - // try to simulate HUDI-3355 @Test public void testConcurrentWritesWithPendingInstants() throws Exception { // step1: create a pending replace/commit/compact instant: C1,C11,C12 String newInstantTimeC1 = HoodieActiveTimeline.createNewInstantTime(); - createPendingReplace(newInstantTimeC1, WriteOperationType.CLUSTER); + 
createPendingReplace(newInstantTimeC1, WriteOperationType.CLUSTER, metaClient); String newCompactionInstantTimeC11 = HoodieActiveTimeline.createNewInstantTime(); - createPendingCompaction(newCompactionInstantTimeC11); + createPendingCompaction(newCompactionInstantTimeC11, metaClient); String newCommitInstantTimeC12 = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(newCommitInstantTimeC12); + createInflightCommit(newCommitInstantTimeC12, metaClient); // step2: create a complete commit which has no conflict with C1,C11,C12, named it as C2 - createCommit(HoodieActiveTimeline.createNewInstantTime()); + createCommit(HoodieActiveTimeline.createNewInstantTime(), metaClient); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); // consider commits before this are all successful Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); // step3: write 1 starts, which has conflict with C1,C11,C12, named it as C3 String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); - createInflightCommit(currentWriterInstant); + createInflightCommit(currentWriterInstant, metaClient); // step4: create a requested commit, which has conflict with C3, named it as C4 String commitC4 = HoodieActiveTimeline.createNewInstantTime(); - createRequestedCommit(commitC4); + createRequestedCommit(commitC4, metaClient); // get PendingCommit during write 1 operation metaClient.reloadActiveTimeline(); Set pendingInstant = TransactionUtils.getInflightAndRequestedInstants(metaClient); pendingInstant.remove(currentWriterInstant); // step5: finished pending cluster/compaction/commit operation - createCompleteReplace(newInstantTimeC1, WriteOperationType.CLUSTER); - createCompleteCompaction(newCompactionInstantTimeC11); - createCompleteCommit(newCommitInstantTimeC12); - createCompleteCommit(commitC4); + createCompleteReplace(newInstantTimeC1, WriteOperationType.CLUSTER, metaClient); + createCompleteCompaction(newCompactionInstantTimeC11, metaClient); + createCompleteCommit(newCommitInstantTimeC12, metaClient); + createCompleteCommit(commitC4, metaClient); // step6: do check Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); @@ -531,91 +403,4 @@ public void testConcurrentWritesWithPendingInstants() throws Exception { } } } - - private void createPendingReplace(String instantTime, WriteOperationType writeOperationType) throws Exception { - String fileId1 = "file-1"; - String fileId2 = "file-2"; - // create replace instant to mark fileId2 as deleted - HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); - requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name()); - HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan(); - HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup(); - HoodieSliceInfo sliceInfo = new HoodieSliceInfo(); - sliceInfo.setFileId(fileId2); - sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); - clusteringGroup.setSlices(Arrays.asList(sliceInfo)); - clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup)); - requestedReplaceMetadata.setClusteringPlan(clusteringPlan); - requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); - HoodieTestTable.of(metaClient) - .addPendingReplace(instantTime, Option.of(requestedReplaceMetadata), Option.empty()) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2); - } - - private void createCompleteReplace(String instantTime, WriteOperationType writeOperationType) throws Exception { - String fileId1 = "file-1"; - String fileId2 = "file-2"; - - // create replace instant to mark fileId2 as deleted - HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); - Map> partitionFileIds = new HashMap<>(); - partitionFileIds.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, Arrays.asList(fileId2)); - replaceMetadata.setPartitionToReplaceFileIds(partitionFileIds); - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId("file-2"); - replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); - replaceMetadata.setOperationType(writeOperationType); - FileCreateUtils.createReplaceCommit(metaClient.getBasePath(), instantTime, replaceMetadata); - } - - private void createPendingCompaction(String instantTime) throws Exception { - String fileId1 = "file-2"; - HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan(); - compactionPlan.setVersion(TimelineLayoutVersion.CURR_VERSION); - HoodieCompactionOperation operation = new HoodieCompactionOperation(); - operation.setFileId(fileId1); - operation.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); - operation.setDataFilePath("/file-2"); - operation.setDeltaFilePaths(Arrays.asList("/file-2")); - compactionPlan.setOperations(Arrays.asList(operation)); - HoodieTestTable.of(metaClient) - .addRequestedCompaction(instantTime, compactionPlan); - FileCreateUtils.createPendingInflightCompaction(metaClient.getBasePath(), instantTime); - } - - private void createCompleteCompaction(String instantTime) throws Exception { - String fileId1 = "file-1"; - String fileId2 = "file-2"; - - HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - commitMetadata.addMetadata("test", "test"); - commitMetadata.setOperationType(WriteOperationType.COMPACT); - commitMetadata.setCompacted(true); - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId("file-2"); - commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); - HoodieTestTable.of(metaClient) - .addCommit(instantTime, Option.of(commitMetadata)) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); - } - - private void createRequestedCommit(String instantTime) throws Exception { - HoodieTestTable.of(metaClient) - .addInflightCommit(instantTime); - } - - private void createCompleteCommit(String instantTime) throws Exception { - String fileId1 = "file-1"; - String fileId2 = "file-2"; - - HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - commitMetadata.addMetadata("test", "test"); - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId("file-2"); - commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); - commitMetadata.setOperationType(WriteOperationType.INSERT); - HoodieTestTable.of(metaClient) - .addCommit(instantTime, Option.of(commitMetadata)) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); - } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java index f690efbe06c78..e8927783057b0 100644 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java @@ -127,9 +127,14 @@ protected void completeClustering( try { this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); finalizeWrite(table, clusteringCommitTime, writeStats); + // Only in some cases conflict resolution needs to be performed. + // So, check if preCommit method that does conflict resolution needs to be triggered. + if (isPreCommitRequired()) { + preCommit(metadata); + } // commit to data table after committing to metadata table. - // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a - // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. + // We take the lock here to ensure all writes to metadata table happens within a single lock (single writer). + // Because more than one write to metadata table will result in conflicts since all of them updates the same partition. writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata); LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata); table.getActiveTimeline().transitionReplaceInflightToComplete( diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java index 6079d339317b4..c0fb8a723516e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java @@ -257,6 +257,11 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); finalizeWrite(table, clusteringCommitTime, writeStats); + // Only in some cases conflict resolution needs to be performed. + // So, check if preCommit method that does conflict resolution needs to be triggered. 
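+ // Note: whether this branch runs is delegated to the configured ConflictResolutionStrategy via isPreCommitRequired(); a strategy that treats the pending clustering plan as revocable opts in here, while the default SimpleConcurrentFileWritesConflictResolutionStrategy presumably keeps the previous behavior of skipping the check.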
+ if (isPreCommitRequired()) { + preCommit(metadata); + } // Update table's metadata (table) updateTableMetadata(table, metadata, clusteringInstant); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 97bac5738677d..0984f9436bf47 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -18,6 +18,9 @@ package org.apache.hudi.client; +import org.apache.hudi.client.transaction.ConflictResolutionStrategy; +import org.apache.hudi.client.transaction.IngestionPrimaryWriterBasedConflictResolutionStrategy; +import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy; import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; @@ -31,6 +34,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; @@ -137,11 +141,17 @@ public void clean() throws IOException { InProcessLockProvider.class, FileSystemBasedLockProvider.class); - private static Iterable providerClassAndTableType() { + private static final List CONFLICT_RESOLUTION_STRATEGY_CLASSES = Arrays.asList( + new SimpleConcurrentFileWritesConflictResolutionStrategy(), + new IngestionPrimaryWriterBasedConflictResolutionStrategy()); + + private static Iterable providerClassResolutionStrategyAndTableType() { List opts = new ArrayList<>(); for (Object providerClass : LOCK_PROVIDER_CLASSES) { - opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass}); - opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass}); + for (ConflictResolutionStrategy resolutionStrategy: CONFLICT_RESOLUTION_STRATEGY_CLASSES) { + opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, resolutionStrategy}); + opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, resolutionStrategy}); + } } return opts; } @@ -253,8 +263,9 @@ public void testHoodieClientBasicMultiWriterWithEarlyConflictDetection(String ta } @ParameterizedTest - @MethodSource("providerClassAndTableType") - public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass) throws Exception { + @MethodSource("providerClassResolutionStrategyAndTableType") + public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass, + ConflictResolutionStrategy resolutionStrategy) throws Exception { if (tableType == HoodieTableType.MERGE_ON_READ) { setUpMORTestTable(); } @@ -270,6 +281,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class pr // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass) + .withConflictResolutionStrategy(resolutionStrategy) 
.build()).withAutoCommit(false).withProperties(lockProperties).build(); // Create the first commit @@ -286,7 +298,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class pr Future future1 = executors.submit(() -> { try { - final String nextCommitTime = "002"; + final String nextCommitTime = HoodieActiveTimeline.createNewInstantTime(); final JavaRDD writeStatusList = startCommitForUpdate(writeConfig, client1, nextCommitTime, 100); // Wait for the 2nd writer to start the commit @@ -307,7 +319,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class pr Future future2 = executors.submit(() -> { try { - final String nextCommitTime = "003"; + final String nextCommitTime = HoodieActiveTimeline.createNewInstantTime(); // Wait for the 1st writer to make progress with the commit cyclicBarrier.await(60, TimeUnit.SECONDS); @@ -408,8 +420,9 @@ private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) { } @ParameterizedTest - @MethodSource("providerClassAndTableType") - public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass) throws Exception { + @MethodSource("providerClassResolutionStrategyAndTableType") + public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass, + ConflictResolutionStrategy resolutionStrategy) throws Exception { // create inserts X 1 if (tableType == HoodieTableType.MERGE_ON_READ) { setUpMORTestTable(); @@ -419,6 +432,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta .withCleanConfig(HoodieCleanConfig.newBuilder() .withAutoClean(false) .withAsyncClean(true) + .retainCommits(0) .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withInlineCompaction(false) @@ -430,18 +444,24 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta FileSystemViewStorageType.MEMORY).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass) + .withConflictResolutionStrategy(resolutionStrategy) .build()).withAutoCommit(false).withProperties(lockProperties); Set validInstants = new HashSet<>(); + // Create the first commit with inserts HoodieWriteConfig cfg = writeConfigBuilder.build(); SparkRDDWriteClient client = getHoodieWriteClient(cfg); - createCommitWithInserts(cfg, client, "000", "001", 200, true); - validInstants.add("001"); + String firstCommitTime = HoodieActiveTimeline.createNewInstantTime(); + createCommitWithInserts(cfg, client, "000", firstCommitTime, 200, true); + validInstants.add(firstCommitTime); + // Create 2 commits with upserts - createCommitWithUpserts(cfg, client, "001", "000", "002", 100); - createCommitWithUpserts(cfg, client, "002", "000", "003", 100); - validInstants.add("002"); - validInstants.add("003"); + String secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); + createCommitWithUpserts(cfg, client, firstCommitTime, "000", secondCommitTime, 100); + String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime(); + createCommitWithUpserts(cfg, client, secondCommitTime, "000", thirdCommitTime, 100); + validInstants.add(secondCommitTime); + validInstants.add(thirdCommitTime); // Three clients running actions in parallel final int threadCount = 3; @@ -461,9 +481,9 @@ public void 
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta // Create upserts, schedule cleaning, schedule compaction in parallel Future future1 = executors.submit(() -> { - final String newCommitTime = "004"; + final String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); final int numRecords = 100; - final String commitTimeBetweenPrevAndNew = "002"; + final String commitTimeBetweenPrevAndNew = secondCommitTime; // We want the upsert to go through only after the compaction // and cleaning schedule completion. So, waiting on latch here. @@ -472,13 +492,13 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta // Since the compaction already went in, this upsert has // to fail assertThrows(IllegalArgumentException.class, () -> { - createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); + createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords); }); } else { // We don't have the compaction for COW and so this upsert // has to pass assertDoesNotThrow(() -> { - createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); + createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords); }); validInstants.add(newCommitTime); } @@ -487,7 +507,8 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta Future future2 = executors.submit(() -> { if (tableType == HoodieTableType.MERGE_ON_READ) { assertDoesNotThrow(() -> { - client2.scheduleTableService("005", Option.empty(), TableServiceType.COMPACT); + String compactionTimeStamp = HoodieActiveTimeline.createNewInstantTime(); + client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT); }); } latchCountDownAndWait(scheduleCountDownLatch, 30000); @@ -496,22 +517,33 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta Future future3 = executors.submit(() -> { assertDoesNotThrow(() -> { latchCountDownAndWait(scheduleCountDownLatch, 30000); - client3.scheduleTableService("006", Option.empty(), TableServiceType.CLEAN); + String cleanCommitTime = HoodieActiveTimeline.createNewInstantTime(); + client3.scheduleTableService(cleanCommitTime, Option.empty(), TableServiceType.CLEAN); }); }); future1.get(); future2.get(); future3.get(); + String pendingCompactionTime = (tableType == HoodieTableType.MERGE_ON_READ) + ? metaClient.reloadActiveTimeline().filterPendingCompactionTimeline() + .firstInstant().get().getTimestamp() + : ""; + Option pendingCleanInstantOp = metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflightsAndRequested() + .firstInstant(); + String pendingCleanTime = pendingCleanInstantOp.isPresent() + ? 
pendingCleanInstantOp.get().getTimestamp() + : HoodieActiveTimeline.createNewInstantTime(); + CountDownLatch runCountDownLatch = new CountDownLatch(threadCount); // Create inserts, run cleaning, run compaction in parallel future1 = executors.submit(() -> { - final String newCommitTime = "007"; + final String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); final int numRecords = 100; latchCountDownAndWait(runCountDownLatch, 30000); assertDoesNotThrow(() -> { - createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords, true); - validInstants.add("007"); + createCommitWithInserts(cfg, client1, thirdCommitTime, newCommitTime, numRecords, true); + validInstants.add(newCommitTime); }); }); @@ -519,9 +551,9 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta latchCountDownAndWait(runCountDownLatch, 30000); if (tableType == HoodieTableType.MERGE_ON_READ) { assertDoesNotThrow(() -> { - HoodieWriteMetadata> compactionMetadata = client2.compact("005"); - client2.commitCompaction("005", compactionMetadata.getCommitMetadata().get(), Option.empty()); - validInstants.add("005"); + HoodieWriteMetadata> compactionMetadata = client2.compact(pendingCompactionTime); + client2.commitCompaction(pendingCompactionTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); + validInstants.add(pendingCompactionTime); }); } }); @@ -529,8 +561,8 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta future3 = executors.submit(() -> { latchCountDownAndWait(runCountDownLatch, 30000); assertDoesNotThrow(() -> { - client3.clean("006", false); - validInstants.add("006"); + client3.clean(pendingCleanTime, false); + validInstants.add(pendingCleanTime); }); }); future1.get(); @@ -649,6 +681,7 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) + .withConflictResolutionStrategy(new SimpleConcurrentFileWritesConflictResolutionStrategy()) .build()).withAutoCommit(false).withProperties(properties); HoodieWriteConfig cfg = writeConfigBuilder.build(); HoodieWriteConfig cfg2 = writeConfigBuilder.build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiwriterWithIngestionAsPrimaryWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiwriterWithIngestionAsPrimaryWriter.java new file mode 100644 index 0000000000000..6d2e21821064d --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiwriterWithIngestionAsPrimaryWriter.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.transaction.lock.InProcessLockProvider; +import org.apache.hudi.client.transaction.IngestionPrimaryWriterBasedConflictResolutionStrategy; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.TableServiceType; +import org.apache.hudi.common.model.WriteConcurrencyMode; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieLockConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.exception.HoodieWriteConflictException; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestMultiwriterWithIngestionAsPrimaryWriter extends HoodieClientTestBase { + + public void setUpMORTestTable() throws IOException { + cleanupResources(); + initPath(); + initSparkContexts(); + initTestDataGenerator(); + initFileSystem(); + fs.mkdirs(new Path(basePath)); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, HoodieFileFormat.PARQUET); + initTestDataGenerator(); + } + + @AfterEach + public void clean() throws IOException { + cleanupResources(); + } + + @ParameterizedTest + @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) + public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception { + // create inserts X 1 + if (tableType == HoodieTableType.MERGE_ON_READ) { + setUpMORTestTable(); + } + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + // Disabling embedded timeline server, it doesn't work with multiwriter + HoodieWriteConfig cfg = getConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withInlineCompaction(false) + .withMaxNumDeltaCommitsBeforeCompaction(2).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + 
.withAutoClean(false).withAsyncClean(true) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) + .withEmbeddedTimelineServerEnabled(false) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType( + FileSystemViewStorageType.MEMORY).build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) + .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy()) + .build()).withAutoCommit(false).withProperties(properties).build(); + Set validInstants = new HashSet<>(); + // Create the first commit with inserts + SparkRDDWriteClient client = getHoodieWriteClient(cfg); + String instantTime1 = HoodieActiveTimeline.createNewInstantTime(); + createCommitWithInserts(cfg, client, "000", instantTime1, 200); + validInstants.add(instantTime1); + // Create 2 commits with upserts + String instantTime2 = HoodieActiveTimeline.createNewInstantTime(); + createCommitWithUpserts(cfg, client, instantTime1, "000", instantTime2, 100); + String instantTime3 = HoodieActiveTimeline.createNewInstantTime(); + createCommitWithUpserts(cfg, client, instantTime2, "000", instantTime3, 100); + validInstants.add(instantTime2); + validInstants.add(instantTime3); + ExecutorService executors = Executors.newFixedThreadPool(2); + SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); + SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); + // Create upserts, schedule cleaning, schedule compaction in parallel + String instant4 = HoodieActiveTimeline.createNewInstantTime(); + Future future1 = executors.submit(() -> { + int numRecords = 100; + String commitTimeBetweenPrevAndNew = instantTime2; + try { + // For both COW and MOR table types the commit should not be blocked, since we are giving preference to ingestion. 
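+ // (Illustrative assumption: IngestionPrimaryWriterBasedConflictResolutionStrategy treats the ingestion writer as the primary writer, so this upsert is expected to commit even while compaction and cleaning are being scheduled concurrently below.)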
+ createCommitWithUpserts(cfg, client1, instantTime3, commitTimeBetweenPrevAndNew, instant4, numRecords); + validInstants.add(instant4); + } catch (Exception e1) { + throw new RuntimeException(e1); + } + }); + String instant5 = HoodieActiveTimeline.createNewInstantTime(); + Future future2 = executors.submit(() -> { + try { + client2.scheduleTableService(instant5, Option.empty(), TableServiceType.COMPACT); + } catch (Exception e2) { + if (tableType == HoodieTableType.MERGE_ON_READ) { + throw new RuntimeException(e2); + } + } + }); + String instant6 = HoodieActiveTimeline.createNewInstantTime(); + Future future3 = executors.submit(() -> { + try { + client2.scheduleTableService(instant6, Option.empty(), TableServiceType.CLEAN); + } catch (Exception e2) { + throw new RuntimeException(e2); + } + }); + future1.get(); + future2.get(); + future3.get(); + // Create inserts, run cleaning, run compaction in parallel + String instant7 = HoodieActiveTimeline.createNewInstantTime(); + future1 = executors.submit(() -> { + int numRecords = 100; + try { + createCommitWithInserts(cfg, client1, instantTime3, instant7, numRecords); + validInstants.add(instant7); + } catch (Exception e1) { + throw new RuntimeException(e1); + } + }); + future2 = executors.submit(() -> { + try { + HoodieWriteMetadata> compactionMetadata = client2.compact(instant5); + client2.commitCompaction(instant5, compactionMetadata.getCommitMetadata().get(), Option.empty()); + validInstants.add(instant5); + } catch (Exception e2) { + if (tableType == HoodieTableType.MERGE_ON_READ) { + Assertions.assertTrue(e2 instanceof HoodieWriteConflictException); + } + } + }); + future3 = executors.submit(() -> { + try { + client2.clean(instant6, false); + validInstants.add(instant6); + } catch (Exception e2) { + throw new RuntimeException(e2); + } + }); + future1.get(); + future2.get(); + future3.get(); + Set completedInstants = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp) + .collect(Collectors.toSet()); + Assertions.assertTrue(validInstants.containsAll(completedInstants)); + } + + @ParameterizedTest + @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) + public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) throws Exception { + if (tableType == HoodieTableType.MERGE_ON_READ) { + setUpMORTestTable(); + } + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + HoodieWriteConfig cfg = getConfigBuilder() + .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withAutoClean(false).build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) + .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy()) + .build()).withAutoCommit(false).withProperties(properties).build(); + // Create the first commit + String instant1 = HoodieActiveTimeline.createNewInstantTime(); + createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", instant1, 200); + // Start another inflight commit + String instant2 = HoodieActiveTimeline.createNewInstantTime(); + int numRecords = 100; + SparkRDDWriteClient client1 = 
getHoodieWriteClient(cfg); + + JavaRDD result1 = updateBatch(cfg, client1, instant2, instant1, + Option.of(Arrays.asList(instant1)), "000", numRecords, SparkRDDWriteClient::upsert, false, false, + numRecords, 200, 2); + // Start and finish another commit while the previous writer for commit 003 is running + String instant3 = HoodieActiveTimeline.createNewInstantTime(); + SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); + JavaRDD result2 = updateBatch(cfg, client2, instant3, instant1, + Option.of(Arrays.asList(instant1)), "000", numRecords, SparkRDDWriteClient::upsert, false, false, + numRecords, 200, 2); + client2.commit(instant3, result2); + // Schedule and run clustering while previous writer for commit 003 is running + SparkRDDWriteClient client3 = getHoodieWriteClient(cfg); + // schedule clustering + Option clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER); + + // Since instant 2 is still in inflight the clustering commit should fail with HoodieWriteConflictException exception. + assertThrows(HoodieClusteringException.class, () -> client3.cluster(clusterInstant.get(), true)); + } + + private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, + String prevCommitTime, String newCommitTime, int numRecords) throws Exception { + // Finish first base commmit + JavaRDD result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert, + false, false, numRecords); + assertTrue(client.commit(newCommitTime, result), "Commit should succeed"); + } + + private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommit, + String commitTimeBetweenPrevAndNew, String newCommitTime, int numRecords) + throws Exception { + JavaRDD result = updateBatch(cfg, client, newCommitTime, prevCommit, + Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false, + numRecords, 200, 2); + client.commit(newCommitTime, result); + } + +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 21fc571bb44b4..a3fd7775229cc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1862,7 +1862,7 @@ public void testMultiWriterForDoubleLocking() throws Exception { SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig); String partitionPath = dataGen.getPartitionPaths()[0]; for (int j = 0; j < 6; j++) { - String newCommitTime = "000000" + j; + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); List records = dataGen.generateInsertsForPartition(newCommitTime, 100, partitionPath); writeClient.startCommitWithTime(newCommitTime); JavaRDD writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 301ec688f088e..6e3dc3793ffab 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -30,6 +30,9 @@ import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy; +import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass; +import org.apache.hudi.client.transaction.IngestionPrimaryWriterBasedConflictResolutionStrategy; +import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy; import org.apache.hudi.client.validator.SparkPreCommitValidator; import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator; import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator; @@ -92,6 +95,7 @@ import org.apache.hudi.config.HoodiePreCommitValidatorConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieCorruptedDataException; import org.apache.hudi.exception.HoodieException; @@ -100,6 +104,7 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; +import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.io.HoodieMergeHandle; @@ -154,6 +159,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER; import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED; import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT; @@ -2762,6 +2768,145 @@ public void testMultiOperationsPerCommit(boolean populateMetaFields) throws IOEx "Must contain " + totalRecords + " records"); } + @Test + public void testClusteringCommitInPresenceOfInflightCommit() throws Exception { + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProviderTestClass.class) + .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy()) + .build(); + HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build(); + HoodieWriteConfig insertWriteConfig = getConfigBuilder().withAutoCommit(false) + .withCleanConfig(cleanConfig) + .withLockConfig(lockConfig) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withProperties(properties) + .build(); + SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig); + + // Create a base commit on a file. 
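+ // (The base insert below presumably seeds the file group in DEFAULT_FIRST_PARTITION_PATH that both the later inflight upsert and the clustering plan touch, which is what creates the conflict this test exercises.)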
+ int numRecords = 200; + String firstCommit = HoodieActiveTimeline.createNewInstantTime(); + String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr}); + writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), "000", + numRecords, dataGenerator::generateInserts, SparkRDDWriteClient::insert, true, numRecords, numRecords, + 1, true); + + // Do an upsert operation without autocommit. + String inflightCommit = HoodieActiveTimeline.createNewInstantTime(); + writeBatch(client, inflightCommit, firstCommit, Option.of(Arrays.asList("000")), "000", + 100, dataGenerator::generateUniqueUpdates, SparkRDDWriteClient::upsert, false, 0, 200, + 2, false); + + // Schedule and execute a clustering plan on the same partition. During conflict resolution the commit should fail. + HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1) + .build(); + HoodiePreCommitValidatorConfig preCommitValidatorConfig = HoodiePreCommitValidatorConfig.newBuilder() + .withPreCommitValidator(StringUtils.nullToEmpty(SqlQuerySingleResultPreCommitValidator.class.getName())) + .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#200") + .build(); + HoodieWriteConfig clusteringWriteConfig = getConfigBuilder().withAutoCommit(false) + .withCleanConfig(cleanConfig) + .withClusteringConfig(clusteringConfig) + .withPreCommitValidatorConfig(preCommitValidatorConfig) + .withLockConfig(lockConfig) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withProperties(properties) + .build(); + + // create client with new config. + String clusteringCommitTime = HoodieActiveTimeline.createNewInstantTime();// HoodieActiveTimeline.createNewInstantTime(); + SparkRDDWriteClient clusteringWriteClient = getHoodieWriteClient(clusteringWriteConfig); + + // Schedule and execute clustering, this should fail since there is a conflict between ingestion inflight commit. 
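+ // (With the ingestion-primary strategy configured above, the clustering writer is expected to run pre-commit conflict resolution, detect the overlapping inflight ingestion commit, and fail with the HoodieClusteringException asserted below.)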
+    clusteringWriteClient.scheduleClusteringAtInstant(clusteringCommitTime, Option.empty());
+    assertThrows(HoodieClusteringException.class, () -> clusteringWriteClient.cluster(clusteringCommitTime, true));
+
+    // Roll back the failed replacecommit.
+    clusteringWriteClient.rollback(clusteringCommitTime);
+
+    // Verify the timeline.
+    List<HoodieInstant> instants = metaClient.reloadActiveTimeline().getInstants();
+    assertEquals(3, instants.size());
+    assertEquals(HoodieActiveTimeline.ROLLBACK_ACTION, instants.get(2).getAction());
+    assertEquals(new HoodieInstant(true, HoodieActiveTimeline.COMMIT_ACTION, inflightCommit), instants.get(1));
+  }
+
+  @Test
+  public void testIngestionCommitInPresenceOfCompletedClusteringCommit() throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
+        .withLockProvider(FileSystemBasedLockProviderTestClass.class)
+        .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy())
+        .build();
+    HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
+        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build();
+    HoodieWriteConfig insertWriteConfig = getConfigBuilder().withAutoCommit(false)
+        .withCleanConfig(cleanConfig)
+        .withLockConfig(lockConfig)
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withProperties(properties)
+        .build();
+    SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
+
+    // Create a base commit on a file.
+    int numRecords = 200;
+    String firstCommit = HoodieActiveTimeline.createNewInstantTime();
+    String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr});
+    writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), "000",
+        numRecords, dataGenerator::generateInserts, SparkRDDWriteClient::insert, true, numRecords, numRecords,
+        1, true);
+
+    // Start an ingestion upsert at a lower timestamp and temporarily hold back its commit.
+    String inflightCommit = HoodieActiveTimeline.createNewInstantTime();
+    JavaRDD<WriteStatus> ingestionResult = writeBatch(client, inflightCommit, firstCommit, Option.of(Arrays.asList("000")), "000",
+        100, dataGenerator::generateUniqueUpdates, SparkRDDWriteClient::upsert, false, 0, 200,
+        2, false);
+
+    // Schedule and execute a clustering plan on the same partition; the ingestion commit above should later fail during conflict resolution.
+    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
+        .withClusteringTargetPartitions(0)
+        .withInlineClusteringNumCommits(1)
+        .build();
+
+    // It is hard to test the corner case where the ingestion writer is at the dedupe step (right before creating the inflight file)
+    // while the clustering commit is just about to complete, so the clustering job alone uses the default conflict
+    // resolution strategy in order to produce a successful replacecommit.
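+    // (With SimpleConcurrentFileWritesConflictResolutionStrategy the clustering replacecommit completes first,
+    // and it is the slower ingestion writer that fails at commit time further below.)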
+    HoodieLockConfig clusteringLockConfig = HoodieLockConfig.newBuilder()
+        .withLockProvider(FileSystemBasedLockProviderTestClass.class)
+        .withConflictResolutionStrategy(new SimpleConcurrentFileWritesConflictResolutionStrategy())
+        .build();
+    HoodiePreCommitValidatorConfig preCommitValidatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
+        .withPreCommitValidator(StringUtils.nullToEmpty(SqlQuerySingleResultPreCommitValidator.class.getName()))
+        .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#200")
+        .build();
+    HoodieWriteConfig clusteringWriteConfig = getConfigBuilder().withAutoCommit(false)
+        .withCleanConfig(cleanConfig)
+        .withClusteringConfig(clusteringConfig)
+        .withPreCommitValidatorConfig(preCommitValidatorConfig)
+        .withLockConfig(clusteringLockConfig)
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withProperties(properties)
+        .build();
+
+    // Create a client with the new config.
+    String clusteringCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    SparkRDDWriteClient clusteringWriteClient = getHoodieWriteClient(clusteringWriteConfig);
+
+    // Schedule and execute clustering; this should complete successfully.
+    clusteringWriteClient.scheduleClusteringAtInstant(clusteringCommitTime, Option.empty());
+    clusteringWriteClient.cluster(clusteringCommitTime, true);
+
+    // When the ingestion writer commits, it should throw an exception.
+    assertThrows(HoodieWriteConflictException.class, () -> client.commit(inflightCommit, ingestionResult));
+  }
+
   /**
    * Build Hoodie Write Config for small data file sizes.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index c130a19ee53e3..8c4a5cb377e45 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -204,6 +204,13 @@ public HoodieDefaultTimeline findInstantsInRangeByStateTransitionTime(String sta
         details);
   }
 
+  @Override
+  public HoodieDefaultTimeline findInstantsModifiedAfterByStateTransitionTime(String instantTime) {
+    return new HoodieDefaultTimeline(instants.stream()
+        .filter(s -> HoodieTimeline.compareTimestamps(s.getStateTransitionTime(),
+            GREATER_THAN, instantTime) && !s.getTimestamp().equals(instantTime)), details);
+  }
+
   @Override
   public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) {
     return new HoodieDefaultTimeline(getInstantsAsStream()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 20dc5ced07ae1..901530b11d6ed 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.common.table.timeline;
 
 import org.apache.hudi.common.util.StringUtils;
-
 import org.apache.hadoop.fs.FileStatus;
 
 import java.io.Serializable;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index b3c8d2a27a049..2cfcffb623d49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -241,6 +241,11 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline findInstantsInRangeByStateTransitionTime(String startTs, String endTs);
 
+  /**
+   * Create a new timeline with all instants that were modified after the specified time.
+   */
+  HoodieDefaultTimeline findInstantsModifiedAfterByStateTransitionTime(String instantTime);
+
   /**
    * Create a new Timeline with all the instants after startTs.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
index d8d210e9c3d58..b50b74e65dfd1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
@@ -39,7 +39,7 @@
  *
  */
 public class HoodieTablePreCommitFileSystemView {
-
+  private Map<String, List<String>> partitionToReplaceFileIds;
   private List filesWritten;
   private String preCommitInstantTime;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index fae8362a74469..817e6cdcefa42 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -73,6 +73,13 @@ public static Stream> getAllPendingClu
         .filter(Option::isPresent).map(Option::get);
   }
 
+  /**
+   * Checks if the replacecommit is a clustering commit.
+   */
+  public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
+    return getClusteringPlan(metaClient, pendingReplaceInstant).isPresent();
+  }
+
   /**
    * Get requested replace metadata from timeline.
    * @param metaClient
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index 05ec523bd2c19..ed31f79e51809 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -40,6 +40,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Helper class to generate commit metadata.
@@ -145,6 +146,12 @@ public static Set<Pair<String, String>> getPartitionAndFileIdWithoutSuffix(Map<
+  public static Set<Pair<String, String>> flattenPartitionToReplaceFileIds(Map<String, List<String>> partitionToReplaceFileIds) {
+    return partitionToReplaceFileIds.entrySet().stream()
+        .flatMap(partitionFileIds -> partitionFileIds.getValue().stream().map(replaceFileId -> Pair.of(partitionFileIds.getKey(), replaceFileId)))
+        .collect(Collectors.toSet());
+  }
+
   /**
    * Process previous commits metadata in the timeline to determine the checkpoint given a checkpoint key.
   * NOTE: This is very similar in intent to DeltaSync#getLatestCommitMetadataWithValidCheckpointInfo except that
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index ff58cd00c14c5..dc8a067e6bbd1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -65,8 +65,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -133,7 +131,6 @@ public class HoodieTestTable {
 
   public static final String PHONY_TABLE_SCHEMA = "{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\", \"name\": \"PhonyRecord\", \"fields\": []}";
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieTestTable.class);
   private static final Random RANDOM = new Random();
 
   protected static HoodieTestTableState testTableState;
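Reviewer note, not part of the patch: a minimal, self-contained sketch of what the new CommitUtils.flattenPartitionToReplaceFileIds helper is expected to produce, since the replaced file IDs it emits are the same (partition, fileId) pairs used when checking two concurrent writers for overlapping file groups. The example class name, the sample partition and file ID values, the generic types, and the org.apache.hudi.common.util.collection.Pair import are assumptions reconstructed from the surrounding code, not taken verbatim from the patch.

import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.collection.Pair;  // assumed Pair implementation used by CommitUtils

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FlattenPartitionToReplaceFileIdsExample {
  public static void main(String[] args) {
    // partitionPath -> file ids replaced by a clustering / insert_overwrite style commit (illustrative values)
    Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
    partitionToReplaceFileIds.put("2023/01/01", Arrays.asList("fg-1", "fg-2"));
    partitionToReplaceFileIds.put("2023/01/02", Collections.singletonList("fg-3"));

    // Flatten the map into (partitionPath, fileId) pairs.
    Set<Pair<String, String>> mutated = CommitUtils.flattenPartitionToReplaceFileIds(partitionToReplaceFileIds);

    // Expected pairs: (2023/01/01, fg-1), (2023/01/01, fg-2), (2023/01/02, fg-3)
    mutated.forEach(pair -> System.out.println(pair.getLeft() + " -> " + pair.getRight()));
  }
}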