diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java index 40da7dca7fcbb..35580229e3867 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java @@ -124,14 +124,15 @@ private void init(HoodieInstant instant) { HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata(); org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata(); if (instant.isRequested()) { - if (requestedReplaceMetadata != null) { + // for insert_overwrite/insert_overwrite_table clusteringPlan will be empty + if (requestedReplaceMetadata != null && requestedReplaceMetadata.getClusteringPlan() != null) { this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata); this.operationType = WriteOperationType.CLUSTER; } } else { if (inflightCommitMetadata != null) { this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet(); - this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType()); + this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata().getOperationType()); } else if (requestedReplaceMetadata != null) { // inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java index e7cc296ff6ae4..39b9e1e6dc474 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -123,6 +123,41 @@ public void testConcurrentWritesWithInterleavingSuccesssfulCommit() throws Excep } } + @Test + public void testConcurrentWritesWithReplaceInflightCommit() throws Exception { + createReplaceInflight(HoodieActiveTimeline.createNewInstantTime()); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + Option lastSuccessfulInstant = Option.empty(); + + // writer 1 starts + String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createInflightCommit(currentWriterInstant); + Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); + + // writer 2 starts and finishes + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + createReplaceInflight(newInstantTime); + + SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); + HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); + timeline = timeline.reload(); + + List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + + // writer 1 conflicts with writer 2 + Assertions.assertTrue(candidateInstants.size() == 1); + ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); + ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); + Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + try { + strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + Assertions.fail("Cannot reach here, writer 1 and writer 2 should have thrown a conflict"); + } catch (HoodieWriteConflictException e) { + // expected + } + } + @Test public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exception { createCommit(HoodieActiveTimeline.createNewInstantTime()); @@ -394,6 +429,20 @@ private void createReplaceRequested(String instantTime) throws Exception { .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } + private void createReplaceInflight(String instantTime) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata(); + inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId("file-1"); + inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + HoodieTestTable.of(metaClient) + .addInflightReplace(instantTime, Option.of(inflightReplaceMetadata)) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception { String fileId1 = "file-1"; String fileId2 = "file-2"; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 31de82b12cbd5..c2531d47c1c8b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -284,6 +284,12 @@ public HoodieTestTable addRequestedReplace(String instantTime, Option inflightReplaceMetadata) throws Exception { + createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata); + currentInstantTime = instantTime; + return this; + } + public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException { createRequestedCleanFile(basePath, instantTime, cleanerPlan); createInflightCleanFile(basePath, instantTime, cleanerPlan);