diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 4c849edcb6ccf..dac680a5c4023 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
@@ -50,6 +51,8 @@ public class HoodieMetrics {
   private String conflictResolutionTimerName = null;
   private String conflictResolutionSuccessCounterName = null;
   private String conflictResolutionFailureCounterName = null;
+  private String compactionRequestedCounterName = null;
+  private String compactionCompletedCounterName = null;
   private HoodieWriteConfig config;
   private String tableName;
   private Timer rollbackTimer = null;
@@ -64,6 +67,8 @@ public class HoodieMetrics {
   private Timer conflictResolutionTimer = null;
   private Counter conflictResolutionSuccessCounter = null;
   private Counter conflictResolutionFailureCounter = null;
+  private Counter compactionRequestedCounter = null;
+  private Counter compactionCompletedCounter = null;
 
   public HoodieMetrics(HoodieWriteConfig config) {
     this.config = config;
@@ -82,6 +87,8 @@ public HoodieMetrics(HoodieWriteConfig config) {
       this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
       this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
       this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
+      this.compactionRequestedCounterName = getMetricsName("counter", "compaction.requested");
+      this.compactionCompletedCounterName = getMetricsName("counter", "compaction.completed");
     }
   }
 
@@ -270,7 +277,8 @@ public void updateIndexMetrics(final String action, final long durationInMs) {
     }
   }
 
-  String getMetricsName(String action, String metric) {
+  @VisibleForTesting
+  public String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric);
   }
 
@@ -308,6 +316,20 @@ public void emitConflictResolutionFailed() {
     }
   }
 
+  public void emitCompactionRequested() {
+    if (config.isMetricsOn()) {
+      compactionRequestedCounter = getCounter(compactionRequestedCounter, compactionRequestedCounterName);
+      compactionRequestedCounter.inc();
+    }
+  }
+
+  public void emitCompactionCompleted() {
+    if (config.isMetricsOn()) {
+      compactionCompletedCounter = getCounter(compactionCompletedCounter, compactionCompletedCounterName);
+      compactionCompletedCounter.inc();
+    }
+  }
+
   private Counter getCounter(Counter counter, String name) {
     if (counter == null) {
       return metrics.getRegistry().counter(name);
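The two new counters follow the same lazily-registered pattern as the existing conflict-resolution counters: the Counter field starts null and is registered against the Dropwizard registry on first emit. A minimal self-contained sketch of that pattern, for context; the class and metric name below are illustrative and not part of this patch:

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

class LazyCounterSketch {
  private final MetricRegistry registry = new MetricRegistry();
  private Counter compactionRequestedCounter; // registered lazily, as in HoodieMetrics

  // Mirrors HoodieMetrics#getCounter: MetricRegistry#counter is idempotent,
  // so the counter is created on first use and reused afterwards.
  private Counter getCounter(Counter counter, String name) {
    return counter == null ? registry.counter(name) : counter;
  }

  void emitCompactionRequested() {
    // Illustrative name; the real one comes from getMetricsName("counter", "compaction.requested").
    compactionRequestedCounter = getCounter(compactionRequestedCounter, "my_table.counter.compaction.requested");
    compactionRequestedCounter.inc();
  }
}
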
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index 5bd1894f26dee..055cdb5910bfe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -35,11 +35,15 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.HoodieCompactionHandler;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -48,10 +52,14 @@
 public class RunCompactionActionExecutor<T> extends
     BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(RunCompactionActionExecutor.class);
+
   private final HoodieCompactor compactor;
   private final HoodieCompactionHandler compactionHandler;
   private WriteOperationType operationType;
 
+  private final HoodieMetrics metrics;
+
   public RunCompactionActionExecutor(HoodieEngineContext context,
                                      HoodieWriteConfig config,
                                      HoodieTable table,
@@ -65,10 +73,14 @@ public RunCompactionActionExecutor(HoodieEngineContext context,
     this.operationType = operationType;
     checkArgument(operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.LOG_COMPACT,
         "Only COMPACT and LOG_COMPACT is supported");
+    metrics = new HoodieMetrics(config);
   }
 
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    LOG.info("Compaction requested. Instant time: {}.", instantTime);
+    metrics.emitCompactionRequested();
+
     HoodieTimeline pendingMajorOrMinorCompactionTimeline = WriteOperationType.COMPACT.equals(operationType)
         ? table.getActiveTimeline().filterPendingCompactionTimeline()
         : table.getActiveTimeline().filterPendingLogCompactionTimeline();
@@ -117,6 +129,8 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
       throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
     }
 
+    LOG.info("Compaction completed. Instant time: {}.", instantTime);
+    metrics.emitCompactionCompleted();
     return compactionMetadata;
   }
 }
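With the executor wired up, both counters move whenever a compaction runs end to end. A hedged driver sketch, assuming a Spark write client on a MOR table with metrics enabled (the helper class and its argument are hypothetical):

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.util.Option;

final class CompactionDriverSketch {
  static void runOneCompaction(SparkRDDWriteClient<?> writeClient) {
    // Schedule a compaction; the instant is present only if a plan was generated.
    Option<String> instant = writeClient.scheduleCompaction(Option.empty());
    if (instant.isPresent()) {
      // execute() above logs "Compaction requested", bumps compaction.requested, runs
      // the plan, then bumps compaction.completed once the run succeeds.
      writeClient.compact(instant.get());
    }
  }
}

Because emitCompactionRequested() fires before the pending-timeline check and emitCompactionCompleted() fires only after a successful run, a persistent gap between the two counters points at failed or aborted compactions.
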
Instant time: {}.", instantTime); + metrics.emitCompactionCompleted(); return compactionMetadata; } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java index c0e62631664a8..2fe4e146ae5bf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java @@ -40,14 +40,17 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.bloom.HoodieBloomIndex; import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper; +import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; +import com.codahale.metrics.Counter; import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; @@ -56,6 +59,7 @@ import java.io.IOException; import java.util.List; +import java.util.SortedMap; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -89,9 +93,22 @@ public void tearDown() throws Exception { private HoodieWriteConfig getConfig() { return getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withMetricsConfig(getMetricsConfig()) .build(); } + private static HoodieMetricsConfig getMetricsConfig() { + return HoodieMetricsConfig.newBuilder().on(true).withReporterType("INMEMORY").build(); + } + + private long getCompactionMetricCount(String metric) { + HoodieMetrics metrics = writeClient.getMetrics(); + String metricName = metrics.getMetricsName("counter", metric); + SortedMap counters = metrics.getMetrics().getRegistry().getCounters(); + + return counters.containsKey(metricName) ? 
counters.get(metricName).getCount() : 0; + } + private HoodieWriteConfig.Builder getConfigBuilder() { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) @@ -106,12 +123,18 @@ private HoodieWriteConfig.Builder getConfigBuilder() { @Test public void testCompactionOnCopyOnWriteFail() throws Exception { metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); - HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient); - String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); - assertThrows(HoodieNotSupportedException.class, () -> { - table.scheduleCompaction(context, compactionInstantTime, Option.empty()); - table.compact(context, compactionInstantTime); - }); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(getConfig());) { + HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient); + String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); + assertThrows(HoodieNotSupportedException.class, () -> { + table.scheduleCompaction(context, compactionInstantTime, Option.empty()); + table.compact(context, compactionInstantTime); + }); + + // Verify compaction.requested, compaction.completed metrics counts. + assertEquals(0, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX)); + assertEquals(0, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX)); + } } @Test @@ -129,6 +152,10 @@ public void testCompactionEmpty() { String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); Option plan = table.scheduleCompaction(context, compactionInstantTime, Option.empty()); assertFalse(plan.isPresent(), "If there is nothing to compact, result will be empty"); + + // Verify compaction.requested, compaction.completed metrics counts. + assertEquals(0, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX)); + assertEquals(0, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX)); } } @@ -148,7 +175,7 @@ public void testScheduleCompactionWithInflightInstant() { newCommitTime = "102"; writeClient.startCommitWithTime(newCommitTime); metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED, - HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty()); + HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty()); // create one compaction instance before exist inflight instance. String compactionTime = "101"; @@ -161,6 +188,7 @@ public void testWriteStatusContentsAfterCompaction() throws Exception { // insert 100 records HoodieWriteConfig config = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withMetricsConfig(getMetricsConfig()) .build(); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) { String newCommitTime = "100"; @@ -180,6 +208,10 @@ public void testWriteStatusContentsAfterCompaction() throws Exception { HoodieData result = compact(writeClient, compactionInstantTime); verifyCompaction(result); + + // Verify compaction.requested, compaction.completed metrics counts. 
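The getCompactionMetricCount helper is a null-safe read of the Dropwizard registry that backs the INMEMORY reporter; the same lookup works in any test with that reporter. A standalone sketch of the lookup (class name illustrative, registry supplied by the caller):

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

import java.util.SortedMap;

final class CounterLookupSketch {
  // Returns 0 when the counter was never registered, i.e. the code path never fired.
  static long countOrZero(MetricRegistry registry, String metricName) {
    SortedMap<String, Counter> counters = registry.getCounters();
    return counters.containsKey(metricName) ? counters.get(metricName).getCount() : 0L;
  }
}
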
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 2cfcffb623d49..72a6e910b76b6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -56,11 +56,11 @@ public interface HoodieTimeline extends Serializable {
   String COMPACTION_ACTION = "compaction";
   String LOG_COMPACTION_ACTION = "logcompaction";
   String REQUESTED_EXTENSION = ".requested";
+  String COMPLETED_EXTENSION = ".completed";
   String RESTORE_ACTION = "restore";
   String INDEXING_ACTION = "indexing";
   // only for schema save
   String SCHEMA_COMMIT_ACTION = "schemacommit";
-
   String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
       CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
       COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEXING_ACTION};
@@ -81,6 +81,7 @@
   String REQUESTED_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + REQUESTED_EXTENSION;
   String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION;
   String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION);
+  String COMPLETED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, COMPLETED_EXTENSION);
   String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX);
   String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION);
   String REQUESTED_RESTORE_EXTENSION = "." + RESTORE_ACTION + REQUESTED_EXTENSION;
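The tests can pass REQUESTED_COMPACTION_SUFFIX and COMPLETED_COMPACTION_SUFFIX straight into getCompactionMetricCount because the timeline suffixes and the counter names resolve to the same strings. A quick sketch of that composition; the prefix value is illustrative, as the real one comes from config.getMetricReporterMetricsNamePrefix():

public class MetricNameCheckSketch {
  public static void main(String[] args) {
    // Hudi's StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION) is plain concatenation:
    String requestedSuffix = "compaction" + ".requested"; // == REQUESTED_COMPACTION_SUFFIX
    String completedSuffix = "compaction" + ".completed"; // == COMPLETED_COMPACTION_SUFFIX

    // getMetricsName("counter", suffix) then assembles the registered counter name:
    System.out.println(String.format("%s.%s.%s", "my_table", "counter", requestedSuffix));
    // -> my_table.counter.compaction.requested
    System.out.println(String.format("%s.%s.%s", "my_table", "counter", completedSuffix));
    // -> my_table.counter.compaction.completed
  }
}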