diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 112b811100480..789cb4bc16d06 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -783,9 +783,14 @@ protected void archive(HoodieTable table) { return; } try { + final Timer.Context timerContext = metrics.getArchiveCtx(); // We cannot have unbounded commit files. Archive commits if we have to archive HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table); - archiver.archiveIfRequired(context, true); + int instantsToArchive = archiver.archiveIfRequired(context, true); + if (timerContext != null) { + long durationMs = metrics.getDurationInMs(timerContext.stop()); + this.metrics.updateArchiveMetrics(durationMs, instantsToArchive); + } } catch (IOException ioe) { throw new HoodieIOException("Failed to archive", ioe); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java index 3277039f31bd5..cb0b748dd5d70 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java @@ -37,6 +37,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.marker.WriteMarkers; @@ -74,6 +75,7 @@ public class HoodieTimelineArchiver { private final TransactionManager txnManager; private final LSMTimelineWriter timelineWriter; + private final HoodieMetrics metrics; public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable table) { this.config = config; @@ -84,16 +86,17 @@ public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable Pair minAndMaxInstants = getMinAndMaxInstantsToKeep(table, metaClient); this.minInstantsToKeep = minAndMaxInstants.getLeft(); this.maxInstantsToKeep = minAndMaxInstants.getRight(); + this.metrics = new HoodieMetrics(config); } - public boolean archiveIfRequired(HoodieEngineContext context) throws IOException { + public int archiveIfRequired(HoodieEngineContext context) throws IOException { return archiveIfRequired(context, false); } /** * Check if commits need to be archived. If yes, archive commits. */ - public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException { + public int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException { try { if (acquireLock) { // there is no owner or instant time per se for archival. @@ -101,7 +104,6 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc } // Sort again because the cleaning and rollback instants could break the sequence. List instantsToArchive = getInstantsToArchive().sorted().collect(Collectors.toList()); - boolean success = true; if (!instantsToArchive.isEmpty()) { LOG.info("Archiving instants " + instantsToArchive); Consumer exceptionHandler = e -> { @@ -111,13 +113,13 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc }; this.timelineWriter.write(instantsToArchive, Option.of(action -> deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler)); LOG.info("Deleting archived instants " + instantsToArchive); - success = deleteArchivedInstants(instantsToArchive, context); + deleteArchivedInstants(instantsToArchive, context); // triggers compaction and cleaning only after archiving action this.timelineWriter.compactAndClean(context); } else { LOG.info("No Instants to archive"); } - return success; + return instantsToArchive.size(); } finally { if (acquireLock) { txnManager.endTransaction(Option.empty()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index 792d0cd084421..1fc23cf1e8de9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -53,11 +53,28 @@ public class HoodieMetrics { public static final String TOTAL_RECORDS_DELETED = "totalRecordsDeleted"; public static final String TOTAL_CORRUPTED_LOG_BLOCKS_STR = "totalCorruptedLogBlocks"; public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR = "totalRollbackLogBlocks"; - + public static final String DURATION_STR = "duration"; + public static final String DELETE_FILES_NUM_STR = "numFilesDeleted"; + public static final String DELETE_INSTANTS_NUM_STR = "numInstantsArchived"; + public static final String FINALIZED_FILES_NUM_STR = "numFilesFinalized"; + public static final String CONFLICT_RESOLUTION_STR = "conflict_resolution"; + public static final String COMMIT_LATENCY_IN_MS_STR = "commitLatencyInMs"; + public static final String COMMIT_FRESHNESS_IN_MS_STR = "commitFreshnessInMs"; + public static final String COMMIT_TIME_STR = "commitTime"; + public static final String SUCCESS_EXTENSION = ".success"; + public static final String FAILURE_EXTENSION = ".failure"; + + public static final String TIMER_ACTION = "timer"; + public static final String COUNTER_ACTION = "counter"; + public static final String ARCHIVE_ACTION = "archive"; + public static final String FINALIZE_ACTION = "finalize"; + public static final String INDEX_ACTION = "index"; + private Metrics metrics; // Some timers public String rollbackTimerName = null; public String cleanTimerName = null; + public String archiveTimerName = null; public String commitTimerName = null; public String logCompactionTimerName = null; public String deltaCommitTimerName = null; @@ -74,6 +91,7 @@ public class HoodieMetrics { private String tableName; private Timer rollbackTimer = null; private Timer cleanTimer = null; + private Timer archiveTimer = null; private Timer commitTimer = null; private Timer deltaCommitTimer = null; private Timer finalizeTimer = null; @@ -92,20 +110,21 @@ public HoodieMetrics(HoodieWriteConfig config) { this.tableName = config.getTableName(); if (config.isMetricsOn()) { metrics = Metrics.getInstance(config); - this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION); - this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION); - this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION); - this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION); - this.replaceCommitTimerName = getMetricsName("timer", HoodieTimeline.REPLACE_COMMIT_ACTION); - this.finalizeTimerName = getMetricsName("timer", "finalize"); - this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION); - this.logCompactionTimerName = getMetricsName("timer", HoodieTimeline.LOG_COMPACTION_ACTION); - this.indexTimerName = getMetricsName("timer", "index"); - this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution"); - this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success"); - this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure"); - this.compactionRequestedCounterName = getMetricsName("counter", "compaction.requested"); - this.compactionCompletedCounterName = getMetricsName("counter", "compaction.completed"); + this.rollbackTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.ROLLBACK_ACTION); + this.cleanTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.CLEAN_ACTION); + this.archiveTimerName = getMetricsName(TIMER_ACTION, ARCHIVE_ACTION); + this.commitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.COMMIT_ACTION); + this.deltaCommitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION); + this.replaceCommitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION); + this.finalizeTimerName = getMetricsName(TIMER_ACTION, FINALIZE_ACTION); + this.compactionTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.COMPACTION_ACTION); + this.logCompactionTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.LOG_COMPACTION_ACTION); + this.indexTimerName = getMetricsName(TIMER_ACTION, INDEX_ACTION); + this.conflictResolutionTimerName = getMetricsName(TIMER_ACTION, CONFLICT_RESOLUTION_STR); + this.conflictResolutionSuccessCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + SUCCESS_EXTENSION); + this.conflictResolutionFailureCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + FAILURE_EXTENSION); + this.compactionRequestedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + HoodieTimeline.REQUESTED_EXTENSION); + this.compactionCompletedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + HoodieTimeline.COMPLETED_EXTENSION); } } @@ -152,6 +171,13 @@ public Timer.Context getCleanCtx() { return cleanTimer == null ? null : cleanTimer.time(); } + public Timer.Context getArchiveCtx() { + if (config.isMetricsOn() && archiveTimer == null) { + archiveTimer = createTimer(archiveTimerName); + } + return archiveTimer == null ? null : archiveTimer.time(); + } + public Timer.Context getCommitCtx() { if (config.isMetricsOn() && commitTimer == null) { commitTimer = createTimer(commitTimerName); @@ -255,48 +281,60 @@ private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationIn Pair, Option> eventTimePairMinMax = metadata.getMinAndMaxEventTime(); if (eventTimePairMinMax.getLeft().isPresent()) { long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get(); - metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs); + metrics.registerGauge(getMetricsName(actionType, COMMIT_LATENCY_IN_MS_STR), commitLatencyInMs); } if (eventTimePairMinMax.getRight().isPresent()) { long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get(); - metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs); + metrics.registerGauge(getMetricsName(actionType, COMMIT_FRESHNESS_IN_MS_STR), commitFreshnessInMs); } - metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs); - metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs); + metrics.registerGauge(getMetricsName(actionType, COMMIT_TIME_STR), commitEpochTimeInMs); + metrics.registerGauge(getMetricsName(actionType, DURATION_STR), durationInMs); } } public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) { if (config.isMetricsOn()) { LOG.info( - String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); - metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs); - metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted); + String.format("Sending rollback metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs, + DELETE_FILES_NUM_STR, numFilesDeleted)); + metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DURATION_STR), durationInMs); + metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted); } } public void updateCleanMetrics(long durationInMs, int numFilesDeleted) { if (config.isMetricsOn()) { LOG.info( - String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); - metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs); - metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted); + String.format("Sending clean metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs, + DELETE_FILES_NUM_STR, numFilesDeleted)); + metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DURATION_STR), durationInMs); + metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted); + } + } + + public void updateArchiveMetrics(long durationInMs, int numInstantsArchived) { + if (config.isMetricsOn()) { + LOG.info( + String.format("Sending archive metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs, + DELETE_INSTANTS_NUM_STR, numInstantsArchived)); + metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DURATION_STR), durationInMs); + metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DELETE_INSTANTS_NUM_STR), numInstantsArchived); } } public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) { if (config.isMetricsOn()) { - LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs, - numFilesFinalized)); - metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs); - metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized); + LOG.info(String.format("Sending finalize write metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs, + FINALIZED_FILES_NUM_STR, numFilesFinalized)); + metrics.registerGauge(getMetricsName(FINALIZE_ACTION, DURATION_STR), durationInMs); + metrics.registerGauge(getMetricsName(FINALIZE_ACTION, FINALIZED_FILES_NUM_STR), numFilesFinalized); } } public void updateIndexMetrics(final String action, final long durationInMs) { if (config.isMetricsOn()) { - LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs)); - metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs); + LOG.info(String.format("Sending index metrics (%s.%s, %d)", action, DURATION_STR, durationInMs)); + metrics.registerGauge(getMetricsName(INDEX_ACTION, String.format("%s.%s", action, DURATION_STR)), durationInMs); } } @@ -306,7 +344,7 @@ public String getMetricsName(String action, String metric) { } public void updateClusteringFileCreationMetrics(long durationInMs) { - reportMetrics("replacecommit", "fileCreationTime", durationInMs); + reportMetrics(HoodieTimeline.REPLACE_COMMIT_ACTION, "fileCreationTime", durationInMs); } /** diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index be8702582299c..c03b08dcdec39 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; +import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.client.timeline.HoodieTimelineArchiver; import org.apache.hudi.client.timeline.LSMTimelineWriter; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; @@ -56,10 +57,12 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieSparkClientTestHarness; @@ -109,6 +112,9 @@ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; +import static org.apache.hudi.metrics.HoodieMetrics.ARCHIVE_ACTION; +import static org.apache.hudi.metrics.HoodieMetrics.DELETE_INSTANTS_NUM_STR; +import static org.apache.hudi.metrics.HoodieMetrics.DURATION_STR; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -256,8 +262,8 @@ public void testArchiveEmptyTable() throws Exception { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); - boolean result = archiver.archiveIfRequired(context); - assertTrue(result); + int result = archiver.archiveIfRequired(context); + assertEquals(0, result); } @ParameterizedTest @@ -767,7 +773,7 @@ public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable, boolea HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match"); - assertTrue(archiver.archiveIfRequired(context)); + archiver.archiveIfRequired(context); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); if (archiveBeyondSavepoint) { // commits in active timeline = 101 and 105. @@ -953,8 +959,7 @@ public void testArchiveCommitTimeline(boolean enableMetadataTable) throws Except HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); - boolean result = archiver.archiveIfRequired(context); - assertTrue(result); + archiver.archiveIfRequired(context); HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); List archivedInstants = Arrays.asList(instant1, instant2, instant3); assertEquals(new HashSet<>(archivedInstants), @@ -962,6 +967,25 @@ public void testArchiveCommitTimeline(boolean enableMetadataTable) throws Except assertFalse(wrapperFs.exists(markerPath)); } + @Test + public void testArchiveMetrics() throws Exception { + init(); + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetricsConfig(HoodieMetricsConfig + .newBuilder() + .on(true) + .withReporterType("INMEMORY") + .build()) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table").build(); + HoodieMetrics metrics = new HoodieMetrics(cfg); + BaseHoodieWriteClient client = getHoodieWriteClient(cfg); + client.archive(); + assertTrue(metrics.getMetrics().getRegistry().getNames().contains(metrics.getMetricsName(ARCHIVE_ACTION, DURATION_STR))); + assertTrue(metrics.getMetrics().getRegistry().getNames().contains(metrics.getMetricsName(ARCHIVE_ACTION, DELETE_INSTANTS_NUM_STR))); + } + private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) { HoodieTimeline timeline = metaClient.getActiveTimeline().reload() .getTimelineOfActions(Collections.singleton(HoodieTimeline.CLEAN_ACTION)).filterInflights(); @@ -1360,10 +1384,9 @@ public void testGetCommitInstantsToArchiveDuringInflightCommits() throws Excepti // Run archival HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); - boolean result = archiver.archiveIfRequired(context); + archiver.archiveIfRequired(context); expectedInstants.remove("1000"); expectedInstants.remove("1001"); - assertTrue(result); timeline = metaClient.reloadActiveTimeline().getWriteTimeline(); // Check the count of instants after archive it should have 2 less instants @@ -1383,7 +1406,7 @@ public void testGetCommitInstantsToArchiveDuringInflightCommits() throws Excepti metaClient.reloadActiveTimeline(); // Run archival - assertTrue(archiver.archiveIfRequired(context)); + archiver.archiveIfRequired(context); timeline = metaClient.reloadActiveTimeline().getWriteTimeline(); expectedInstants.removeAll(Arrays.asList("1002", "1003", "1004", "1005")); @@ -1417,8 +1440,7 @@ public void testWithOldestReplaceCommit() throws Exception { HoodieTimeline timeline = metaClient.reloadActiveTimeline(); assertEquals(9, timeline.countInstants(), "Loaded 9 commits and the count should match"); - boolean result = archiver.archiveIfRequired(context); - assertTrue(result); + archiver.archiveIfRequired(context); timeline = metaClient.reloadActiveTimeline(); assertEquals(9, timeline.countInstants(), "Since we have a pending replacecommit at 1001, we should never archive any commit after 1001");