@@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;

/**
@@ -98,6 +99,23 @@ <KEY, VALUE> TypedTable<KEY, VALUE> getTable(
*/
void compactDB() throws IOException;

/**
 * Compact the specified table.
 *
 * @param tableName - Name of the table to compact.
 * @throws IOException on failure
 */
void compactTable(String tableName) throws IOException;

/**
 * Compact the specified table with the given compact range options.
 *
 * @param tableName - Name of the table to compact.
 * @param options - Options for the compact operation.
 * @throws IOException on failure
 */
void compactTable(String tableName, ManagedCompactRangeOptions options) throws IOException;
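
For context, a minimal caller-side sketch of the new API, assuming an already-open DBStore and an illustrative table name; neither "compactExample" nor "table1" is part of this change.

// Illustrative only: "dbStore" and the table name "table1" are placeholders.
static void compactExample(DBStore dbStore) throws IOException {
  // Compact a single column family using default compaction options.
  dbStore.compactTable("table1");

  // Or supply explicit options; the caller owns and closes them.
  try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) {
    dbStore.compactTable("table1", options);
  }
}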

/**
* Moves a key from the Source Table to the destination Table.
*
@@ -222,6 +222,22 @@ public void compactDB() throws IOException {
}
}

@Override
public void compactTable(String tableName) throws IOException {
  try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) {
    compactTable(tableName, options);
  }
}

Contributor:
This is not synchronous. Even if we call this function from the snapshot cache to compact the DB, it wouldn't be helpful, since the compaction would be cancelled. We would have to wait for all the compactions to finish before we close it.

Contributor:
Consider ManagedCompactRangeOptions.setExclusiveManualCompaction(true).
https://github.com/facebook/rocksdb/wiki/Manual-Compaction

Author:
This seems to default to true:
https://github.com/facebook/rocksdb/wiki/Manual-Compaction#compactrange
"CompactRangeOptions::exclusive_manual_compaction: When set to true, no other compaction will run when this manual compaction is running. Default value is true."
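
If the suggestion above were applied literally, the single-argument overload might look like the sketch below. Per the author's follow-up, exclusive manual compaction already defaults to true in RocksDB, so this only makes the default explicit; it is a sketch, not the merged code.

@Override
public void compactTable(String tableName) throws IOException {
  try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) {
    // Explicitly request that no other compaction runs while this manual
    // compaction is in progress (already the RocksDB default).
    options.setExclusiveManualCompaction(true);
    compactTable(tableName, options);
  }
}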

@Override
public void compactTable(String tableName, ManagedCompactRangeOptions options) throws IOException {
  RocksDatabase.ColumnFamily columnFamily = db.getColumnFamily(tableName);
  if (columnFamily == null) {
    throw new IOException("Table not found: " + tableName);
  }
  db.compactRange(columnFamily, null, null, options);
}

Contributor:
We should set options.setMaxCompactionBytes() when we open the snapshot RocksDB; this could be useful to ensure one sub-compaction doesn't take up a lot of memory. Also look into making ManagedCompactRangeOptions.setMaxSubCompactions() configurable so that we don't use a lot of CPU for this operation. It is ok if the compactions take time.
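
A hedged sketch of the configurability the reviewer asks for: capping sub-compaction parallelism through a configuration value. The configuration key and its default are invented for illustration, "conf" is assumed to be a Hadoop-style Configuration, and the setter name is taken from the review comment (exact casing in the wrapper may differ).

// Sketch only: the config key below does not exist in Ozone; it is illustrative.
int maxSubcompactions = conf.getInt(
    "ozone.om.snapshot.compact.max.subcompactions", 1);
try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) {
  // Limit the CPU a manual compaction can use; it is fine if compaction takes longer.
  options.setMaxSubcompactions(maxSubcompactions);
  compactTable(tableName, options);
}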

@Override
public void close() throws IOException {
if (metrics != null) {
@@ -151,6 +151,23 @@ public void compactDB() throws Exception {

}

@Test
public void compactTable() throws Exception {
  assertNotNull(rdbStore, "DBStore cannot be null");

  for (int j = 0; j <= 20; j++) {
    insertRandomData(rdbStore, 0);
    rdbStore.flushDB();
  }

  int metaSizeBeforeCompact = rdbStore.getDb().getLiveFilesMetaDataSize();
  rdbStore.compactTable(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY));
  int metaSizeAfterCompact = rdbStore.getDb().getLiveFilesMetaDataSize();

  assertThat(metaSizeAfterCompact).isLessThan(metaSizeBeforeCompact);
  assertThat(metaSizeAfterCompact).isEqualTo(1);
}

Contributor:
I have an interesting observation here. When I play with the number of insertions, I noticed that metaSizeBeforeCompact always fluctuates between 2 and 5. It looks like once it is about to reach 6, the DB does some compaction before the new insertion.
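
Given the observation above, one way to make metaSizeBeforeCompact deterministic would be to disable automatic compaction for the test column family, so that only the explicit compactTable() call merges SST files. This is a sketch using the stock RocksDB option; wiring it into this test's RDBStore setup is an assumption, not something the patch does.

// Sketch only: turn off automatic compactions so SST counts change only when
// compactTable() is called explicitly. Passing cfOpts into the column-family
// descriptors used to open the test DB is left to the test's setup helpers.
try (org.rocksdb.ColumnFamilyOptions cfOpts =
         new org.rocksdb.ColumnFamilyOptions().setDisableAutoCompactions(true)) {
  // use cfOpts when building the ColumnFamilyDescriptor list for the test store
}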

@Test
public void close() throws Exception {
assertNotNull(rdbStore, "DBStore cannot be null");
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.om.snapshot;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheLoader;
@@ -27,6 +28,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -58,6 +61,24 @@ public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable {

private final OMMetrics omMetrics;

/** Returns true if the given table is not tracked in the compaction DAG and may therefore be compacted here. */
private boolean shouldCompactTable(String tableName) {
  return !COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(tableName);
}

/** Compacts every eligible table of the given snapshot's DB; failures are logged and skipped. */
private void compactSnapshotDB(OmSnapshot snapshot) throws IOException {
  OMMetadataManager metadataManager = snapshot.getMetadataManager();
  for (Table<?, ?> table : metadataManager.getStore().listTables()) {
    if (shouldCompactTable(table.getName())) {
      try {
        metadataManager.getStore().compactTable(table.getName());
      } catch (IOException e) {
        LOG.warn("Failed to compact table {} in snapshot {}: {}",
            table.getName(), snapshot.getSnapshotID(), e.getMessage());
      }
    }
  }
}

public SnapshotCache(CacheLoader<UUID, OmSnapshot> cacheLoader, int cacheSizeLimit, OMMetrics omMetrics,
long cleanupInterval) {
this.dbMap = new ConcurrentHashMap<>();
@@ -220,6 +241,8 @@ void cleanup() {
LOG.debug("Closing SnapshotId {}. It is not being referenced anymore.", k);
// Close the instance, which also closes its DB handle.
try {
  compactSnapshotDB(v.get());
  // Close the instance, which also closes its DB handle.
  v.get().close();
} catch (IOException ex) {
  throw new IllegalStateException("Error while closing snapshot DB.", ex);

Contributor (on the compactSnapshotDB call):
We should perform this outside dbMap.compute since this can block snapshot reads. dbMap.compute is blocking. We can just move this above line number 235, at the same level as the dbMap.compute().

Contributor (@swamirishi, May 17, 2025):
@jojochuang @smengcl This can block other read operations. Let us perform the snapshot compaction while we still allow reads to go through. Compactions shouldn't block active snapshot operations, both background services and active user reads. @peterxcli please write a test case for this as well so that we don't inadvertently block any snapshot operations.

Author:
Done. Please take another look, thanks!
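A hedged sketch of the restructuring requested above: decide inside dbMap.compute() which snapshot to evict, but run the compaction and close outside the compute lambda so that concurrent snapshot reads are not blocked while compaction runs. Names such as getTotalRefCount and the exact control flow are assumptions about the surrounding class, not the merged code.

// Sketch only (illustrative, not the merged change).
List<OmSnapshot> toClose = new ArrayList<>();
dbMap.compute(k, (key, ref) -> {
  if (ref != null && ref.getTotalRefCount() == 0) {
    toClose.add(ref.get());  // remember the instance; do no heavy work under the map lock
    return null;             // drop the entry from the cache
  }
  return ref;
});
// Outside compute(): compaction and close no longer hold the per-key map lock.
for (OmSnapshot snapshot : toClose) {
  try {
    compactSnapshotDB(snapshot);
    snapshot.close();
  } catch (IOException ex) {
    throw new IllegalStateException("Error while closing snapshot DB.", ex);
  }
}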
@@ -24,12 +24,17 @@
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.cache.CacheLoader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.ozone.test.GenericTestUtils;
@@ -67,6 +72,23 @@ static void beforeAll() throws Exception {
when(omSnapshot.getSnapshotTableKey()).thenReturn(snapshotID.toString());
when(omSnapshot.getSnapshotID()).thenReturn(snapshotID);

OMMetadataManager metadataManager = mock(OMMetadataManager.class);
org.apache.hadoop.hdds.utils.db.DBStore store = mock(org.apache.hadoop.hdds.utils.db.DBStore.class);
when(omSnapshot.getMetadataManager()).thenReturn(metadataManager);
when(metadataManager.getStore()).thenReturn(store);

Table<?, ?> table1 = mock(Table.class);
Table<?, ?> table2 = mock(Table.class);
Table<?, ?> keyTable = mock(Table.class);
when(table1.getName()).thenReturn("table1");
when(table2.getName()).thenReturn("table2");
when(keyTable.getName()).thenReturn("keyTable"); // This is in COLUMN_FAMILIES_TO_TRACK_IN_DAG
ArrayList tables = new ArrayList();
tables.add(table1);
tables.add(table2);
tables.add(keyTable);
when(store.listTables()).thenReturn(tables);

return omSnapshot;
}
);
@@ -209,7 +231,7 @@ private void assertEntryExistence(UUID key, boolean shouldExist) {
void testEviction1() throws IOException, InterruptedException, TimeoutException {

final UUID dbKey1 = UUID.randomUUID();
-    snapshotCache.get(dbKey1);
+    ReferenceCounted<OmSnapshot> snapshot1 = snapshotCache.get(dbKey1);
assertEquals(1, snapshotCache.size());
assertEquals(1, omMetrics.getNumSnapshotCacheSize());
snapshotCache.release(dbKey1);
@@ -240,6 +262,14 @@ void testEviction1() throws IOException, InterruptedException, TimeoutException
assertEquals(1, snapshotCache.size());
assertEquals(1, omMetrics.getNumSnapshotCacheSize());
assertEntryExistence(dbKey1, false);

// Verify compaction was called on the tables
OMMetadataManager metadataManager1 = snapshot1.get().getMetadataManager();
org.apache.hadoop.hdds.utils.db.DBStore store1 = metadataManager1.getStore();
verify(store1, times(1)).compactTable("table1");
verify(store1, times(1)).compactTable("table2");
// Verify compaction was NOT called on the reserved table
verify(store1, times(0)).compactTable("keyTable");
}

@Test