diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 3e6b7ab490b7c..c1d43b94c2244 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -184,11 +184,12 @@ public boolean commitStats(String instantTime, List stats, Opti HoodieTable table = createTable(config, hadoopConf); HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getWriteSchema(), commitActionType); + HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime); HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config); - this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime)), + this.txnManager.beginTransaction(Option.of(inflightInstant), lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty()); try { - preCommit(instantTime, metadata); + preCommit(inflightInstant, metadata); commit(table, commitActionType, instantTime, metadata, stats); postCommit(table, metadata, instantTime, extraMetadata); LOG.info("Committed " + instantTime); @@ -244,14 +245,15 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String /** * Any pre-commit actions like conflict resolution or updating metadata table goes here. - * @param instantTime commit instant time. + * @param inflightInstant instant of inflight operation. * @param metadata commit metadata for which pre commit is being invoked. 
*/ - protected void preCommit(String instantTime, HoodieCommitMetadata metadata) { + protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { // Create a Hoodie table after starting the transaction which encapsulated the commits and files visible. // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload HoodieTable table = createTable(config, hadoopConf); - table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime)); + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(), + table.isTableServiceAction(inflightInstant.getAction()))); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index eb0c6ea899bcc..48d6b948c4133 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -409,7 +409,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi }); LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata"); - update(commitMetadata, createInstantTime); + update(commitMetadata, createInstantTime, false); return true; } @@ -523,23 +523,24 @@ private interface ConvertMetadataFunction { * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. * @param type of commit metadata. + * @param canTriggerTableService true if table services can be triggered. false otherwise. 
*/ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { if (enabled && metadata != null) { List records = convertMetadataFunction.convertMetadata(); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService); } } /** * Update from {@code HoodieCommitMetadata}. - * * @param commitMetadata {@code HoodieCommitMetadata} * @param instantTime Timestamp at which the commit was performed + * @param isTableServiceAction {@code true} if commit metadata is pertaining to a table service. {@code false} otherwise. */ @Override - public void update(HoodieCommitMetadata commitMetadata, String instantTime) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime)); + public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) { + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime), !isTableServiceAction); } /** @@ -550,7 +551,8 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) { */ @Override public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime)); + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime), + false); } /** @@ -562,7 +564,7 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), - restoreMetadata, instantTime, metadata.getSyncedInstantTime())); + restoreMetadata, instantTime, metadata.getSyncedInstantTime()), false); } /** @@ -588,7 +590,7 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) List records = HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), rollbackMetadata, instantTime, metadata.getSyncedInstantTime(), wasSynced); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, false); } } @@ -601,12 +603,12 @@ public void close() throws Exception { /** * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit. - * - * @param records The list of records to be written. + * @param records The list of records to be written. * @param partitionName The partition to which the records are to be written. * @param instantTime The timestamp to use for the deltacommit. + * @param canTriggerTableService true if table services can be scheduled and executed. false otherwise. */ - protected abstract void commit(List records, String partitionName, String instantTime); + protected abstract void commit(List records, String partitionName, String instantTime, boolean canTriggerTableService); /** * Perform a compaction on the Metadata Table. 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index f5c4d26d0ce4d..4f5ac027c91eb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -34,8 +34,10 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * Update the metadata table due to a COMMIT operation. * @param commitMetadata commit metadata of the operation of interest. * @param instantTime instant time of the commit. + * @param isTableServiceAction true if caller is a table service. false otherwise. Only regular write operations can trigger metadata table services and this argument + * will assist in this. */ - void update(HoodieCommitMetadata commitMetadata, String instantTime); + void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction); /** * Update the metadata table due to a CLEAN operation. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index a6c14e6d2aea3..abe7f76ffc7a0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -728,6 +728,13 @@ public final Option getMetadataWriter() { return getMetadataWriter(Option.empty()); } + /** + * Check if action type is a table service. + * @param actionType action type of interest. + * @return true if action represents a table service. false otherwise. + */ + public abstract boolean isTableServiceAction(String actionType); + /** * Get Table metadata writer. 
* diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index cd32a5bc87307..a22479b6bf341 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -56,8 +56,8 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, * Writes commits metadata to table metadata. * @param metadata commit metadata of interest. */ - protected final void writeTableMetadata(HoodieCommitMetadata metadata) { - table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime)); + protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) { + table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, table.isTableServiceAction(actionType))); } /** diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java index bbaf073743b7d..fa0f5df61b183 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java @@ -77,7 +77,7 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy HoodieCommitMetadata commitMetadata = super.doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitionToFilesNameLengthMap, bootstrap, createInflightCommit); if (writer != null && !createInflightCommit) { - writer.update(commitMetadata, commitTime); + writer.update(commitMetadata, commitTime, false); } return commitMetadata; } @@ -86,7 +86,7 @@ public 
HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { super.moveInflightCommitToComplete(instantTime, metadata); if (writer != null) { - writer.update(metadata, instantTime); + writer.update(metadata, instantTime, false); } return this; } @@ -94,7 +94,7 @@ public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCo public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata, boolean ignoreWriter) throws IOException { super.moveInflightCommitToComplete(instantTime, metadata); if (!ignoreWriter && writer != null) { - writer.update(metadata, instantTime); + writer.update(metadata, instantTime, false); } return this; } @@ -103,7 +103,7 @@ public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCo public HoodieTestTable moveInflightCompactionToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { super.moveInflightCompactionToComplete(instantTime, metadata); if (writer != null) { - writer.update(metadata, instantTime); + writer.update(metadata, instantTime, true); } return this; } @@ -120,7 +120,7 @@ public HoodieCleanMetadata doClean(String commitTime, Map parti public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception { super.addCompaction(instantTime, commitMetadata); if (writer != null) { - writer.update(commitMetadata, instantTime); + writer.update(commitMetadata, instantTime, true); } return this; } @@ -151,7 +151,7 @@ public HoodieTestTable addReplaceCommit( HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception { super.addReplaceCommit(instantTime, requestedReplaceMetadata, inflightReplaceMetadata, completeReplaceMetadata); if (writer != null) { - writer.update(completeReplaceMetadata, instantTime); + writer.update(completeReplaceMetadata, 
instantTime, true); } return this; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index c73de656a8d6c..68ce212527bb6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -258,10 +258,10 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp } @Override - protected void preCommit(String instantTime, HoodieCommitMetadata metadata) { + protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { this.metadataWriterOption.ifPresent(w -> { w.initTableMetadata(); // refresh the timeline - w.update(metadata, instantTime); + w.update(metadata, inflightInstant.getTimestamp(), getHoodieTable().isTableServiceAction(inflightInstant.getAction())); }); } @@ -406,7 +406,7 @@ private void writeTableMetadata(HoodieTable>, List w.update(commitMetadata, hoodieInstant.getTimestamp())); + table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), table.isTableServiceAction(hoodieInstant.getAction()))); } finally { this.txnManager.endTransaction(); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 9ae3e622d35da..8254d0b884616 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -90,7 +90,7 @@ protected void initialize(HoodieEngineContext eng } @Override - protected void commit(List records, String 
partitionName, String instantTime) { + protected void commit(List records, String partitionName, String instantTime, boolean canTriggerTableService) { ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); List recordList = prepRecords(records, partitionName, 1); @@ -125,8 +125,10 @@ protected void commit(List records, String partitionName, String i // reload timeline metadataMetaClient.reloadActiveTimeline(); - compactIfNecessary(writeClient, instantTime); - doClean(writeClient, instantTime); + if (canTriggerTableService) { + compactIfNecessary(writeClient, instantTime); + doClean(writeClient, instantTime); + } } // Update total size of the metadata and count of base/log files diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index ae0ced2c819ff..e0b9c50dc938d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -83,6 +84,11 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } + /** * Upsert a batch of new records into Hoodie table at the supplied instantTime. 
* diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index 56a14da4c3dff..5ad87e0831e97 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -54,6 +55,11 @@ public class HoodieFlinkMergeOnReadTable super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert( HoodieEngineContext context, diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index fce159ec8a408..5dfa511a8823f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -147,7 +147,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); - 
writeTableMetadata(metadata); + writeTableMetadata(metadata, actionType); activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 9d96ca1de99c4..4191497d1d09a 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -65,6 +66,11 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config, super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java index a78b71b2402ba..b219ba1a99016 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -37,6 +38,11 @@ protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineConte super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsertPrepped(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index 79aad595fd997..66cb40758bdc0 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -208,7 +208,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); - writeTableMetadata(metadata); + writeTableMetadata(metadata, actionType); activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 4100b0463e026..4bfc4c140aeb6 100644 --- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -406,9 +406,10 @@ private void writeTableMetadata(HoodieTable>, JavaRDD HoodieInstant hoodieInstant) { try { this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); + boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp())); + table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); } finally { this.txnManager.endTransaction(); } @@ -474,13 +475,14 @@ private HoodieTable>, JavaRDD, JavaRDD ((HoodieTableMetadataWriter)w).update(metadata, instantTime)); + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(), + table.isTableServiceAction(inflightInstant.getAction()))); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index e59e195836149..95ab7dc79a202 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -103,7 +103,7 @@ protected void initialize(HoodieEngineContext eng } @Override - protected void commit(List records, String 
partitionName, String instantTime) { + protected void commit(List records, String partitionName, String instantTime, boolean canTriggerTableService) { ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); JavaRDD recordRDD = prepRecords(records, partitionName, 1); @@ -132,8 +132,10 @@ protected void commit(List records, String partitionName, String i // reload timeline metadataMetaClient.reloadActiveTimeline(); - compactIfNecessary(writeClient, instantTime); - doClean(writeClient, instantTime); + if (canTriggerTableService) { + compactIfNecessary(writeClient, instantTime); + doClean(writeClient, instantTime); + } } // Update total size of the metadata and count of base/log files diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index e458d845a817f..516a96349f9b3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -96,6 +96,11 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, JavaRDD> records) { return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index d0bc96924623b..9e053aaa0da44 100644 --- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -80,6 +80,11 @@ public class HoodieSparkMergeOnReadTable extends super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, JavaRDD> records) { return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java index bdeb041b31479..9ca44f6fc5812 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java @@ -246,7 +246,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit()); metadata.setOperationType(operationType); - writeTableMetadata(metadata); + writeTableMetadata(metadata, actionType); try { activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime), diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 0b673b8907d0a..2bcd6d787a268 100644 --- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -267,7 +267,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); - writeTableMetadata(metadata); + writeTableMetadata(metadata, actionType); activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); LOG.info("Committed " + instantTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index de757a0800905..f8c1dfc87f79f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -273,6 +273,37 @@ public void testInsertUpsertCluster(HoodieTableType tableType) throws Exception validateMetadata(testTable, emptyList(), true); } + /** + * Tests that table services in data table won't trigger table services in metadata table. 
+ * @throws Exception + */ + @Test + public void testMetadataTableServices() throws Exception { + HoodieTableType tableType = COPY_ON_WRITE; + init(tableType, false); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .enableFullScan(true) + .enableMetrics(false) + .withMaxNumDeltaCommitsBeforeCompaction(3) // after 3 delta commits for regular writer operations, compaction should kick in. + .build()).build(); + initWriteConfigAndMetatableWriter(writeConfig, true); + + doWriteOperation(testTable, "0000001", INSERT); + doCleanAndValidate(testTable, "0000003", Arrays.asList("0000001")); + + HoodieTableMetadata tableMetadata = metadata(writeConfig, context); + // since clean was the last commit, table servives should not get triggered in metadata table. + assertFalse(tableMetadata.getLatestCompactionTime().isPresent()); + + doWriteOperation(testTable, "0000004", UPSERT); + // this should have triggered compaction in metadata table + tableMetadata = metadata(writeConfig, context); + assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); + assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001"); + } + /** * Test rollback of various table operations sync to Metadata Table correctly. 
*/ @@ -467,7 +498,8 @@ public void testFirstCommitRollback(HoodieTableType tableType) throws Exception init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, + getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).build())) { // Write 1 String commitTime = "0000001"; @@ -501,7 +533,8 @@ public void testTableOperationsWithRestore(HoodieTableType tableType) throws Exc init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, + getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).build())) { // Write 1 (Bulk insert) String newCommitTime = "0000001"; @@ -685,8 +718,8 @@ public void testMultiWriterForDoubleLocking() throws Exception { /** * Lets say clustering commit succeeded in metadata table, but failed before committing to datatable. - * Next time, when clustering kicks in, hudi will rollback pending clustering and re-attempt the clustering with same instant time. - * So, this test ensures the 2nd attempt succeeds with metadata enabled. + * Next time, when clustering kicks in, hudi will rollback pending clustering (in data table) and re-attempt the clustering with same + * instant time. So, this test ensures the 2nd attempt succeeds with metadata enabled. * This is applicable to any table service where instant time is fixed. So, how many ever times the operation fails, re attempt will * be made with same commit time. * Tests uses clustering to test out the scenario. 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index 53f0cdde3da86..7cb9740a8c6cc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -305,18 +305,20 @@ public void testNoArchivalWithInflightCompactionInMiddle(boolean enableMetadata) } } - // move inflight compaction to complete. archival should archive more commits. - // before this move, timeline 2_inflight_compaction, 3,4,5,6,7. - // after this move. 6,7. (2,3,4,5 will be archived) + // move inflight compaction to complete and add one regular write commit. archival should archive more commits. + // one extra commit is required, because compaction in the data table will not trigger table services in the metadata table. + // before this move, timeline : 2_inflight_compaction, 3,4,5,6,7. 
+ // after this move: 6,7,8 (2,3,4,5 will be archived) testTable.moveInflightCompactionToComplete("00000002", inflightCompactionMetadata); + testTable.doWriteOperation("00000008", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), 2); + Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); - List originalCommits = commitsList.getKey(); List commitsAfterArchival = commitsList.getValue(); - List archivedInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000003", "00000004", "00000005"), HoodieTimeline.DELTA_COMMIT_ACTION); + List archivedInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000003", "00000004", "00000005", "00000006"), HoodieTimeline.DELTA_COMMIT_ACTION); archivedInstants.add(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "00000002")); archivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000002")); - verifyArchival(archivedInstants, getActiveCommitInstants(Arrays.asList("00000006", "00000007"), HoodieTimeline.DELTA_COMMIT_ACTION), commitsAfterArchival); + verifyArchival(archivedInstants, getActiveCommitInstants(Arrays.asList("00000007", "00000008"), HoodieTimeline.DELTA_COMMIT_ACTION), commitsAfterArchival); } @Test @@ -379,7 +381,8 @@ public void testConvertCommitMetadata() throws Exception { public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exception { HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 2); - // min archival commits is 2 and max archival commits is 4(either clean commits has to be > 4 or commits has to be greater than 4. and so, after 5th commit, 3 commits will be archived. + // min archival commits is 2 and max archival commits is 4(either clean commits has to be > 4 or commits has to be greater than 4. + // and so, after 5th commit, 3 commits will be archived. 
// 1,2,3,4,5,6 : after archival -> 1,5,6 (because, 2,3,4,5 and 6 are clean commits and are eligible for archival) // after 7th and 8th commit no-op wrt archival. Map cleanStats = new HashMap<>(); @@ -400,13 +403,35 @@ public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exce if (i < 6) { assertEquals(originalCommits, commitsAfterArchival); } else if (i == 6) { - // 1,2,3,4,5,6 : after archival -> 1,5,6 (bcoz, 2,3,4,5 and 6 are clean commits and are eligible for archival) - List expectedActiveInstants = new ArrayList<>(); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); - verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + if (!enableMetadata) { + // 1,2,3,4,5,6 : after archival -> 1,5,6 (because, 2,3,4,5 and 6 are clean commits and are eligible for archival) + List expectedActiveInstants = new ArrayList<>(); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + } else { + // with metadata enabled, archival in data table is fenced based on compaction in metadata table. Clean commits in data table will not trigger compaction in + // metadata table. 
+ List expectedActiveInstants = new ArrayList<>(); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000002", "00000003", "00000004", "00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); + verifyArchival(getAllArchivedCommitInstants(Collections.emptyList(), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + } } else { - assertEquals(originalCommits, commitsAfterArchival); + if (!enableMetadata) { + assertEquals(originalCommits, commitsAfterArchival); + } else { + if (i == 7) { + // when i == 7 compaction in the metadata table will be triggered and hence archival in the data table will kick in. + // 1,2,3,4,5,6 : after archival -> 1,5,6 (because, 2,3,4,5 and 6 are clean commits and are eligible for archival) + List expectedActiveInstants = new ArrayList<>(); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001", "00000007"))); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + } else { + assertEquals(originalCommits, commitsAfterArchival); + } + } } } }