Merged
9 changes: 9 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1561,6 +1561,15 @@
</description>
</property>

<property>
<name>ozone.om.snapshot.compact.non.snapshot.diff.tables</name>
<value>false</value>
<tag>OZONE, OM, PERFORMANCE</tag>
<description>
Enable or disable compaction of tables that are not tracked by snapshot diff when their snapshot is evicted from the snapshot cache.
</description>
</property>
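For reference, a minimal sketch of enabling the flag programmatically (assuming org.apache.hadoop.hdds.conf.OzoneConfiguration and the OMConfigKeys constant added later in this change; the same value can also be set in ozone-site.xml):

    OzoneConfiguration conf = new OzoneConfiguration();
    // Default is false; true compacts non-snapshot-diff tables when a snapshot is evicted from the cache.
    conf.setBoolean(OMConfigKeys.OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES, true);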

<property>
<name>ozone.om.snapshot.rocksdb.metrics.enabled</name>
<value>false</value>
@@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;

/**
@@ -98,6 +99,23 @@ <KEY, VALUE> TypedTable<KEY, VALUE> getTable(
*/
void compactDB() throws IOException;

/**
* Compact the specified table.
*
* @param tableName - Name of the table to compact.
* @throws IOException on failure
*/
void compactTable(String tableName) throws IOException;

/**
* Compact the specified table using the given options.
*
* @param tableName - Name of the table to compact.
* @param options - Options for the compact operation.
* @throws IOException on failure
*/
void compactTable(String tableName, ManagedCompactRangeOptions options) throws IOException;

/**
* Moves a key from the Source Table to the destination Table.
*
@@ -222,6 +222,22 @@ public void compactDB() throws IOException {
}
}

@Override
public void compactTable(String tableName) throws IOException {
try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) {
compactTable(tableName, options);
Review comment (Contributor): This is not synchronous. Even if we call this function from the snapshot cache to compact the DB, it wouldn't be helpful, since the compaction would be cancelled. We would have to wait for all the compactions to finish before we close it.

Review comment (Contributor): Consider ManagedCompactRangeOptions.setExclusiveManualCompaction(true).
https://github.com/facebook/rocksdb/wiki/Manual-Compaction

Reply (Member, author): This seems to default to true, per https://github.com/facebook/rocksdb/wiki/Manual-Compaction#compactrange: "CompactRangeOptions::exclusive_manual_compaction: When set to true, no other compaction will run when this manual compaction is running. Default value is true."
}
}
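Related to the thread above, a minimal caller-side sketch of passing explicit options (not part of this change; it assumes a DBStore handle named store and that ManagedCompactRangeOptions exposes RocksDB's CompactRangeOptions setters):

    try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) {
      // Per the RocksDB wiki, exclusive_manual_compaction already defaults to true;
      // setting it explicitly just documents the intent.
      options.setExclusiveManualCompaction(true);
      store.compactTable("deletedTable", options);
    }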

@Override
public void compactTable(String tableName, ManagedCompactRangeOptions options) throws IOException {
RocksDatabase.ColumnFamily columnFamily = db.getColumnFamily(tableName);
if (columnFamily == null) {
throw new IOException("Table not found: " + tableName);
}
db.compactRange(columnFamily, null, null, options);
Review comment (Contributor): We should set options.setMaxCompactionBytes() when we open the snapshot RocksDB; this could be useful so that we ensure one sub-compaction doesn't take up a lot of memory. Also look into making ManagedCompactRangeOptions.setMaxSubCompactions() configurable so that we don't use a lot of CPU for this operation. It is OK if the compactions take time.
}
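A hedged sketch of the reviewer's suggestion above (not implemented in this change; it assumes the wrapper exposes the underlying RocksDB setMaxSubcompactions setter, and reuses the hypothetical store handle and table name from the earlier sketch):

    try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) {
      // Cap parallelism so a manual compaction does not monopolize CPU;
      // it is acceptable for the compaction to take longer.
      options.setMaxSubcompactions(1);
      store.compactTable("deletedTable", options);
    }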

@Override
public void close() throws IOException {
if (metrics != null) {
@@ -151,6 +151,23 @@ public void compactDB() throws Exception {

}

@Test
public void compactTable() throws Exception {
assertNotNull(rdbStore, "DBStore cannot be null");

for (int j = 0; j <= 20; j++) {
Review comment (Contributor): I have an interesting observation here. When I play with the number of insertions, I noticed that metaSizeBeforeCompact always fluctuates between 2 and 5. It looks like once it is about to reach 6, it does some compaction before the new insertion.

insertRandomData(rdbStore, 0);
rdbStore.flushDB();
}

int metaSizeBeforeCompact = rdbStore.getDb().getLiveFilesMetaDataSize();
rdbStore.compactTable(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY));
int metaSizeAfterCompact = rdbStore.getDb().getLiveFilesMetaDataSize();

assertThat(metaSizeAfterCompact).isLessThan(metaSizeBeforeCompact);
assertThat(metaSizeAfterCompact).isEqualTo(1);
}
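On the observation in the review comment above: the fluctuation is consistent with RocksDB's automatic L0 compaction trigger (level0_file_num_compaction_trigger, default 4) firing during the flush loop. A hedged sketch of pinning the trigger higher in test setup, assuming Ozone's ManagedColumnFamilyOptions wrapper exposes the underlying RocksDB setter:

    // Hypothetical test-setup tweak, not part of this change: raise the L0 trigger
    // so automatic compaction does not run between flushes and skew the live-file count.
    try (ManagedColumnFamilyOptions cfOptions = new ManagedColumnFamilyOptions()) {
      cfOptions.setLevel0FileNumCompactionTrigger(64);
      // ... use cfOptions for the column families when building the test RDBStore ...
    }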

@Test
public void close() throws Exception {
assertNotNull(rdbStore, "DBStore cannot be null");
@@ -637,6 +637,13 @@ public final class OMConfigKeys {
public static final String OZONE_OM_COMPACTION_SERVICE_COLUMNFAMILIES_DEFAULT =
"keyTable,fileTable,directoryTable,deletedTable,deletedDirectoryTable,multipartInfoTable";

/**
* Configuration to enable/disable compaction of non-snapshot-diff tables when snapshots are evicted from the cache.
*/
public static final String OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES =
"ozone.om.snapshot.compact.non.snapshot.diff.tables";
public static final boolean OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES_DEFAULT = false;

/**
* Never constructed.
*/
@@ -30,6 +30,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_CLEANUP_SERVICE_RUN_INTERVAL;
@@ -289,8 +291,11 @@ public OmSnapshotManager(OzoneManager ozoneManager) {
.getTimeDuration(OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL,
OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
boolean compactNonSnapshotDiffTables = ozoneManager.getConfiguration()
.getBoolean(OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES,
OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES_DEFAULT);
this.snapshotCache = new SnapshotCache(loader, softCacheSize, ozoneManager.getMetrics(),
cacheCleanupServiceInterval);
cacheCleanupServiceInterval, compactNonSnapshotDiffTables);

this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ,
ozoneManager, snapDiffJobCf, snapDiffReportCf,
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.om.snapshot;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheLoader;
@@ -28,6 +29,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -57,16 +60,47 @@ public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable {
private final Scheduler scheduler;
private static final String SNAPSHOT_CACHE_CLEANUP_SERVICE =
"SnapshotCacheCleanupService";
private final boolean compactNonSnapshotDiffTables;

private final OMMetrics omMetrics;

private boolean shouldCompactTable(String tableName) {
return !COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(tableName);
}

/**
* Compacts the RocksDB tables in the given snapshot that are not part of the snapshot diff DAG.
* This operation is performed outside of the main snapshot operations to avoid blocking reads.
* Only tables that are not tracked in the DAG (determined by {@link #shouldCompactTable}) will be compacted.
*
* @param snapshot The OmSnapshot instance whose tables need to be compacted
* @throws IOException if there is an error accessing the metadata manager
*/
private void compactSnapshotDB(OmSnapshot snapshot) throws IOException {
if (!compactNonSnapshotDiffTables) {
return;
}
OMMetadataManager metadataManager = snapshot.getMetadataManager();
for (Table<?, ?> table : metadataManager.getStore().listTables()) {
if (shouldCompactTable(table.getName())) {
try {
metadataManager.getStore().compactTable(table.getName());
} catch (IOException e) {
LOG.warn("Failed to compact table {} in snapshot {}: {}",
table.getName(), snapshot.getSnapshotID(), e.getMessage());
}
}
}
}

public SnapshotCache(CacheLoader<UUID, OmSnapshot> cacheLoader, int cacheSizeLimit, OMMetrics omMetrics,
long cleanupInterval) {
long cleanupInterval, boolean compactNonSnapshotDiffTables) {
this.dbMap = new ConcurrentHashMap<>();
this.cacheLoader = cacheLoader;
this.cacheSizeLimit = cacheSizeLimit;
this.omMetrics = omMetrics;
this.pendingEvictionQueue = ConcurrentHashMap.newKeySet();
this.compactNonSnapshotDiffTables = compactNonSnapshotDiffTables;
if (cleanupInterval > 0) {
this.scheduler = new Scheduler(SNAPSHOT_CACHE_CLEANUP_SERVICE,
true, 1);
@@ -224,6 +258,16 @@ public void release(UUID key) {
void cleanup() {
if (dbMap.size() > cacheSizeLimit) {
for (UUID evictionKey : pendingEvictionQueue) {
ReferenceCounted<OmSnapshot> snapshot = dbMap.get(evictionKey);
if (snapshot != null && snapshot.getTotalRefCount() == 0) {
try {
compactSnapshotDB(snapshot.get());
} catch (IOException e) {
LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}",
evictionKey, e.getMessage());
}
}

dbMap.compute(evictionKey, (k, v) -> {
pendingEvictionQueue.remove(k);
if (v == null) {
@@ -18,18 +18,27 @@
package org.apache.hadoop.ozone.om.snapshot;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.any;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.cache.CacheLoader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.ozone.test.GenericTestUtils;
@@ -68,6 +77,23 @@ static void beforeAll() throws Exception {
when(omSnapshot.getSnapshotTableKey()).thenReturn(snapshotID.toString());
when(omSnapshot.getSnapshotID()).thenReturn(snapshotID);

OMMetadataManager metadataManager = mock(OMMetadataManager.class);
org.apache.hadoop.hdds.utils.db.DBStore store = mock(org.apache.hadoop.hdds.utils.db.DBStore.class);
when(omSnapshot.getMetadataManager()).thenReturn(metadataManager);
when(metadataManager.getStore()).thenReturn(store);

Table<?, ?> table1 = mock(Table.class);
Table<?, ?> table2 = mock(Table.class);
Table<?, ?> keyTable = mock(Table.class);
when(table1.getName()).thenReturn("table1");
when(table2.getName()).thenReturn("table2");
when(keyTable.getName()).thenReturn("keyTable"); // This is in COLUMN_FAMILIES_TO_TRACK_IN_DAG
ArrayList tables = new ArrayList();
tables.add(table1);
tables.add(table2);
tables.add(keyTable);
when(store.listTables()).thenReturn(tables);

return omSnapshot;
}
);
@@ -80,7 +106,7 @@ static void beforeAll() throws Exception {
void setUp() {
// Reset cache for each test case
omMetrics = OMMetrics.create();
snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50);
snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, true);
}

@AfterEach
@@ -209,7 +235,7 @@ private void assertEntryExistence(UUID key, boolean shouldExist) {
void testEviction1() throws IOException, InterruptedException, TimeoutException {

final UUID dbKey1 = UUID.randomUUID();
snapshotCache.get(dbKey1);
UncheckedAutoCloseableSupplier<OmSnapshot> snapshot1 = snapshotCache.get(dbKey1);
assertEquals(1, snapshotCache.size());
assertEquals(1, omMetrics.getNumSnapshotCacheSize());
snapshotCache.release(dbKey1);
@@ -240,6 +266,13 @@ void testEviction1() throws IOException, InterruptedException, TimeoutException
assertEquals(1, snapshotCache.size());
assertEquals(1, omMetrics.getNumSnapshotCacheSize());
assertEntryExistence(dbKey1, false);

// Verify compaction was called on the tables
org.apache.hadoop.hdds.utils.db.DBStore store1 = snapshot1.get().getMetadataManager().getStore();
verify(store1, times(1)).compactTable("table1");
verify(store1, times(1)).compactTable("table2");
// Verify compaction was NOT called on the reserved table
verify(store1, times(0)).compactTable("keyTable");
}

@Test
@@ -333,4 +366,54 @@ void testEviction3WithClose() throws IOException, InterruptedException, TimeoutE
assertEquals(1, snapshotCache.size());
assertEquals(1, omMetrics.getNumSnapshotCacheSize());
}

@Test
@DisplayName("Snapshot operations not blocked during compaction")
void testSnapshotOperationsNotBlockedDuringCompaction() throws IOException, InterruptedException, TimeoutException {
omMetrics = OMMetrics.create();
snapshotCache = new SnapshotCache(cacheLoader, 1, omMetrics, 50, true);
final UUID dbKey1 = UUID.randomUUID();
UncheckedAutoCloseableSupplier<OmSnapshot> snapshot1 = snapshotCache.get(dbKey1);
assertEquals(1, snapshotCache.size());
assertEquals(1, omMetrics.getNumSnapshotCacheSize());
snapshotCache.release(dbKey1);
assertEquals(1, snapshotCache.size());
assertEquals(1, omMetrics.getNumSnapshotCacheSize());

// Simulate compaction blocking
final Semaphore compactionLock = new Semaphore(1);
final AtomicBoolean table1Compacting = new AtomicBoolean(false);
final AtomicBoolean table1CompactedFinish = new AtomicBoolean(false);
org.apache.hadoop.hdds.utils.db.DBStore store1 = snapshot1.get().getMetadataManager().getStore();
doAnswer(invocation -> {
table1Compacting.set(true);
// Simulate compaction lock
compactionLock.acquire();
table1CompactedFinish.set(true);
return null;
}).when(store1).compactTable("table1");
compactionLock.acquire();

final UUID dbKey2 = UUID.randomUUID();
snapshotCache.get(dbKey2);
assertEquals(2, snapshotCache.size());
assertEquals(2, omMetrics.getNumSnapshotCacheSize());
snapshotCache.release(dbKey2);
assertEquals(2, snapshotCache.size());
assertEquals(2, omMetrics.getNumSnapshotCacheSize());

// wait for compaction to start
GenericTestUtils.waitFor(() -> table1Compacting.get(), 50, 3000);

snapshotCache.get(dbKey1); // this should not be blocked

// wait for compaction to finish
assertFalse(table1CompactedFinish.get());
compactionLock.release();
GenericTestUtils.waitFor(() -> table1CompactedFinish.get(), 50, 3000);

verify(store1, times(1)).compactTable("table1");
verify(store1, times(1)).compactTable("table2");
verify(store1, times(0)).compactTable("keyTable");
}
}
@@ -370,7 +370,7 @@ public void init() throws RocksDBException, IOException, ExecutionException {

omSnapshotManager = mock(OmSnapshotManager.class);
when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0);
SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, true);

when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), anyString()))
.thenAnswer(invocationOnMock -> {