From 972220b90912d6481234a8842e89494c2aa7d87c Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 1 Nov 2021 08:15:23 -0400 Subject: [PATCH 1/4] Fixing metadata table updates such that only regular writes from data table can trigger table services in metadata table --- .../client/AbstractHoodieWriteClient.java | 2 +- .../HoodieBackedTableMetadataWriter.java | 26 +++++++------- .../metadata/HoodieTableMetadataWriter.java | 4 ++- .../hudi/table/action/BaseActionExecutor.java | 2 +- .../testutils/HoodieMetadataTestTable.java | 12 +++---- .../hudi/client/HoodieFlinkWriteClient.java | 10 +++--- .../FlinkHoodieBackedTableMetadataWriter.java | 8 +++-- .../hudi/client/SparkRDDWriteClient.java | 12 ++++--- .../SparkHoodieBackedTableMetadataWriter.java | 8 +++-- .../functional/TestHoodieBackedMetadata.java | 36 +++++++++++++++++-- 10 files changed, 82 insertions(+), 38 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 3e6b7ab490b7c..83945e8a1a688 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -251,7 +251,7 @@ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) { // Create a Hoodie table after starting the transaction which encapsulated the commits and files visible. // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload HoodieTable table = createTable(config, hadoopConf); - table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime)); + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime, false)); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index eb0c6ea899bcc..7a21942e3e586 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -409,7 +409,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi }); LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata"); - update(commitMetadata, createInstantTime); + update(commitMetadata, createInstantTime, false); return true; } @@ -523,23 +523,24 @@ private interface ConvertMetadataFunction { * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. * @param type of commit metadata. + * @param canTriggerTableService true if table services can be triggered. false otherwise. 
*/ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { if (enabled && metadata != null) { List records = convertMetadataFunction.convertMetadata(); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService); } } /** * Update from {@code HoodieCommitMetadata}. - * * @param commitMetadata {@code HoodieCommitMetadata} * @param instantTime Timestamp at which the commit was performed + * @param isTableService */ @Override - public void update(HoodieCommitMetadata commitMetadata, String instantTime) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime)); + public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableService) { + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime), !isTableService); } /** @@ -550,7 +551,8 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) { */ @Override public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime)); + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime), + false); } /** @@ -562,7 +564,7 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), - restoreMetadata, instantTime, metadata.getSyncedInstantTime())); + restoreMetadata, instantTime, metadata.getSyncedInstantTime()), false); } /** @@ -588,7 +590,7 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) List records = HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), rollbackMetadata, instantTime, metadata.getSyncedInstantTime(), wasSynced); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, false); } } @@ -601,12 +603,12 @@ public void close() throws Exception { /** * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit. - * - * @param records The list of records to be written. + * @param records The list of records to be written. * @param partitionName The partition to which the records are to be written. * @param instantTime The timestamp to use for the deltacommit. + * @param canTriggerTableService true if table services can be scheduled and executed. false otherwise. */ - protected abstract void commit(List records, String partitionName, String instantTime); + protected abstract void commit(List records, String partitionName, String instantTime, boolean canTriggerTableService); /** * Perform a compaction on the Metadata Table. 
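Note: the control flow introduced above can be summarized with a minimal, self-contained sketch. The method names mirror compactIfNecessary/doClean in the engine-specific writers, but the class itself is hypothetical and only illustrates the gating, not the actual implementation:

    import java.util.List;

    // Sketch: every metadata-table commit applies the records, but only commits
    // that originate from regular data-table writes may go on to schedule
    // compaction and cleaning on the metadata table itself.
    class MetadataCommitGateSketch {
      void commit(List<String> records, String partitionName, String instantTime,
                  boolean canTriggerTableService) {
        writeRecords(records, partitionName, instantTime); // always applied
        if (canTriggerTableService) {                      // gated: regular writes only
          compactIfNecessary(instantTime);
          doClean(instantTime);
        }
      }

      private void writeRecords(List<String> records, String partition, String time) { /* upsert into the metadata table */ }
      private void compactIfNecessary(String time) { /* compact once enough delta commits accumulate */ }
      private void doClean(String time) { /* clean up older file slices */ }
    }

This is also why the clean/restore/rollback overloads of update() pass false unconditionally, while the commit-metadata overload passes the negation of isTableService.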
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index f5c4d26d0ce4d..bc12af82a63ce 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -34,8 +34,10 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * Update the metadata table due to a COMMIT operation. * @param commitMetadata commit metadata of the operation of interest. * @param instantTime instant time of the commit. + * @param isTableService true if caller is a table service. false otherwise. Only regular write operations can trigger metadata table services and this argument + * will assist in this. */ - void update(HoodieCommitMetadata commitMetadata, String instantTime); + void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableService); /** * Update the metadata table due to a CLEAN operation. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index cd32a5bc87307..49c864be889e9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -57,7 +57,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, * @param metadata commit metadata of interest. */ protected final void writeTableMetadata(HoodieCommitMetadata metadata) { - table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime)); + table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, false)); } /** diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java index bbaf073743b7d..fa0f5df61b183 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java @@ -77,7 +77,7 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy HoodieCommitMetadata commitMetadata = super.doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitionToFilesNameLengthMap, bootstrap, createInflightCommit); if (writer != null && !createInflightCommit) { - writer.update(commitMetadata, commitTime); + writer.update(commitMetadata, commitTime, false); } return commitMetadata; } @@ -86,7 +86,7 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { super.moveInflightCommitToComplete(instantTime, metadata); if (writer != null) { - writer.update(metadata, instantTime); + writer.update(metadata, instantTime, false); } return this; } @@ -94,7 +94,7 @@ public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCo public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata 
metadata, boolean ignoreWriter) throws IOException { super.moveInflightCommitToComplete(instantTime, metadata); if (!ignoreWriter && writer != null) { - writer.update(metadata, instantTime); + writer.update(metadata, instantTime, false); } return this; } @@ -103,7 +103,7 @@ public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCo public HoodieTestTable moveInflightCompactionToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { super.moveInflightCompactionToComplete(instantTime, metadata); if (writer != null) { - writer.update(metadata, instantTime); + writer.update(metadata, instantTime, true); } return this; } @@ -120,7 +120,7 @@ public HoodieCleanMetadata doClean(String commitTime, Map parti public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception { super.addCompaction(instantTime, commitMetadata); if (writer != null) { - writer.update(commitMetadata, instantTime); + writer.update(commitMetadata, instantTime, true); } return this; } @@ -151,7 +151,7 @@ public HoodieTestTable addReplaceCommit( HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception { super.addReplaceCommit(instantTime, requestedReplaceMetadata, inflightReplaceMetadata, completeReplaceMetadata); if (writer != null) { - writer.update(completeReplaceMetadata, instantTime); + writer.update(completeReplaceMetadata, instantTime, true); } return this; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index c73de656a8d6c..485ed3ddb73f6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -261,7 +261,7 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp protected void preCommit(String instantTime, HoodieCommitMetadata metadata) { this.metadataWriterOption.ifPresent(w -> { w.initTableMetadata(); // refresh the timeline - w.update(metadata, instantTime); + w.update(metadata, instantTime, false); }); } @@ -362,7 +362,8 @@ public void completeCompaction( String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); + writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime), + true); // commit to data table after committing to metadata table. finalizeWrite(table, compactionCommitTime, writeStats); LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); @@ -401,12 +402,13 @@ public HoodieWriteMetadata> cluster(final String clusteringIns private void writeTableMetadata(HoodieTable>, List, List> table, HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant) { + HoodieInstant hoodieInstant, + boolean isTableService) { try { this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); // Do not do any conflict resolution here as we do with regular writes. 
We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp())); + table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableService)); } finally { this.txnManager.endTransaction(); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 9ae3e622d35da..8254d0b884616 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -90,7 +90,7 @@ protected void initialize(HoodieEngineContext eng } @Override - protected void commit(List records, String partitionName, String instantTime) { + protected void commit(List records, String partitionName, String instantTime, boolean canTriggerTableService) { ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); List recordList = prepRecords(records, partitionName, 1); @@ -125,8 +125,10 @@ protected void commit(List records, String partitionName, String i // reload timeline metadataMetaClient.reloadActiveTimeline(); - compactIfNecessary(writeClient, instantTime); - doClean(writeClient, instantTime); + if (canTriggerTableService) { + compactIfNecessary(writeClient, instantTime); + doClean(writeClient, instantTime); + } } // Update total size of the metadata and count of base/log files diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 4100b0463e026..8c4584a125c1b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -305,7 +305,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD writeStats = writeStatuses.map(WriteStatus::getStat).collect(); - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); + writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime), + true); // commit to data table after committing to metadata table. finalizeWrite(table, compactionCommitTime, writeStats); LOG.info("Committing Compaction " + compactionCommitTime + ". 
Finished with result " + metadata); @@ -377,7 +378,8 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); } - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime)); + writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime), + true); finalizeWrite(table, clusteringCommitTime, writeStats); try { LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata); @@ -403,12 +405,12 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD>, JavaRDD, JavaRDD> table, HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant) { + HoodieInstant hoodieInstant, boolean isTableService) { try { this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp())); + table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableService)); } finally { this.txnManager.endTransaction(); } @@ -480,7 +482,7 @@ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) { HoodieTable table = createTable(config, hadoopConf); TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(), Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner()); - table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime)); + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime, false)); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index e59e195836149..95ab7dc79a202 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -103,7 +103,7 @@ protected void initialize(HoodieEngineContext eng } @Override - protected void commit(List records, String partitionName, String instantTime) { + protected void commit(List records, String partitionName, String instantTime, boolean canTriggerTableService) { ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); JavaRDD recordRDD = prepRecords(records, partitionName, 1); @@ -132,8 +132,10 @@ protected void commit(List records, String partitionName, String i // reload timeline metadataMetaClient.reloadActiveTimeline(); - compactIfNecessary(writeClient, instantTime); - doClean(writeClient, instantTime); + if (canTriggerTableService) { + compactIfNecessary(writeClient, instantTime); + doClean(writeClient, instantTime); + } } // Update total size of the metadata and count of base/log files diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index de757a0800905..145abf62f6a81 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -273,6 +273,38 @@ public void testInsertUpsertCluster(HoodieTableType tableType) throws Exception validateMetadata(testTable, emptyList(), true); } + /** + * Tests that table services in data table won't trigger table services in metadata table. + * @throws Exception + */ + @Test + public void testMetadataTableServices() throws Exception { + HoodieTableType tableType = COPY_ON_WRITE; + init(tableType, false); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .enableFullScan(true) + .enableMetrics(false) + .withMaxNumDeltaCommitsBeforeCompaction(3) // after 3 delta commits for regular writer operations, compaction should kick in. + .build()).build(); + initWriteConfigAndMetatableWriter(writeConfig, true); + + doWriteOperation(testTable, "0000001", INSERT); + //doWriteOperation(testTable, "0000002"); + doCleanAndValidate(testTable, "0000003", Arrays.asList("0000001")); + + HoodieTableMetadata tableMetadata = metadata(writeConfig, context); + // since clean was the last commit, table services should not get triggered in metadata table. + assertFalse(tableMetadata.getLatestCompactionTime().isPresent()); + + doWriteOperation(testTable, "0000004", UPSERT); + // this should have triggered compaction in metadata table + tableMetadata = metadata(writeConfig, context); + assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); + assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001"); + } + /** * Test rollback of various table operations sync to Metadata Table correctly. */ @@ -685,8 +717,8 @@ public void testMultiWriterForDoubleLocking() throws Exception { /** * Let's say clustering commit succeeded in metadata table, but failed before committing to data table. - * Next time, when clustering kicks in, hudi will rollback pending clustering and re-attempt the clustering with same instant time. - * So, this test ensures the 2nd attempt succeeds with metadata enabled. + * Next time, when clustering kicks in, hudi will rollback pending clustering (in data table) and re-attempt the clustering with same + * instant time. So, this test ensures the 2nd attempt succeeds with metadata enabled. * This is applicable to any table service where instant time is fixed. So, however many times the operation fails, the re-attempt will * be made with the same commit time. * Test uses clustering to test out the scenario.
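Note: the assertion on "0000004001" in the test above relies on how the metadata writer derives its compaction instant: the latest delta commit's timestamp is suffixed so the compaction sorts immediately after it. The following is a hedged sketch of that derivation and of the trigger decision; the real check lives in compactIfNecessary, and the explicit counter argument here is an assumption made for illustration:

    // Sketch, not the actual implementation: metadata-table compaction fires only
    // when the committing operation is a regular write and enough delta commits
    // have accumulated since the last metadata-table compaction.
    final class MetadataCompactionSketch {
      static boolean shouldCompact(boolean canTriggerTableService,
                                   int deltaCommitsSinceLastCompaction,
                                   int maxNumDeltaCommitsBeforeCompaction) {
        return canTriggerTableService
            && deltaCommitsSinceLastCompaction >= maxNumDeltaCommitsBeforeCompaction;
      }

      // The compaction instant suffixes the triggering commit's timestamp, which
      // is why the test expects "0000004001" right after commit "0000004".
      static String compactionInstant(String latestDeltaCommitTime) {
        return latestDeltaCommitTime + "001";
      }

      public static void main(String[] args) {
        System.out.println(shouldCompact(true, 3, 3));    // true: compaction kicks in
        System.out.println(compactionInstant("0000004")); // prints 0000004001
      }
    }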
From 1745ace2e2600f758fef7f9c065c0dba21a42589 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 2 Nov 2021 07:37:00 -0400 Subject: [PATCH 2/4] Fixing test failures in TestHoodieTimelineArchiveLog --- .../hudi/io/TestHoodieTimelineArchiveLog.java | 51 ++++++++++++++----- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index 53f0cdde3da86..7cb9740a8c6cc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -305,18 +305,20 @@ public void testNoArchivalWithInflightCompactionInMiddle(boolean enableMetadata) } } - // move inflight compaction to complete. archival should archive more commits. - // before this move, timeline 2_inflight_compaction, 3,4,5,6,7. - // after this move. 6,7. (2,3,4,5 will be archived) + // move inflight compaction to complete and add one regular write commit. archival should archive more commits. + // one extra commit is required, because compaction in data table will not trigger table services in metadata table. + // before this move, timeline : 2_inflight_compaction, 3,4,5,6,7. + // after this move: 6,7,8 (2,3,4,5 will be archived) testTable.moveInflightCompactionToComplete("00000002", inflightCompactionMetadata); + testTable.doWriteOperation("00000008", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), 2); + Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig); - List<HoodieInstant> originalCommits = commitsList.getKey(); List<HoodieInstant> commitsAfterArchival = commitsList.getValue(); - List<HoodieInstant> archivedInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000003", "00000004", "00000005"), HoodieTimeline.DELTA_COMMIT_ACTION); + List<HoodieInstant> archivedInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000003", "00000004", "00000005", "00000006"), HoodieTimeline.DELTA_COMMIT_ACTION); archivedInstants.add(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "00000002")); archivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000002")); - verifyArchival(archivedInstants, getActiveCommitInstants(Arrays.asList("00000006", "00000007"), HoodieTimeline.DELTA_COMMIT_ACTION), commitsAfterArchival); + verifyArchival(archivedInstants, getActiveCommitInstants(Arrays.asList("00000007", "00000008"), HoodieTimeline.DELTA_COMMIT_ACTION), commitsAfterArchival); } @Test @@ -379,7 +381,8 @@ public void testConvertCommitMetadata() throws Exception { public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exception { HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 2); - // min archival commits is 2 and max archival commits is 4(either clean commits has to be > 4 or commits has to be greater than 4. and so, after 5th commit, 3 commits will be archived. + // min archival commits is 2 and max archival commits is 4 (either clean commits have to be > 4 or total commits have to be greater than 4), + // and so, after the 5th commit, 3 commits will be archived. // 1,2,3,4,5,6 : after archival -> 1,5,6 (because 2,3,4,5 and 6 are clean commits and are eligible for archival) // after 7th and 8th commit no-op wrt archival.
Map cleanStats = new HashMap<>(); @@ -400,13 +403,35 @@ public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exce if (i < 6) { assertEquals(originalCommits, commitsAfterArchival); } else if (i == 6) { - // 1,2,3,4,5,6 : after archival -> 1,5,6 (bcoz, 2,3,4,5 and 6 are clean commits and are eligible for archival) - List<HoodieInstant> expectedActiveInstants = new ArrayList<>(); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); - verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + if (!enableMetadata) { + // 1,2,3,4,5,6 : after archival -> 1,5,6 (because 2,3,4,5 and 6 are clean commits and are eligible for archival) + List<HoodieInstant> expectedActiveInstants = new ArrayList<>(); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + } else { + // with metadata enabled, archival in data table is fenced based on compaction in metadata table. Clean commits in data table will not trigger compaction in + // metadata table. + List<HoodieInstant> expectedActiveInstants = new ArrayList<>(); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000002", "00000003", "00000004", "00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); + verifyArchival(getAllArchivedCommitInstants(Collections.emptyList(), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + } } else { - assertEquals(originalCommits, commitsAfterArchival); + if (!enableMetadata) { + assertEquals(originalCommits, commitsAfterArchival); + } else { + if (i == 7) { + // when i == 7, compaction in the metadata table will be triggered and hence archival in the data table will kick in.
+ // 1,2,3,4,5,6 : after archival -> 1,5,6 (because 2,3,4,5 and 6 are clean commits and are eligible for archival) + List<HoodieInstant> expectedActiveInstants = new ArrayList<>(); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001", "00000007"))); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + } else { + assertEquals(originalCommits, commitsAfterArchival); + } + } } } } From 593e91273c600b170ed8a24bda833071ca2a0090 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sat, 6 Nov 2021 14:45:17 -0400 Subject: [PATCH 3/4] Simplifying detection of table services --- .../hudi/client/AbstractHoodieWriteClient.java | 12 +++++++----- .../java/org/apache/hudi/table/HoodieTable.java | 6 ++++++ .../hudi/table/action/BaseActionExecutor.java | 5 +++-- .../apache/hudi/client/HoodieFlinkWriteClient.java | 12 ++++++------ .../commit/BaseFlinkCommitActionExecutor.java | 2 +- .../commit/BaseJavaCommitActionExecutor.java | 2 +- .../apache/hudi/client/SparkRDDWriteClient.java | 14 +++++++------- .../SparkBootstrapCommitActionExecutor.java | 2 +- .../commit/BaseSparkCommitActionExecutor.java | 2 +- 9 files changed, 33 insertions(+), 24 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 83945e8a1a688..3c460d177a030 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -184,11 +184,12 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti HoodieTable table = createTable(config, hadoopConf); HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getWriteSchema(), commitActionType); + HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime); HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config); - this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime)), + this.txnManager.beginTransaction(Option.of(inflightInstant), lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty()); try { - preCommit(instantTime, metadata); + preCommit(inflightInstant, metadata); commit(table, commitActionType, instantTime, metadata, stats); postCommit(table, metadata, instantTime, extraMetadata); LOG.info("Committed " + instantTime); @@ -244,14 +245,15 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String /** * Any pre-commit actions like conflict resolution or updating metadata table goes here. - * @param instantTime commit instant time. + * @param hoodieInstant hoodie instant of inflight operation. * @param metadata commit metadata for which pre commit is being invoked.
*/ - protected void preCommit(String instantTime, HoodieCommitMetadata metadata) { + protected void preCommit(HoodieInstant hoodieInstant, HoodieCommitMetadata metadata) { // Create a Hoodie table after starting the transaction which encapsulated the commits and files visible. // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload HoodieTable table = createTable(config, hadoopConf); - table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime, false)); + boolean isTableService = table.isTableService(hoodieInstant.getAction()); + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, hoodieInstant.getTimestamp(), isTableService)); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index a6c14e6d2aea3..63faa2fdec4cc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -41,6 +41,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; @@ -728,6 +729,11 @@ public final Option getMetadataWriter() { return getMetadataWriter(Option.empty()); } + public boolean isTableService(String actionType) { + return !((getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE && actionType.equals(HoodieTimeline.COMMIT_ACTION)) + || (getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ && actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION))); + } + /** * Get Table metadata writer. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index 49c864be889e9..30ab915f7cec0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -56,8 +56,9 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, * Writes commits metadata to table metadata. * @param metadata commit metadata of interest. 
*/ - protected final void writeTableMetadata(HoodieCommitMetadata metadata) { - table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, false)); + protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) { + boolean isTableService = table.isTableService(actionType); + table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, isTableService)); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 485ed3ddb73f6..22b16ce7e1531 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -258,10 +258,11 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp } @Override - protected void preCommit(String instantTime, HoodieCommitMetadata metadata) { + protected void preCommit(HoodieInstant hoodieInstant, HoodieCommitMetadata metadata) { this.metadataWriterOption.ifPresent(w -> { w.initTableMetadata(); // refresh the timeline - w.update(metadata, instantTime, false); + boolean isTableService = getHoodieTable().isTableService(hoodieInstant.getAction()); + w.update(metadata, hoodieInstant.getTimestamp(), isTableService); }); } @@ -362,8 +363,7 @@ public void completeCompaction( String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime), - true); + writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); // commit to data table after committing to metadata table. finalizeWrite(table, compactionCommitTime, writeStats); LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); @@ -402,12 +402,12 @@ public HoodieWriteMetadata> cluster(final String clusteringIns private void writeTableMetadata(HoodieTable>, List, List> table, HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant, - boolean isTableService) { + HoodieInstant hoodieInstant) { try { this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. 
+ boolean isTableService = table.isTableService(hoodieInstant.getAction()); table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableService)); } finally { this.txnManager.endTransaction(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index fce159ec8a408..5dfa511a8823f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -147,7 +147,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); - writeTableMetadata(metadata); + writeTableMetadata(metadata, actionType); activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index 79aad595fd997..66cb40758bdc0 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -208,7 +208,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); - writeTableMetadata(metadata); + writeTableMetadata(metadata, actionType); activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 8c4584a125c1b..106b5a261271e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -305,8 +305,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD writeStats = writeStatuses.map(WriteStatus::getStat).collect(); - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime), - true); + writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); // commit to data table after committing to metadata table. finalizeWrite(table, compactionCommitTime, writeStats); LOG.info("Committing Compaction " + compactionCommitTime + ". 
Finished with result " + metadata); @@ -378,8 +377,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); } - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime), - true); + writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime)); finalizeWrite(table, clusteringCommitTime, writeStats); try { LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata); @@ -405,9 +403,10 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD>, JavaRDD, JavaRDD> table, HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant, boolean isTableService) { + HoodieInstant hoodieInstant) { try { this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); + boolean isTableService = table.isTableService(hoodieInstant.getAction()); // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableService)); @@ -476,13 +475,14 @@ private HoodieTable>, JavaRDD, JavaRDD ((HoodieTableMetadataWriter)w).update(metadata, instantTime, false)); + boolean isTableService = table.isTableService(hoodieInstant.getAction()); + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, hoodieInstant.getTimestamp(), isTableService)); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java index bdeb041b31479..9ca44f6fc5812 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java @@ -246,7 +246,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit()); metadata.setOperationType(operationType); - writeTableMetadata(metadata); + writeTableMetadata(metadata, actionType); try { activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime), diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 0b673b8907d0a..2bcd6d787a268 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -267,7 +267,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, 
result.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); - writeTableMetadata(metadata); + writeTableMetadata(metadata, actionType); activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); LOG.info("Committed " + instantTime); From 24471e3fbeb0b60332f44f0a9ecb72a98bfccd43 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sun, 7 Nov 2021 19:23:56 -0500 Subject: [PATCH 4/4] Addressing more comments --- .../apache/hudi/client/AbstractHoodieWriteClient.java | 8 ++++---- .../metadata/HoodieBackedTableMetadataWriter.java | 6 +++--- .../hudi/metadata/HoodieTableMetadataWriter.java | 4 ++-- .../main/java/org/apache/hudi/table/HoodieTable.java | 11 ++++++----- .../apache/hudi/table/action/BaseActionExecutor.java | 3 +-- .../apache/hudi/client/HoodieFlinkWriteClient.java | 8 +++----- .../hudi/table/HoodieFlinkCopyOnWriteTable.java | 6 ++++++ .../hudi/table/HoodieFlinkMergeOnReadTable.java | 6 ++++++ .../apache/hudi/table/HoodieJavaCopyOnWriteTable.java | 6 ++++++ .../apache/hudi/table/HoodieJavaMergeOnReadTable.java | 6 ++++++ .../org/apache/hudi/client/SparkRDDWriteClient.java | 10 +++++----- .../hudi/table/HoodieSparkCopyOnWriteTable.java | 5 +++++ .../hudi/table/HoodieSparkMergeOnReadTable.java | 5 +++++ .../client/functional/TestHoodieBackedMetadata.java | 7 ++++--- 14 files changed, 62 insertions(+), 29 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 3c460d177a030..c1d43b94c2244 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -245,15 +245,15 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String /** * Any pre-commit actions like conflict resolution or updating metadata table goes here. - * @param hoodieInstant hoodie instant of inflight operation. + * @param inflightInstant instant of inflight operation. * @param metadata commit metadata for which pre commit is being invoked. */ - protected void preCommit(HoodieInstant hoodieInstant, HoodieCommitMetadata metadata) { + protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { // Create a Hoodie table after starting the transaction which encapsulated the commits and files visible. 
// Important to create this after the lock to ensure latest commits show up in the timeline without need for reload HoodieTable table = createTable(config, hadoopConf); - boolean isTableService = table.isTableService(hoodieInstant.getAction()); - table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, hoodieInstant.getTimestamp(), isTableService)); + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(), + table.isTableServiceAction(inflightInstant.getAction()))); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 7a21942e3e586..48d6b948c4133 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -536,11 +536,11 @@ private void processAndCommit(String instantTime, ConvertMetadataFunction co * Update from {@code HoodieCommitMetadata}. * @param commitMetadata {@code HoodieCommitMetadata} * @param instantTime Timestamp at which the commit was performed - * @param isTableService + * @param isTableServiceAction {@code true} if commit metadata is pertaining to a table service. {@code false} otherwise. */ @Override - public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableService) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime), !isTableService); + public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) { + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime), !isTableServiceAction); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index bc12af82a63ce..4f5ac027c91eb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -34,10 +34,10 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * Update the metadata table due to a COMMIT operation. * @param commitMetadata commit metadata of the operation of interest. * @param instantTime instant time of the commit. - * @param isTableService true if caller is a table service. false otherwise. Only regular write operations can trigger metadata table services and this argument + * @param isTableServiceAction true if caller is a table service. false otherwise. Only regular write operations can trigger metadata table services and this argument * will assist in this. */ - void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableService); + void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction); /** * Update the metadata table due to a CLEAN operation. 
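Note: with the rename to isTableServiceAction, call sites derive the flag from the inflight instant's action instead of hard-coding true/false at each caller. A usage sketch with hypothetical stand-in interfaces (the real types are HoodieTable, HoodieTableMetadataWriter and HoodieInstant):

    // Stand-ins for the real Hudi types, only so the sketch is self-contained.
    interface TableLike { boolean isTableServiceAction(String actionType); }
    interface MetadataWriterLike { void update(Object metadata, String instantTime, boolean isTableServiceAction); }
    interface InstantLike { String getAction(); String getTimestamp(); }

    class PreCommitSketch {
      // For COW tables every action except "commit" is a table service; for MOR
      // tables, every action except "deltacommit". The table subclass decides.
      void preCommit(TableLike table, MetadataWriterLike writer,
                     InstantLike inflightInstant, Object commitMetadata) {
        boolean isTableServiceAction = table.isTableServiceAction(inflightInstant.getAction());
        writer.update(commitMetadata, inflightInstant.getTimestamp(), isTableServiceAction);
      }
    }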
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 63faa2fdec4cc..abe7f76ffc7a0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -41,7 +41,6 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; @@ -729,10 +728,12 @@ public final Option getMetadataWriter() { return getMetadataWriter(Option.empty()); } - public boolean isTableService(String actionType) { - return !((getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE && actionType.equals(HoodieTimeline.COMMIT_ACTION)) - || (getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ && actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION))); - } + /** + * Check if action type is a table service. + * @param actionType action type of interest. + * @return true if action represents a table service. false otherwise. + */ + public abstract boolean isTableServiceAction(String actionType); /** * Get Table metadata writer. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index 30ab915f7cec0..a22479b6bf341 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -57,8 +57,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, * @param metadata commit metadata of interest. 
*/ protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) { - boolean isTableService = table.isTableService(actionType); - table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, isTableService)); + table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, table.isTableServiceAction(actionType))); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 22b16ce7e1531..68ce212527bb6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -258,11 +258,10 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp } @Override - protected void preCommit(HoodieInstant hoodieInstant, HoodieCommitMetadata metadata) { + protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { this.metadataWriterOption.ifPresent(w -> { w.initTableMetadata(); // refresh the timeline - boolean isTableService = getHoodieTable().isTableService(hoodieInstant.getAction()); - w.update(metadata, hoodieInstant.getTimestamp(), isTableService); + w.update(metadata, inflightInstant.getTimestamp(), getHoodieTable().isTableServiceAction(inflightInstant.getAction())); }); } @@ -407,8 +406,7 @@ private void writeTableMetadata(HoodieTable>, List w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableService)); + table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), table.isTableServiceAction(hoodieInstant.getAction()))); } finally { this.txnManager.endTransaction(); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index ae0ced2c819ff..e0b9c50dc938d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -83,6 +84,11 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } + /** * Upsert a batch of new records into Hoodie table at the supplied instantTime. 
* diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index 56a14da4c3dff..5ad87e0831e97 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -54,6 +55,11 @@ public class HoodieFlinkMergeOnReadTable super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert( HoodieEngineContext context, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 9d96ca1de99c4..4191497d1d09a 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -65,6 +66,11 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config, super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java index a78b71b2402ba..b219ba1a99016 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -37,6 +38,11 @@ protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineConte super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + @Override public 
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
                                                               String instantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 106b5a261271e..4bfc4c140aeb6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -406,10 +406,10 @@ private void writeTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD
                                   HoodieInstant hoodieInstant) {
     try {
       this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
-      boolean isTableService = table.isTableService(hoodieInstant.getAction());
+      boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
       // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
       // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableService));
+      table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
     } finally {
       this.txnManager.endTransaction();
     }
@@ -475,14 +475,14 @@ private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<WriteStatus>, JavaRDD<W
   }
 
   @Override
-  protected void preCommit(HoodieInstant hoodieInstant, HoodieCommitMetadata metadata) {
+  protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
     // Create a Hoodie table after starting the transaction which encapsulated the commits and files visible.
     // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config, hadoopConf);
-    boolean isTableService = table.isTableService(hoodieInstant.getAction());
-    table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, hoodieInstant.getTimestamp(), isTableService));
+    table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, inflightInstant.getTimestamp(),
+        table.isTableServiceAction(inflightInstant.getAction())));
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index e458d845a817f..516a96349f9b3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -96,6 +96,11 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
     super(config, context, metaClient);
   }
 
+  @Override
+  public boolean isTableServiceAction(String actionType) {
+    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
+  }
+
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
     return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index d0bc96924623b..9e053aaa0da44 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -80,6 +80,11 @@ public class HoodieSparkMergeOnReadTable extends
     super(config, context, metaClient);
   }
 
+  @Override
+  public boolean isTableServiceAction(String actionType) {
+    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
+  }
+
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
     return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 145abf62f6a81..f8c1dfc87f79f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -291,7 +291,6 @@ public void testMetadataTableServices() throws Exception {
     initWriteConfigAndMetatableWriter(writeConfig, true);
 
     doWriteOperation(testTable, "0000001", INSERT);
-    //doWriteOperation(testTable, "0000002");
     doCleanAndValidate(testTable, "0000003", Arrays.asList("0000001"));
 
     HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
@@ -499,7 +498,8 @@ public void testFirstCommitRollback(HoodieTableType tableType) throws Exception
     init(tableType);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).build())) {
 
       // Write 1
       String commitTime = "0000001";
@@ -533,7 +533,8 @@ public void testTableOperationsWithRestore(HoodieTableType tableType) throws Exc
     init(tableType);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).build())) {
 
       // Write 1 (Bulk insert)
       String newCommitTime = "0000001";
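
Reviewer note (editorial, not part of the patch): every isTableServiceAction override above follows one rule, namely that an action counts as a table service unless it is the table's own regular-write action ("commit" for copy-on-write tables, "deltacommit" for merge-on-read tables). The self-contained Java sketch below illustrates that dispatch, including why a merge-on-read compaction (which completes as a "commit") is classified as a table service and therefore passes canTriggerTableService = false to the metadata writer. The class and enum names here are hypothetical; only the action strings mirror HoodieTimeline's constants.

    // Hypothetical, self-contained illustration; not Hudi code.
    public class IsTableServiceActionSketch {

      static final String COMMIT_ACTION = "commit";            // regular write on COW
      static final String DELTA_COMMIT_ACTION = "deltacommit"; // regular write on MOR

      enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

      // Anything other than the table's regular-write action (clean, compaction,
      // rollback, restore, ...) is a table service, so the metadata-table update
      // for it must not itself schedule or run metadata-table services.
      static boolean isTableServiceAction(TableType type, String actionType) {
        return type == TableType.COPY_ON_WRITE
            ? !actionType.equals(COMMIT_ACTION)
            : !actionType.equals(DELTA_COMMIT_ACTION);
      }

      public static void main(String[] args) {
        System.out.println(isTableServiceAction(TableType.COPY_ON_WRITE, "commit"));      // false: regular COW write
        System.out.println(isTableServiceAction(TableType.MERGE_ON_READ, "deltacommit")); // false: regular MOR write
        System.out.println(isTableServiceAction(TableType.MERGE_ON_READ, "commit"));      // true: e.g. MOR compaction
      }
    }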