diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index fd8aae95c135..df94ff35af88 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -84,7 +84,7 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 583a3bb7bc1f..45cd4d6a79b2 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -172,6 +172,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
= new BootstrapStateHandler.Lock();
private static final int SST_READ_AHEAD_SIZE = 2 * 1024 * 1024;
private int pruneSSTFileBatchSize;
+ private SSTFilePruningMetrics sstFilePruningMetrics;
private ColumnFamilyHandle snapshotInfoTableCFHandle;
private static final String DAG_PRUNING_SERVICE_NAME = "CompactionDagPruningService";
private AtomicBoolean suspended;
@@ -239,10 +240,11 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
this.pruneSSTFileBatchSize = configuration.getInt(
OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE,
OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE_DEFAULT);
+ this.sstFilePruningMetrics = SSTFilePruningMetrics.create(activeDBLocationName);
try {
if (configuration.getBoolean(OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB, OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB_DEFAULT)
&& ManagedRawSSTFileReader.loadLibrary()) {
- pruneQueue = new ConcurrentLinkedQueue<>();
+ this.pruneQueue = new ConcurrentLinkedQueue<>();
}
} catch (NativeLibraryNotLoadedException e) {
LOG.warn("Native Library for raw sst file reading loading failed." +
@@ -338,6 +340,9 @@ public void close() {
LOG.info("Shutting down {}.", DAG_PRUNING_SERVICE_NAME);
scheduler.close();
}
+ if (sstFilePruningMetrics != null) {
+ sstFilePruningMetrics.unRegister();
+ }
}
}
}
@@ -532,6 +537,7 @@ public void onCompactionCompleted(RocksDB db,
// so that the backup input sst files can be pruned.
if (pruneQueue != null) {
pruneQueue.offer(key);
+ sstFilePruningMetrics.updateQueueSize(pruneQueue.size());
}
}
};
@@ -751,6 +757,10 @@ private void loadCompactionDagFromDB() {
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
+ } finally {
+ if (pruneQueue != null) {
+ sstFilePruningMetrics.updateQueueSize(pruneQueue.size());
+ }
}
}
@@ -1258,13 +1268,16 @@ public void pruneSstFileValues() {
if (!shouldRun()) {
return;
}
+ long batchStartTime = System.nanoTime();
+ int filesPrunedInBatch = 0;
+ int filesSkippedInBatch = 0;
+ int batchCounter = 0;
Path sstBackupDirPath = Paths.get(sstBackupDir);
Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
try (ManagedOptions managedOptions = new ManagedOptions();
ManagedEnvOptions envOptions = new ManagedEnvOptions()) {
byte[] compactionLogEntryKey;
- int batchCounter = 0;
while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
CompactionLogEntry compactionLogEntry;
// Get the compaction log entry.
@@ -1289,6 +1302,7 @@ public void pruneSstFileValues() {
if (Files.notExists(sstFilePath)) {
LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
updatedFileInfoList.add(fileInfo);
+ filesSkippedInBatch++;
continue;
}
@@ -1306,6 +1320,7 @@ public void pruneSstFileValues() {
fileInfo.setPruned();
updatedFileInfoList.add(fileInfo);
LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+ filesPrunedInBatch++;
}
// Update compaction log entry in table.
@@ -1325,6 +1340,12 @@ public void pruneSstFileValues() {
}
} catch (IOException | InterruptedException e) {
LOG.error("Could not prune source OMKeyInfo from backup SST files.", e);
+ sstFilePruningMetrics.incrPruningFailures();
+ } finally {
+ LOG.info("Completed pruning OMKeyInfo from backup SST files in {}ms.",
+ (System.nanoTime() - batchStartTime) / 1_000_000);
+ sstFilePruningMetrics.updateBatchLevelMetrics(filesPrunedInBatch, filesSkippedInBatch,
+ batchCounter, pruneQueue.size());
}
}
@@ -1428,4 +1449,9 @@ private Map<String, CompactionFileInfo> toFileInfoList(List<String> sstFiles, Ro
   ConcurrentMap<String, CompactionFileInfo> getInflightCompactions() {
return inflightCompactions;
}
+
+ @VisibleForTesting
+ public SSTFilePruningMetrics getPruningMetrics() {
+ return sstFilePruningMetrics;
+ }
}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/SSTFilePruningMetrics.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/SSTFilePruningMetrics.java
new file mode 100644
index 000000000000..46eb2c0889b5
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/SSTFilePruningMetrics.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdiff;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Metrics for monitoring SST file pruning operations in {@link RocksDBCheckpointDiffer}.
+ */
+@Metrics(about = "SST File Pruning Metrics", context = OzoneConsts.OZONE)
+public final class SSTFilePruningMetrics implements MetricsSource {
+
+ private static final String METRICS_SOURCE_NAME_PREFIX = SSTFilePruningMetrics.class.getSimpleName();
+ private final String metricSourceName;
+ private final MetricsRegistry registry;
+
+ /*
+ * Pruning Throughput Metrics.
+ */
+ @Metric("Total no. of SST files pruned")
+ private MutableCounterLong filesPrunedTotal;
+ @Metric("No. of SST files pruned in the last batch")
+ private MutableGaugeLong filesPrunedLast;
+ @Metric("Total no. of SST files removed")
+ private MutableCounterLong filesSkippedTotal;
+ @Metric("Total no. of compactions processed")
+ private MutableCounterLong compactionsProcessed;
+ @Metric("No. of pending pruning jobs in queue")
+ private MutableGaugeLong pruneQueueSize;
+
+ /*
+ * Pruning failure Metrics.
+ */
+ @Metric("No. of pruning job failures")
+ private MutableCounterLong pruningFailures;
+
+ private SSTFilePruningMetrics(String sourceName) {
+ this.metricSourceName = sourceName;
+ this.registry = new MetricsRegistry(metricSourceName);
+ }
+
+ /**
+ * Creates and registers an SSTFilePruningMetrics source.
+ *
+ * @param dbLocation location of the active DB, used to build a unique metrics source name
+ * @return the registered SSTFilePruningMetrics instance
+ */
+ public static SSTFilePruningMetrics create(String dbLocation) {
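+ // Derive a unique metrics source name from the DB location by replacing path separators and whitespace.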
+ String sourceName = METRICS_SOURCE_NAME_PREFIX +
+ (dbLocation == null || dbLocation.isEmpty() ? "" : "-" + dbLocation.replaceAll("[/\\:\\s]", "_"));
+ return DefaultMetricsSystem.instance().register(sourceName, "SST File Pruning Metrics",
+ new SSTFilePruningMetrics(sourceName));
+ }
+
+ /**
+ * Unregister the metrics instance.
+ */
+ public void unRegister() {
+ DefaultMetricsSystem.instance().unregisterSource(metricSourceName);
+ }
+
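+ /**
+ * Sets the gauge tracking the number of compaction entries
+ * currently waiting in the prune queue.
+ *
+ * @param queueSize current size of the prune queue
+ */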
+ public void updateQueueSize(long queueSize) {
+ pruneQueueSize.set(queueSize);
+ }
+
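+ /**
+ * Updates throughput metrics once a pruning batch completes.
+ *
+ * @param filesPruned no. of SST files pruned in this batch
+ * @param filesSkipped no. of SST files skipped because they were already removed
+ * @param compactions no. of compaction log entries processed in this batch
+ * @param queueSize no. of entries left in the prune queue
+ */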
+ public void updateBatchLevelMetrics(long filesPruned, long filesSkipped, int compactions, long queueSize) {
+ filesPrunedTotal.incr(filesPruned);
+ filesPrunedLast.set(filesPruned);
+ filesSkippedTotal.incr(filesSkipped);
+ compactionsProcessed.incr(compactions);
+ updateQueueSize(queueSize);
+ }
+
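+ /**
+ * Increments the count of failed pruning batches.
+ */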
+ public void incrPruningFailures() {
+ pruningFailures.incr();
+ }
+
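+ // Getters expose current metric values, primarily for tests.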
+ public long getFilesPrunedTotal() {
+ return filesPrunedTotal.value();
+ }
+
+ public long getFilesPrunedLast() {
+ return filesPrunedLast.value();
+ }
+
+ public long getFilesRemovedTotal() {
+ return filesSkippedTotal.value();
+ }
+
+ public long getCompactionsProcessed() {
+ return compactionsProcessed.value();
+ }
+
+ public long getPruneQueueSize() {
+ return pruneQueueSize.value();
+ }
+
+ public long getPruningFailures() {
+ return pruningFailures.value();
+ }
+
+ @Override
+ public void getMetrics(MetricsCollector collector, boolean all) {
+ MetricsRecordBuilder recordBuilder = collector.addRecord(metricSourceName);
+ filesPrunedTotal.snapshot(recordBuilder, all);
+ filesPrunedLast.snapshot(recordBuilder, all);
+ filesSkippedTotal.snapshot(recordBuilder, all);
+ compactionsProcessed.snapshot(recordBuilder, all);
+ pruneQueueSize.snapshot(recordBuilder, all);
+ pruningFailures.snapshot(recordBuilder, all);
+ }
+}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index 1a329647cc9f..5b6fc39f2378 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -1935,6 +1935,13 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB2() {
*/
@Test
public void testPruneSSTFileValues() throws Exception {
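+ // All pruning metrics should be zero before any compaction is logged.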
+ SSTFilePruningMetrics sstFilePruningMetrics = rocksDBCheckpointDiffer.getPruningMetrics();
+ assertEquals(0L, sstFilePruningMetrics.getPruneQueueSize());
+ assertEquals(0L, sstFilePruningMetrics.getFilesPrunedTotal());
+ assertEquals(0L, sstFilePruningMetrics.getFilesPrunedLast());
+ assertEquals(0L, sstFilePruningMetrics.getCompactionsProcessed());
+ assertEquals(0L, sstFilePruningMetrics.getFilesRemovedTotal());
List<Pair<byte[], Integer>> keys = new ArrayList<>();
keys.add(Pair.of("key1".getBytes(UTF_8), Integer.valueOf(1)));
@@ -1962,8 +1969,9 @@ public void testPruneSSTFileValues() throws Exception {
);
byte[] compactionLogEntryKey = rocksDBCheckpointDiffer.addToCompactionLogTable(compactionLogEntry);
rocksDBCheckpointDiffer.loadAllCompactionLogs();
+ assertEquals(1L, sstFilePruningMetrics.getPruneQueueSize());
- // Pruning should not fail a source SST file has been removed by a another pruner.
+ // Pruning should not fail if a source SST file has been removed by another pruner.
Files.delete(sstBackUpDir.toPath().resolve(inputFile73 + SST_FILE_EXTENSION));
// Run the SST file pruner.
ManagedRawSSTFileIterator mockedRawSSTFileItr = mock(ManagedRawSSTFileIterator.class);
@@ -2011,6 +2019,13 @@ public void testPruneSSTFileValues() throws Exception {
// Verify 000073.sst pruning has been skipped
assertFalse(fileInfo73.isPruned());
+
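+ // One compaction entry was processed in the batch: one file pruned, one skipped as already removed.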
+ assertEquals(0L, sstFilePruningMetrics.getPruneQueueSize());
+ assertEquals(1L, sstFilePruningMetrics.getFilesPrunedTotal());
+ assertEquals(1L, sstFilePruningMetrics.getFilesPrunedLast());
+ assertEquals(1L, sstFilePruningMetrics.getCompactionsProcessed());
+ assertEquals(1L, sstFilePruningMetrics.getFilesRemovedTotal());
}
private void createSSTFileWithKeys(String filePath, List<Pair<byte[], Integer>> keys)