From 84756c4ee24ced709e431a7d33dae4691a830e2e Mon Sep 17 00:00:00 2001 From: liujinhui1994 <965147871@qq.com> Date: Mon, 18 Jul 2022 15:17:27 +0800 Subject: [PATCH 1/4] HUDI-4412 --- .../apache/hudi/client/transaction/ConcurrentOperation.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java index 40da7dca7fcbb..35580229e3867 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java @@ -124,14 +124,15 @@ private void init(HoodieInstant instant) { HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata(); org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata(); if (instant.isRequested()) { - if (requestedReplaceMetadata != null) { + // for insert_overwrite/insert_overwrite_table clusteringPlan will be empty + if (requestedReplaceMetadata != null && requestedReplaceMetadata.getClusteringPlan() != null) { this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata); this.operationType = WriteOperationType.CLUSTER; } } else { if (inflightCommitMetadata != null) { this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet(); - this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType()); + this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata().getOperationType()); } else if (requestedReplaceMetadata != null) { // inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata); From 59a1fd190a6522a403ab8d1a3ddaba4c94ab6e97 Mon Sep 17 00:00:00 2001 From: liujinhui1994 <965147871@qq.com> Date: Thu, 21 Jul 2022 14:12:27 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E2=80=98=C3=A2=E2=80=98=C3=A2rebuild?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From 99d5c38a4bc0f3cdb46ca42d59e0fa64df3141f9 Mon Sep 17 00:00:00 2001 From: liujinhui1994 <965147871@qq.com> Date: Thu, 21 Jul 2022 14:12:54 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E2=80=98=C3=A2=E2=80=98=C3=A2rebuild?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From 639e47e513791f0fa66804b241f99077434b7a95 Mon Sep 17 00:00:00 2001 From: liujinhui1994 <965147871@qq.com> Date: Mon, 25 Jul 2022 16:50:46 +0800 Subject: [PATCH 4/4] add test --- ...tFileWritesConflictResolutionStrategy.java | 49 +++++++++++++++++++ .../common/testutils/HoodieTestTable.java | 6 +++ 2 files changed, 55 insertions(+) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java index e7cc296ff6ae4..39b9e1e6dc474 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -123,6 +123,41 @@ public void testConcurrentWritesWithInterleavingSuccesssfulCommit() throws Excep } } + @Test + public void testConcurrentWritesWithReplaceInflightCommit() throws Exception { + createReplaceInflight(HoodieActiveTimeline.createNewInstantTime()); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + Option lastSuccessfulInstant = Option.empty(); + + // writer 1 starts + String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime(); + createInflightCommit(currentWriterInstant); + Option currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); + + // writer 2 starts and finishes + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + createReplaceInflight(newInstantTime); + + SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); + HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); + timeline = timeline.reload(); + + List candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + + // writer 1 conflicts with writer 2 + Assertions.assertTrue(candidateInstants.size() == 1); + ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); + ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); + Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + try { + strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + Assertions.fail("Cannot reach here, writer 1 and writer 2 should have thrown a conflict"); + } catch (HoodieWriteConflictException e) { + // expected + } + } + @Test public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exception { createCommit(HoodieActiveTimeline.createNewInstantTime()); @@ -394,6 +429,20 @@ private void createReplaceRequested(String instantTime) throws Exception { .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } + private void createReplaceInflight(String instantTime) throws Exception { + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata(); + inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId("file-1"); + inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); + HoodieTestTable.of(metaClient) + .addInflightReplace(instantTime, Option.of(inflightReplaceMetadata)) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception { String fileId1 = "file-1"; String fileId2 = "file-2"; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 31de82b12cbd5..c2531d47c1c8b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -284,6 +284,12 @@ public HoodieTestTable addRequestedReplace(String instantTime, Option inflightReplaceMetadata) throws Exception { + createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata); + currentInstantTime = instantTime; + return this; + } + public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException { createRequestedCleanFile(basePath, instantTime, cleanerPlan); createInflightCleanFile(basePath, instantTime, cleanerPlan);