2 changes: 1 addition & 1 deletion hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -84,7 +84,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <scope>test</scope>
+ <scope>provided</scope>
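<!-- 'provided' rather than 'test': the new SSTFilePruningMetrics class compiles against hadoop-common's metrics2 classes -->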
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -172,6 +172,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
= new BootstrapStateHandler.Lock();
private static final int SST_READ_AHEAD_SIZE = 2 * 1024 * 1024;
private int pruneSSTFileBatchSize;
private SSTFilePruningMetrics sstFilePruningMetrics;
private ColumnFamilyHandle snapshotInfoTableCFHandle;
private static final String DAG_PRUNING_SERVICE_NAME = "CompactionDagPruningService";
private AtomicBoolean suspended;
@@ -239,10 +240,11 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
this.pruneSSTFileBatchSize = configuration.getInt(
OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE,
OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE_DEFAULT);
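// Per-DB metrics source for SST pruning activity; unregistered again in close().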
this.sstFilePruningMetrics = SSTFilePruningMetrics.create(activeDBLocationName);
try {
if (configuration.getBoolean(OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB, OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB_DEFAULT)
&& ManagedRawSSTFileReader.loadLibrary()) {
- pruneQueue = new ConcurrentLinkedQueue<>();
+ this.pruneQueue = new ConcurrentLinkedQueue<>();
}
} catch (NativeLibraryNotLoadedException e) {
LOG.warn("Native Library for raw sst file reading loading failed." +
@@ -338,6 +340,9 @@ public void close() {
LOG.info("Shutting down {}.", DAG_PRUNING_SERVICE_NAME);
scheduler.close();
}
if (sstFilePruningMetrics != null) {
sstFilePruningMetrics.unRegister();
}
}
}
}
@@ -532,6 +537,7 @@ public void onCompactionCompleted(RocksDB db,
// so that the backup input sst files can be pruned.
if (pruneQueue != null) {
pruneQueue.offer(key);
sstFilePruningMetrics.updateQueueSize(pruneQueue.size());
}
}
};
@@ -751,6 +757,10 @@ private void loadCompactionDagFromDB() {
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
} finally {
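// Keep the queue-depth gauge in sync after replaying compaction logs,
// which may have enqueued un-pruned entries.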
if (pruneQueue != null) {
sstFilePruningMetrics.updateQueueSize(pruneQueue.size());
}
}
}

@@ -1258,13 +1268,16 @@ public void pruneSstFileValues() {
if (!shouldRun()) {
return;
}
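// Per-batch bookkeeping reported to SSTFilePruningMetrics when the run completes.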
long batchStartTime = System.nanoTime();
int filesPrunedInBatch = 0;
int filesSkippedInBatch = 0;
int batchCounter = 0;

Path sstBackupDirPath = Paths.get(sstBackupDir);
Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
try (ManagedOptions managedOptions = new ManagedOptions();
ManagedEnvOptions envOptions = new ManagedEnvOptions()) {
byte[] compactionLogEntryKey;
- int batchCounter = 0;
while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
CompactionLogEntry compactionLogEntry;
// Get the compaction log entry.
@@ -1289,6 +1302,7 @@ public void pruneSstFileValues() {
if (Files.notExists(sstFilePath)) {
LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
updatedFileInfoList.add(fileInfo);
filesSkippedInBatch++;
continue;
}

@@ -1306,6 +1320,7 @@ public void pruneSstFileValues() {
fileInfo.setPruned();
updatedFileInfoList.add(fileInfo);
LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
filesPrunedInBatch++;
}

// Update compaction log entry in table.
@@ -1325,6 +1340,12 @@ public void pruneSstFileValues() {
}
} catch (IOException | InterruptedException e) {
LOG.error("Could not prune source OMKeyInfo from backup SST files.", e);
sstFilePruningMetrics.incrPruningFailures();
} finally {
LOG.info("Completed pruning OMKeyInfo from backup SST files in {}ms.",
(System.nanoTime() - batchStartTime) / 1_000_000);
sstFilePruningMetrics.updateBatchLevelMetrics(filesPrunedInBatch, filesSkippedInBatch,
batchCounter, pruneQueue.size());
}
}

@@ -1428,4 +1449,9 @@ private Map<String, CompactionFileInfo> toFileInfoList(List<String> sstFiles, Ro
ConcurrentMap<String, CompactionFileInfo> getInflightCompactions() {
return inflightCompactions;
}

@VisibleForTesting
public SSTFilePruningMetrics getPruningMetrics() {
return sstFilePruningMetrics;
}
}
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/SSTFilePruningMetrics.java
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ozone.rocksdiff;

import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.ozone.OzoneConsts;

/**
* Class contains metrics for monitoring SST file pruning operations in RocksDBCheckpointDiffer.
*/
@Metrics(about = "SST File Pruning Metrics", context = OzoneConsts.OZONE)
public final class SSTFilePruningMetrics implements MetricsSource {

private static final String METRICS_SOURCE_NAME_PREFIX = SSTFilePruningMetrics.class.getSimpleName();
private final String metricSourceName;
private final MetricsRegistry registry;

/*
* Pruning Throughput Metrics.
*/
@Metric("Total no. of SST files pruned")
private MutableCounterLong filesPrunedTotal;
@Metric("No. of SST files pruned in the last batch")
private MutableGaugeLong filesPrunedLast;
@Metric("Total no. of SST files removed")
private MutableCounterLong filesSkippedTotal;
@Metric("Total no. of compactions processed")
private MutableCounterLong compactionsProcessed;
@Metric("No. of pending pruning jobs in queue")
private MutableGaugeLong pruneQueueSize;

/*
* Pruning failure Metrics.
*/
@Metric("No. of pruning job failures")
private MutableCounterLong pruningFailures;

private SSTFilePruningMetrics(String sourceName) {
this.metricSourceName = sourceName;
this.registry = new MetricsRegistry(metricSourceName);
}

/**
* Creates and registers an SSTFilePruningMetrics instance for the given DB.
*
* @param dbLocation path of the active DB, used to derive a unique source name
* @return SSTFilePruningMetrics
*/
public static SSTFilePruningMetrics create(String dbLocation) {
String sourceName = METRICS_SOURCE_NAME_PREFIX +
(dbLocation == null || dbLocation.isEmpty() ? "" : "-" + dbLocation.replaceAll("[/\\:\\s]", "_"));
return DefaultMetricsSystem.instance().register(sourceName, "SST File Pruning Metrics",
new SSTFilePruningMetrics(sourceName));
}
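// Illustrative naming: create("/data/om/db") registers the source
// "SSTFilePruningMetrics-_data_om_db"; '/', ':' and whitespace in the
// DB location are replaced with '_' to keep the source name valid.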

/**
* Unregister the metrics instance.
*/
public void unRegister() {
DefaultMetricsSystem.instance().unregisterSource(metricSourceName);
}

public void updateQueueSize(long queueSize) {
pruneQueueSize.set(queueSize);
}

public void updateBatchLevelMetrics(long filesPruned, long filesSkipped, int compactions, long queueSize) {
filesPrunedTotal.incr(filesPruned);
filesPrunedLast.set(filesPruned);
filesSkippedTotal.incr(filesSkipped);
compactionsProcessed.incr(compactions);
updateQueueSize(queueSize);
}

public void incrPruningFailures() {
pruningFailures.incr();
}

public long getFilesPrunedTotal() {
return filesPrunedTotal.value();
}

public long getFilesPrunedLast() {
return filesPrunedLast.value();
}

public long getFilesRemovedTotal() {
return filesSkippedTotal.value();
}

public long getCompactionsProcessed() {
return compactionsProcessed.value();
}

public long getPruneQueueSize() {
return pruneQueueSize.value();
}

public long getPruningFailures() {
return pruningFailures.value();
}

@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder recordBuilder = collector.addRecord(metricSourceName);
filesPrunedTotal.snapshot(recordBuilder, all);
filesPrunedLast.snapshot(recordBuilder, all);
filesSkippedTotal.snapshot(recordBuilder, all);
compactionsProcessed.snapshot(recordBuilder, all);
pruneQueueSize.snapshot(recordBuilder, all);
pruningFailures.snapshot(recordBuilder, all);
}
}
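
A minimal usage sketch of the new metrics source (illustrative only: the DB path and the numbers below are invented, and the calls mirror the API defined above):

SSTFilePruningMetrics metrics = SSTFilePruningMetrics.create("/data/om/db");
metrics.updateQueueSize(4); // four compaction entries waiting to be pruned
metrics.updateBatchLevelMetrics(3, 1, 2, 0); // 3 pruned, 1 skipped, 2 compactions, queue drained
assert metrics.getFilesPrunedTotal() == 3;
assert metrics.getFilesPrunedLast() == 3;
assert metrics.getFilesRemovedTotal() == 1; // the "removed" getter reports the skipped-file counter
assert metrics.getCompactionsProcessed() == 2;
assert metrics.getPruneQueueSize() == 0;
metrics.unRegister(); // drop the source on shutdown, as close() does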
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -1935,6 +1935,12 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB2() {
*/
@Test
public void testPruneSSTFileValues() throws Exception {
SSTFilePruningMetrics sstFilePruningMetrics = rocksDBCheckpointDiffer.getPruningMetrics();
assertEquals(0L, sstFilePruningMetrics.getPruneQueueSize());
assertEquals(0L, sstFilePruningMetrics.getFilesPrunedTotal());
assertEquals(0L, sstFilePruningMetrics.getFilesPrunedLast());
assertEquals(0L, sstFilePruningMetrics.getCompactionsProcessed());
assertEquals(0L, sstFilePruningMetrics.getFilesRemovedTotal());

List<Pair<byte[], Integer>> keys = new ArrayList<Pair<byte[], Integer>>();
keys.add(Pair.of("key1".getBytes(UTF_8), Integer.valueOf(1)));
@@ -1962,8 +1968,9 @@ public void testPruneSSTFileValues() throws Exception {
);
byte[] compactionLogEntryKey = rocksDBCheckpointDiffer.addToCompactionLogTable(compactionLogEntry);
rocksDBCheckpointDiffer.loadAllCompactionLogs();
assertEquals(1L, sstFilePruningMetrics.getPruneQueueSize());

- // Pruning should not fail a source SST file has been removed by a another pruner.
+ // Pruning should not fail if a source SST file has been removed by another pruner.
Files.delete(sstBackUpDir.toPath().resolve(inputFile73 + SST_FILE_EXTENSION));
// Run the SST file pruner.
ManagedRawSSTFileIterator mockedRawSSTFileItr = mock(ManagedRawSSTFileIterator.class);
Expand Down Expand Up @@ -2011,6 +2018,12 @@ public void testPruneSSTFileValues() throws Exception {

// Verify 000073.sst pruning has been skipped
assertFalse(fileInfo73.isPruned());

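// One file pruned, one skipped (000073.sst was deleted above), one
// compaction entry consumed, and the prune queue drained back to zero.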
assertEquals(0L, sstFilePruningMetrics.getPruneQueueSize());
assertEquals(1L, sstFilePruningMetrics.getFilesPrunedTotal());
assertEquals(1L, sstFilePruningMetrics.getFilesPrunedLast());
assertEquals(1L, sstFilePruningMetrics.getCompactionsProcessed());
assertEquals(1L, sstFilePruningMetrics.getFilesRemovedTotal());
}

private void createSSTFileWithKeys(String filePath, List<Pair<byte[], Integer>> keys)