diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index a445fd3cc0907..173010e86672e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -205,31 +205,19 @@ private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstan Option.of(timer.endTimer()), cleanStats ); - writeMetadata(metadata); + if (!skipLocking) { + this.txnManager.beginTransaction(Option.empty(), Option.empty()); + } + writeTableMetadata(metadata); table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant, TimelineMetadataUtils.serializeCleanMetadata(metadata)); LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete"); return metadata; } catch (IOException e) { throw new HoodieIOException("Failed to clean up after commit", e); - } - } - - /** - * Update metadata table if available. Any update to metadata table happens within data table lock. - * @param cleanMetadata instance of {@link HoodieCleanMetadata} to be applied to metadata. - */ - private void writeMetadata(HoodieCleanMetadata cleanMetadata) { - if (config.isMetadataTableEnabled()) { - try { - if (!skipLocking) { - this.txnManager.beginTransaction(Option.empty(), Option.empty()); - } - writeTableMetadata(cleanMetadata); - } finally { - if (!skipLocking) { - this.txnManager.endTransaction(); - } + } finally { + if (!skipLocking) { + this.txnManager.endTransaction(); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index ff50a2961eafb..54cb51f03d300 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -255,30 +255,18 @@ protected List executeRollback(HoodieInstant instantToRollba protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException { try { - writeToMetadata(rollbackMetadata); + if (!skipLocking) { + this.txnManager.beginTransaction(Option.empty(), Option.empty()); + } + writeTableMetadata(rollbackMetadata); table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant, TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata)); LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete"); } catch (IOException e) { throw new HoodieIOException("Error executing rollback at instant " + instantTime, e); - } - } - - /** - * Update metadata table if available. Any update to metadata table happens within data table lock. - * @param rollbackMetadata instance of {@link HoodieRollbackMetadata} to be applied to metadata. - */ - private void writeToMetadata(HoodieRollbackMetadata rollbackMetadata) { - if (config.isMetadataTableEnabled()) { - try { - if (!skipLocking) { - this.txnManager.beginTransaction(Option.empty(), Option.empty()); - } - writeTableMetadata(rollbackMetadata); - } finally { - if (!skipLocking) { - this.txnManager.endTransaction(); - } + } finally { + if (!skipLocking) { + this.txnManager.endTransaction(); } } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 1229f58d57182..9f6243099f72b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -362,12 +362,19 @@ public void completeCompaction( String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); - finalizeWrite(table, compactionCommitTime, writeStats); - // commit to data table after committing to metadata table. - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); - LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); - CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); - + try { + HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); + this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); + finalizeWrite(table, compactionCommitTime, writeStats); + // commit to data table after committing to metadata table. + // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a + // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. + table.getMetadataWriter().ifPresent(w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction()))); + LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); + CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + } finally { + this.txnManager.endTransaction(); + } if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); try { @@ -399,19 +406,6 @@ public HoodieWriteMetadata> cluster(final String clusteringIns throw new HoodieNotSupportedException("Clustering is not supported yet"); } - private void writeTableMetadata(HoodieTable>, List, List> table, - HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant) { - try { - this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); - // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a - // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), table.isTableServiceAction(hoodieInstant.getAction()))); - } finally { - this.txnManager.endTransaction(); - } - } - @Override protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 81da1fbed0aa5..6672028a672f2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -313,11 +313,17 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD writeStats = writeStatuses.map(WriteStatus::getStat).collect(); - finalizeWrite(table, compactionCommitTime, writeStats); - writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); - // commit to data table after committing to metadata table. - LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); - CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + try { + HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); + this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); + finalizeWrite(table, compactionCommitTime, writeStats); + // commit to data table after committing to metadata table. + writeTableMetadataForTableServices(table, metadata, compactionInstant); + LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); + CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + } finally { + this.txnManager.endTransaction(); + } WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); if (compactionTimer != null) { @@ -385,9 +391,11 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); } - finalizeWrite(table, clusteringCommitTime, writeStats); - writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime)); try { + HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); + this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); + finalizeWrite(table, clusteringCommitTime, writeStats); + writeTableMetadataForTableServices(table, metadata,clusteringInstant); // try to save statistics info to hudi if (config.isDataSkippingEnabled() && config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) { table.updateStatistics(context, writeStats, clusteringCommitTime, true); @@ -398,6 +406,8 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD>, JavaRDD, JavaRDD> table, HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant) { - try { - this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); - boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); - // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a - // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); - } finally { - this.txnManager.endTransaction(); - } + HoodieInstant hoodieInstant) { + boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); + // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a + // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. + table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); } @Override