diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 6d06484d9444..0891c0775197 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -136,6 +136,8 @@ env: OZONE_IMAGE: ghcr.io/apache/ozone OZONE_RUNNER_IMAGE: ghcr.io/apache/ozone-runner OZONE_VOLUME_OWNER: 1000 + GITHUB_ACTOR: ${{ github.actor }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} jobs: check: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e8477c99ab22..3f35e274ddb5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,6 +34,9 @@ env: MAVEN_ARGS: --batch-mode --settings ${{ github.workspace }}/dev-support/ci/maven-settings.xml MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 OZONE_WITH_COVERAGE: ${{ github.event_name == 'push' }} + GITHUB_ACTOR: ${{ github.actor }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + jobs: build-info: diff --git a/dev-support/ci/maven-settings.xml b/dev-support/ci/maven-settings.xml index 43fa07bb52ba..2cecade99f49 100644 --- a/dev-support/ci/maven-settings.xml +++ b/dev-support/ci/maven-settings.xml @@ -32,4 +32,12 @@ true + + + + github + ${env.GITHUB_ACTOR} + ${env.GITHUB_TOKEN} + + diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java index e2c50a2da740..eb85d8e7dc11 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java @@ -97,4 +97,16 @@ public static String bytes2String(byte[] bytes) { public static byte[] string2Bytes(String str) { return str.getBytes(UTF8); } + + public static String getKeyPrefixUpperBound(String key) { + return key.substring(0, key.length() - 1) + (char)(key.charAt(key.length() - 1) + 1); + } + + public static String max(String str1, String str2) { + return str1.compareTo(str2) > 0 ? str1 : str2; + } + + public static String min(String str1, String str2) { + return str1.compareTo(str2) < 0 ? str1 : str2; + } } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 24037125f574..15ff5e5481f5 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -4819,4 +4819,54 @@ warm up edek cache if none of key successful on OM start up. + + + + ozone.om.range.compaction.service.enabled + false + OZONE, OM, PERFORMANCE + Enable or disable the range compaction service that compacts specific ranges of data based on tombstone ratio. This service helps improve performance and reduce storage overhead by compacting ranges with high tombstone density. + + + + ozone.om.range.compaction.service.interval + 30s + OZONE, OM, PERFORMANCE + The interval at which the range compaction service checks for ranges that need compaction. The service will scan for ranges with high tombstone density at this interval. + + + + ozone.om.range.compaction.service.max.compaction.entries + 1000000 + OZONE, OM, PERFORMANCE + The maximum number of entries that can be compacted in a single range compaction operation. This limit helps prevent long-running compaction operations that could impact system performance. 
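For illustration of how the thresholds below combine: with the default ozone.om.range.compaction.service.min.tombstones of 10000 and ozone.om.range.compaction.service.tombstone.ratio of 0.3, a range holding 50,000 entries of which 20,000 are tombstones qualifies for compaction (20,000 >= 10,000 and 20,000 / 50,000 = 0.4 >= 0.3), while a range with only 5,000 tombstones is skipped even if its tombstone ratio is high, because it falls below the minimum tombstone count.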
+ + + + ozone.om.range.compaction.service.min.tombstones + 10000 + OZONE, OM, PERFORMANCE + The minimum number of tombstones required in a range before it is considered for compaction. This threshold helps ensure that only ranges with significant tombstone density are compacted. + + + + ozone.om.range.compaction.service.ranges.per.run + 20 + OZONE, OM, PERFORMANCE + The number of ranges to process in each run of the range compaction service. This controls the granularity of compaction operations and helps balance system load. + + + + ozone.om.range.compaction.service.timeout + 10m + OZONE, OM, PERFORMANCE + The timeout for range compaction operations. If a compaction operation takes longer than this, it will be cancelled to prevent long-running operations from impacting system performance. + + + + ozone.om.range.compaction.service.tombstone.ratio + 0.3 + OZONE, OM, PERFORMANCE + The ratio of tombstones to total entries that triggers compaction of a range. When the ratio exceeds this threshold, the range will be considered for compaction to improve storage efficiency. + diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java index 0fb91f42d90a..7f20efb9463e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.utils.db; import java.io.File; +import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.hadoop.hdds.annotation.InterfaceStability; @@ -26,6 +27,7 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; import org.apache.ratis.util.UncheckedAutoCloseable; +import org.rocksdb.TableProperties; /** * The DBStore interface provides the ability to create Tables, which store @@ -108,6 +110,33 @@ TypedTable getTable( */ void compactTable(String tableName, ManagedCompactRangeOptions options) throws RocksDatabaseException; + /** + * Compact a range of keys in the database. + * + * @param tableName - The table name to compact. + * @param startKey - The starting key of the range to compact. + * @param endKey - The ending key of the range to compact. + * @throws IOException on Failure + */ + void compactTable(String tableName, String startKey, String endKey) throws IOException; + + void compactTable(String tableName, String startKey, String endKey, + ManagedCompactRangeOptions options) throws IOException; + + /** + * Get the properties of a column family in a range. + * @param tableName - The table name to get the properties of. + * @param startKey - The starting key of the range. + * @param endKey - The ending key of the range. + * @return - The properties of the column family in the range. + * @throws IOException on Failure + */ + Map getPropertiesOfTableInRange(String tableName, String startKey, + String endKey) throws IOException; + + Map getPropertiesOfTableInRange(String tableName, List ranges) + throws IOException; + /** * Returns an estimated count of keys in this DB. 
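For illustration (not part of the patch), the new range APIs are meant to be combined roughly as follows: read the RocksDB table properties for a key range, aggregate entry and tombstone counts, and trigger a targeted compaction only when the range is tombstone-heavy. The helper name and thresholds below are placeholders; StringUtils.getKeyPrefixUpperBound is the prefix upper-bound helper added earlier in this patch.

  // Illustrative sketch only. Assumes org.apache.hadoop.hdds.StringUtils,
  // org.rocksdb.TableProperties, java.util.Map and java.io.IOException are imported.
  static void maybeCompactPrefix(DBStore dbStore, String tableName, String prefix)
      throws IOException {
    String startKey = prefix;
    // Exclusive upper bound: greater than every key sharing the prefix (for typical prefixes).
    String endKey = StringUtils.getKeyPrefixUpperBound(prefix);

    // One entry per SST file overlapping the range.
    Map<String, TableProperties> props =
        dbStore.getPropertiesOfTableInRange(tableName, startKey, endKey);

    long entries = 0;
    long deletions = 0;
    for (TableProperties p : props.values()) {
      entries += p.getNumEntries();
      deletions += p.getNumDeletions();
    }

    // Placeholder thresholds; the range compaction service reads them from configuration.
    if (entries > 0 && deletions >= 10_000 && (double) deletions / entries >= 0.3) {
      dbStore.compactTable(tableName, startKey, endKey);
    }
  }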
* diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/KeyRange.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/KeyRange.java new file mode 100644 index 000000000000..103707e29e14 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/KeyRange.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import org.apache.hadoop.hdds.StringUtils; + +/** + * Represents a range of keys for compaction. + */ +public class KeyRange { + private final String startKey; + private final String endKey; + + public KeyRange(String startKey, String endKey) { + this.startKey = startKey; + this.endKey = endKey; + } + + public KeyRange(byte[] startKey, byte[] endKey) { + this.startKey = StringUtils.bytes2String(startKey); + this.endKey = StringUtils.bytes2String(endKey); + } + + public String getStartKey() { + return startKey; + } + + public String getEndKey() { + return endKey; + } + + @Override + public String toString() { + return "KeyRange{" + + "startKey='" + startKey + '\'' + + ", endKey='" + endKey + '\'' + + '}'; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index e3853a84211c..5d7e13c0afe9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -34,9 +34,11 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.RocksDBStoreMetrics; @@ -44,12 +46,15 @@ import org.apache.hadoop.hdds.utils.db.cache.TableCache; import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice; import org.apache.hadoop.hdds.utils.db.managed.ManagedStatistics; import org.apache.hadoop.hdds.utils.db.managed.ManagedTransactionLogIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.RocksDBCheckpointDifferHolder; +import org.rocksdb.Range; import org.rocksdb.RocksDBException; +import org.rocksdb.TableProperties; import org.rocksdb.TransactionLogIterator.BatchResult; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -220,6 +225,15 @@ public void compactTable(String tableName) throws RocksDatabaseException { } } + @Override + public void compactTable(String tableName, String startKey, String endKey) throws IOException { + LOG.info("Starting table compaction for table: {}, range: [{} - {}]", tableName, startKey, endKey); + try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) { + compactTable(tableName, startKey, endKey, options); + } + LOG.info("Completed table compaction for table: {}, range: [{} - {}]", tableName, startKey, endKey); + } + @Override public void compactTable(String tableName, ManagedCompactRangeOptions options) throws RocksDatabaseException { RocksDatabase.ColumnFamily columnFamily = db.getColumnFamily(tableName); @@ -229,6 +243,67 @@ public void compactTable(String tableName, ManagedCompactRangeOptions options) t db.compactRange(columnFamily, null, null, options); } + @Override + public void compactTable(String tableName, String startKey, String endKey, + ManagedCompactRangeOptions options) throws IOException { + LOG.info("Starting table compaction with options for table: {}, range: [{} - {}]", tableName, startKey, endKey); + ColumnFamily columnFamily = db.getColumnFamily(tableName); + if (columnFamily == null) { + LOG.error("Table not found for compaction: {}", tableName); + throw new IOException("No such table in this DB. TableName : " + tableName); + } + db.compactRange(columnFamily, StringUtils.string2Bytes(startKey), + StringUtils.string2Bytes(endKey), options); + LOG.info("Completed table compaction with options for table: {}, range: [{} - {}]", tableName, startKey, endKey); + } + + @Override + public Map getPropertiesOfTableInRange(String tableName, String startKey, + String endKey) throws IOException { + LOG.info("Getting table properties for table: {}, range: [{} - {}]", tableName, startKey, endKey); + Map result = getPropertiesOfTableInRange(tableName, + Collections.singletonList(new KeyRange(startKey, endKey))); + LOG.info("Retrieved {} table properties for table: {}, range: [{} - {}]", + result.size(), tableName, startKey, endKey); + return result; + } + + @Override + public Map getPropertiesOfTableInRange(String tableName, + List ranges) throws IOException { + LOG.info("Getting table properties for table: {}, number of ranges: {}", tableName, ranges.size()); + ColumnFamily columnFamily = db.getColumnFamily(tableName); + if (columnFamily == null) { + LOG.error("Table not found for getting properties: {}", tableName); + throw new IOException("No such table in this DB. 
TableName : " + tableName); + } + + List rocksRanges = new ArrayList<>(); + List managedSlices = new ArrayList<>(); + try { + for (KeyRange t : ranges) { + ManagedSlice start = new ManagedSlice(StringUtils.string2Bytes(t.getStartKey())); + ManagedSlice end = new ManagedSlice(StringUtils.string2Bytes(t.getEndKey())); + managedSlices.add(start); + managedSlices.add(end); + rocksRanges.add(new Range(start, end)); + } + LOG.info("Converted {} ranges to RocksDB ranges, start to get properties", rocksRanges.size()); + return db.getPropertiesOfColumnFamilyInRange(columnFamily, rocksRanges); + } catch (RocksDatabaseException e) { + throw new IOException("Failed to get properties of table in range", e); + } finally { + // Close all ManagedSlice objects + for (ManagedSlice slice : managedSlices) { + try { + slice.close(); + } catch (Exception e) { + LOG.warn("Failed to close ManagedSlice", e); + } + } + } + } + @Override public void close() { if (metrics != null) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index 24f1971dccbe..1f53c83c6c64 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -63,7 +63,9 @@ import org.rocksdb.Holder; import org.rocksdb.KeyMayExist; import org.rocksdb.LiveFileMetaData; +import org.rocksdb.Range; import org.rocksdb.RocksDBException; +import org.rocksdb.TableProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -891,6 +893,16 @@ public void deleteFilesNotMatchingPrefix(Map prefixPairs) throws } } + public Map getPropertiesOfColumnFamilyInRange(ColumnFamily columnFamily, + List ranges) throws RocksDatabaseException { + try (UncheckedAutoCloseable ignored = acquire()) { + return db.get().getPropertiesOfTablesInRange(columnFamily.getHandle(), ranges); + } catch (RocksDBException e) { + closeOnError(e); + throw toRocksDatabaseException(this, "getPropertiesOfColumnFamilyInRange", e); + } + } + @Override protected void finalize() throws Throwable { if (!isClosed()) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index 7f5d74ad4ee4..c8432f90e90f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -265,6 +265,14 @@ default void cleanupCache(List epochs) { throw new NotImplementedException("cacheIterator is not implemented"); } + /** + * Return cache iterator maintained for this table. + */ + default Iterator, CacheValue>> + cacheIterator(KEY startKey) { + throw new NotImplementedException("cacheIterator with startKey is not implemented"); + } + /** * Create the metrics datasource that emits table cache metrics. 
*/ diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index cd02c91ecb30..4f99596fc68b 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -436,6 +436,11 @@ public Iterator, CacheValue>> cacheIterator() { return cache.iterator(); } + @Override + public Iterator, CacheValue>> cacheIterator(KEY startKey) { + return cache.iterator(startKey); + } + @Override public TableCacheMetrics createCacheMetrics() { return TableCacheMetrics.create(cache, getName()); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/FullTableCache.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/FullTableCache.java index 2f60d0cf4d24..3badd4d7b0ec 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/FullTableCache.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/FullTableCache.java @@ -53,7 +53,7 @@ public class FullTableCache implements TableCache { private static final Logger LOG = LoggerFactory.getLogger(FullTableCache.class); - private final Map, CacheValue> cache; + private final NavigableMap, CacheValue> cache; private final NavigableMap>> epochEntries; private final ScheduledExecutorService executorService; private final Queue epochCleanupQueue = new ConcurrentLinkedQueue<>(); @@ -150,6 +150,15 @@ public Iterator, CacheValue>> iterator() { return cache.entrySet().iterator(); } + @Override + public Iterator, CacheValue>> iterator(KEY startKey) { + statsRecorder.recordIteration(); + if (startKey == null) { + return cache.entrySet().iterator(); + } + return cache.tailMap(new CacheKey<>(startKey)).entrySet().iterator(); + } + @VisibleForTesting @Override public void evictCache(List epochs) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/PartialTableCache.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/PartialTableCache.java index 9b7240364947..0c9fc528504e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/PartialTableCache.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/PartialTableCache.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -117,6 +118,11 @@ public Iterator, CacheValue>> iterator() { return cache.entrySet().iterator(); } + @Override + public Iterator, CacheValue>> iterator(KEY startKey) { + return Collections.emptyIterator(); + } + @VisibleForTesting @Override public void evictCache(List epochs) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java index 5d3782f76e16..24bdfefef7ae 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java @@ -78,6 +78,8 @@ public interface TableCache { */ Iterator, CacheValue>> iterator(); + Iterator, CacheValue>> iterator(KEY startKey); + /** * Check 
key exist in cache or not. * diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableNoCache.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableNoCache.java index b3ec98619ce6..90adf09e3946 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableNoCache.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableNoCache.java @@ -73,6 +73,11 @@ public Iterator, CacheValue>> iterator() { return Collections.emptyIterator(); } + @Override + public Iterator, CacheValue>> iterator(KEY startKey) { + return Collections.emptyIterator(); + } + @VisibleForTesting @Override public void evictCache(List epochs) { diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java index 44da33f99214..ff573e56c07a 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java @@ -50,6 +50,7 @@ import org.rocksdb.RocksDB; import org.rocksdb.Statistics; import org.rocksdb.StatsLevel; +import org.rocksdb.TableProperties; /** * RDBStore Tests. @@ -403,4 +404,43 @@ private void compareSstWithSameName(File checkpoint1, File checkpoint2) } } } + + @Test + public void testGetPropertiesOfTableInRange() throws Exception { + // Insert 100 keys with predictable names into the "First" column family + String tableName = families.get(1); + Table table = rdbStore.getTable(tableName); + for (int i = 0; i < 100; i++) { + String keyStr = String.format("key%03d", i); + byte[] key = keyStr.getBytes(StandardCharsets.UTF_8); + byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8); + table.put(key, value); + // Flush periodically to make sure keys are written to SST files so that + // RocksDB can report properties. + if (i % 10 == 9) { + rdbStore.flushDB(); + } + } + + // Ensure any remaining data is flushed. + rdbStore.flushDB(); + + // Fetch table properties for the full key range we just inserted. + Map propsMap = rdbStore.getPropertiesOfTableInRange( + tableName, "key000", "key099"); + + assertNotNull(propsMap, "Properties map should not be null"); + assertFalse(propsMap.isEmpty(), "Properties map should not be empty"); + + long totalEntries = 0; + long totalDeletions = 0; + for (TableProperties props : propsMap.values()) { + totalEntries += props.getNumEntries(); + totalDeletions += props.getNumDeletions(); + } + + // We inserted exactly 100 keys and did not delete any. 
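+    // Aggregated over every SST file overlapping the range, RocksDB should therefore
+    // report 100 entries and no deletion (tombstone) markers.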
+ assertEquals(100, totalEntries, "Total number of entries reported should match the inserted keys"); + assertEquals(0, totalDeletions, "There should be no deletion entries reported"); + } } diff --git a/hadoop-hdds/rocks-native/pom.xml b/hadoop-hdds/rocks-native/pom.xml index 0c7e8fa7e2da..d3d028c6fd91 100644 --- a/hadoop-hdds/rocks-native/pom.xml +++ b/hadoop-hdds/rocks-native/pom.xml @@ -194,7 +194,7 @@ generate-sources - https://github.com/facebook/rocksdb/archive/refs/tags/v${rocksdb.version}.tar.gz + https://github.com/peterxcli/rocksdb/archive/refs/tags/v${rocksdb.version}.tar.gz rocksdb-v${rocksdb.version}.tar.gz ${project.build.directory}/rocksdb @@ -255,6 +255,22 @@ + + + diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 2bfe30efab0f..218263a9dbbb 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -661,6 +661,29 @@ public final class OMConfigKeys { "ozone.om.snapshot.compact.non.snapshot.diff.tables"; public static final boolean OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES_DEFAULT = false; + /** + * Configuration keys for RocksDB range compaction service. + */ + public static final String OZONE_OM_RANGE_COMPACTION_SERVICE_ENABLED = "ozone.om.range.compaction.service.enabled"; + public static final boolean OZONE_OM_RANGE_COMPACTION_SERVICE_ENABLED_DEFAULT = false; + public static final String OZONE_OM_RANGE_COMPACTION_SERVICE_INTERVAL = "ozone.om.range.compaction.service.interval"; + public static final String OZONE_OM_RANGE_COMPACTION_SERVICE_INTERVAL_DEFAULT = "30s"; + public static final String OZONE_OM_RANGE_COMPACTION_SERVICE_TIMEOUT = "ozone.om.range.compaction.service.timeout"; + public static final String OZONE_OM_RANGE_COMPACTION_SERVICE_TIMEOUT_DEFAULT = "10m"; + + public static final String OZONE_OM_RANGE_COMPACTION_SERVICE_MAX_COMPACTION_ENTRIES + = "ozone.om.range.compaction.service.max.compaction.entries"; + public static final long OZONE_OM_RANGE_COMPACTION_SERVICE_MAX_COMPACTION_ENTRIES_DEFAULT = 1_000_000; + public static final String OZONE_OM_RANGE_COMPACTION_SERVICE_RANGES_PER_RUN + = "ozone.om.range.compaction.service.ranges.per.run"; + public static final int OZONE_OM_RANGE_COMPACTION_SERVICE_RANGES_PER_RUN_DEFAULT = 20; + public static final String OZONE_OM_RANGE_COMPACTION_SERVICE_TOMBSTONE_RATIO + = "ozone.om.range.compaction.service.tombstone.ratio"; + public static final double OZONE_OM_RANGE_COMPACTION_SERVICE_TOMBSTONE_RATIO_DEFAULT = 0.3; + public static final String OZONE_OM_RANGE_COMPACTION_SERVICE_MIN_TOMBSTONES + = "ozone.om.range.compaction.service.min.tombstones"; + public static final int OZONE_OM_RANGE_COMPACTION_SERVICE_MIN_TOMBSTONES_DEFAULT = 10_000; + /** * Never constructed. */ diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/OmKeyBenchGenerator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/OmKeyBenchGenerator.java new file mode 100644 index 000000000000..d3883de7ac98 --- /dev/null +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/OmKeyBenchGenerator.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.freon; + +import static java.util.Collections.emptyMap; + +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.Timer; +import java.io.OutputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageSize; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.kohsuke.MetaInfServices; +import picocli.CommandLine.Command; +import picocli.CommandLine.Mixin; +import picocli.CommandLine.Option; + +/** + * Key‑level benchmark that supports CREATE, DELETE and LIST with user‑defined + * weights. It now keeps a concurrent set of *live* keys so DELETE/LIST always + * operate on existing entries; when the set is empty those two ops are skipped. + */ +@Command(name = "omkeybench", + description = "Generate CREATE/DELETE/LIST load toward OM with weighted mix.", + versionProvider = HddsVersionProvider.class, + mixinStandardHelpOptions = true, + showDefaultValues = true) +@MetaInfServices(FreonSubcommand.class) +public class OmKeyBenchGenerator extends BaseFreonGenerator implements Runnable { + + // ============ CLI ============ + + @Option(names = { "-v", "--volume" }, defaultValue = "vol1", description = "Volume name (created if absent)") + private String volume; + + @Option(names = { "-b", "--bucket" }, defaultValue = "bucket1", description = "Bucket name (created if absent)") + private String bucketName; + + @Option(names = "--weights", description = "Comma‑separated weights, e.g. 
create:5,delete:3,list:2") + private String weightSpec = "create:1,delete:1,list:1"; + + @Option(names = { "--batch-size" }, defaultValue = "1000", description = "Batch size for LIST_KEYS request") + private int batchSize; + + @Option(names = { "-s", "--size" }, defaultValue = "0", description = "Payload size when creating a key (bytes) " + + StorageSizeConverter.STORAGE_SIZE_DESCRIPTION, converter = StorageSizeConverter.class) + private StorageSize dataSize; + + @Option(names = { "--max-live-keys" }, defaultValue = "100000", + description = "Maximum number of live keys to maintain for DELETE/LIST operations") + private int maxLiveKeys; + + @Mixin + private FreonReplicationOptions replicationOpt; + + // ------------ runtime state ------------ + private final EnumMap weights = new EnumMap<>(Op.class); + private double pCreate; // cumulative boundaries in [0,1) + private double pDelete; + + private OzoneBucket bucket; + private ReplicationConfig replConf; + private OzoneManagerProtocol om; + + private ContentGenerator contentGen; + private final LongAdder createdCounter = new LongAdder(); + private final LongAdder deletedCounter = new LongAdder(); + + /** Set of currently *live* keys for quick random sampling. */ + private final ConcurrentSkipListSet liveKeys = new ConcurrentSkipListSet<>(); + + @Override + public void run() { + try { + parseWeights(); + init(); + contentGen = new ContentGenerator(dataSize.toBytes(), 4096); + + OzoneConfiguration conf = createOzoneConfiguration(); + replConf = replicationOpt.fromParamsOrConfig(conf); + + try (OzoneClient client = createOzoneClient(null, conf)) { + ensureVolumeAndBucketExist(client, volume, bucketName); + bucket = client.getObjectStore().getVolume(volume).getBucket(bucketName); + om = createOmClient(conf, null); + runTests(this::oneIteration); + } finally { + if (om != null) { + om.close(); + } + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* ---------------- weight parsing ---------------- */ + + private void parseWeights() { + Arrays.stream(weightSpec.split(",")) + .forEach(pair -> { + String[] kv = pair.trim().split(":"); + if (kv.length != 2) { + throw new IllegalArgumentException("Bad --weights element: " + pair); + } + Op op = Op.valueOf(kv[0].toUpperCase()); + int w = Integer.parseInt(kv[1]); + if (w < 0) { + throw new IllegalArgumentException("Negative weight: " + pair); + } + weights.put(op, w); + }); + if (weights.isEmpty()) { + throw new IllegalArgumentException("No weights specified"); + } + int total = weights.values().stream().mapToInt(Integer::intValue).sum(); + pCreate = weights.getOrDefault(Op.CREATE, 0) / (double) total; + pDelete = pCreate + weights.getOrDefault(Op.DELETE, 0) / (double) total; + } + + /* ---------------- main loop ---------------- */ + + private void oneIteration(long globalCounter) throws Exception { + double r = ThreadLocalRandom.current().nextDouble(); + Op op = (r < pCreate) ? Op.CREATE : (r < pDelete ? Op.DELETE : Op.LIST); + + switch (op) { + case CREATE: + createKey(globalCounter); + break; + case DELETE: + deleteRandomKey(); + break; + case LIST: + listRandom(); + break; + default: + throw new IllegalStateException(); + } + } + + /* ---------------- operations ---------------- */ + + /** + * Creates a key and optionally adds it to the liveKeys set for future DELETE/LIST operations. + * When the liveKeys set reaches maxLiveKeys limit, new keys are still created but not added + * to the set, preventing performance degradation from large set operations. 
+ */ + private void createKey(long counter) throws Exception { + String key = formatKey(counter); + Timer timer = getMetrics().timer(Op.CREATE.name()); + timer.time(() -> { + try (OutputStream os = bucket.createKey(key, dataSize.toBytes(), replConf, emptyMap())) { + contentGen.write(os); + } + createdCounter.increment(); + + // Only add to liveKeys if we haven't reached the limit + if (liveKeys.size() < maxLiveKeys) { + liveKeys.add(key); + } + return null; + }); + } + + private void deleteRandomKey() throws Exception { + if (liveKeys.isEmpty()) { + return; // nothing to delete now + } + String key = pickRandomKey(); + if (key == null) { + return; // race condition; skip + } + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(volume) + .setBucketName(bucketName) + .setKeyName(key) + .build(); + Timer timer = getMetrics().timer(Op.DELETE.name()); + timer.time(() -> { + try { + om.deleteKey(keyArgs); + if (liveKeys.remove(key)) { + deletedCounter.increment(); + } + } catch (OMException ex) { + if (ex.getResult() != OMException.ResultCodes.KEY_NOT_FOUND) { + throw ex; + } + } + return null; + }); + } + + private void listRandom() throws Exception { + if (liveKeys.isEmpty()) { + return; // nothing to list + } + String start = pickRandomKey(); + if (start == null) { + return; + } + Timer timer = getMetrics().timer(Op.LIST.name()); + timer.time(() -> { + om.listKeys(volume, bucketName, start, "", batchSize); + return null; // ignore size check when data sparse + }); + } + /* ---------------- helpers ---------------- */ + + private String pickRandomKey() { + int size = liveKeys.size(); + if (size == 0) { + return null; + } + + // Convert to array for O(1) random access - more efficient than stream().skip() + String[] keysArray = liveKeys.toArray(new String[0]); + if (keysArray.length == 0) { + return null; // Race condition check + } + + int index = ThreadLocalRandom.current().nextInt(keysArray.length); + return keysArray[index]; + } + + private static String formatKey(long n) { + return StringUtils.leftPad(Long.toString(n), 19, '0'); + } + + @Override + public Supplier realTimeStatusSupplier() { + final Map maxValueRecorder = new HashMap<>(); + final Map valueRecorder = new HashMap<>(); + final Map instantsRecorder = new HashMap<>(); + return () -> { + StringBuilder sb = new StringBuilder(); + int currentLiveKeys = liveKeys.size(); + sb.append(String.format("live=%d/%d created=%d deleted=%d", currentLiveKeys, maxLiveKeys, + createdCounter.sum(), deletedCounter.sum())); + + if (currentLiveKeys >= maxLiveKeys) { + sb.append(" [LIMIT_REACHED]"); + } + + // Add rate information for each operation type + for (Map.Entry entry + : getMetrics().getTimers(MetricFilter.ALL).entrySet()) { + String name = entry.getKey(); + long maxValue = maxValueRecorder.getOrDefault(name, -1L); + long preValue = valueRecorder.getOrDefault(name, 0L); + Instant preInstant = instantsRecorder.getOrDefault(name, Instant.now()); + + long curValue = entry.getValue().getCount(); + Instant now = Instant.now(); + long duration = Duration.between(preInstant, now).getSeconds(); + long rate = ((curValue - preValue) / (duration == 0 ? 
1 : duration)); + maxValue = Math.max(rate, maxValue); + + maxValueRecorder.put(name, maxValue); + valueRecorder.put(name, curValue); + instantsRecorder.put(name, now); + sb.append(' ') + .append(name) + .append(": rate ") + .append(rate) + .append(" max ") + .append(maxValue); + } + return sb.toString(); + }; + } + + @Override + public boolean allowEmptyPrefix() { + return true; + } + + enum Op { + CREATE, DELETE, LIST + } +} diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java index ec9e34cec720..b43392af5c12 100644 --- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java +++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -533,6 +533,9 @@ List getMultipartUploadKeys(String volumeName, Iterator, CacheValue>> getBucketIterator(); + Iterator, CacheValue>> + getBucketIterator(String startKey); + TableIterator> getKeyIterator() throws IOException; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 809974d9d7b4..60158c0c9258 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -56,6 +56,12 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_ENABLED; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; @@ -167,6 +173,7 @@ import org.apache.hadoop.ozone.om.service.KeyDeletingService; import org.apache.hadoop.ozone.om.service.MultipartUploadCleanupService; import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService; +import org.apache.hadoop.ozone.om.service.RangeCompactionService; import org.apache.hadoop.ozone.om.service.SnapshotDeletingService; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo; @@ -210,6 +217,7 @@ public class KeyManagerImpl implements KeyManager { private BackgroundService multipartUploadCleanupService; private DNSToSwitchMapping dnsToSwitchMapping; private CompactionService compactionService; + private 
RangeCompactionService rangeCompactionService; public KeyManagerImpl(OzoneManager om, ScmClient scmClient, OzoneConfiguration conf, OMPerformanceMetrics metrics) { @@ -243,6 +251,10 @@ public void start(OzoneConfiguration configuration) { OZONE_OM_COMPACTION_SERVICE_ENABLED_DEFAULT); startCompactionService(configuration, isCompactionEnabled); + boolean isRangeCompactionEnabled = configuration.getBoolean(OZONE_OM_RANGE_COMPACTION_SERVICE_ENABLED, + OZONE_OM_RANGE_COMPACTION_SERVICE_ENABLED_DEFAULT); + startRangeCompactionService(configuration, isRangeCompactionEnabled); + boolean isSnapshotDeepCleaningEnabled = configuration.getBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED, OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT); if (keyDeletingService == null) { @@ -412,6 +424,23 @@ private void startCompactionService(OzoneConfiguration configuration, } } + private void startRangeCompactionService(OzoneConfiguration configuration, + boolean isRangeCompactionEnabled) { + if (rangeCompactionService == null && isRangeCompactionEnabled) { + long serviceIntervalMs = configuration.getTimeDuration( + OZONE_OM_RANGE_COMPACTION_SERVICE_INTERVAL, + OZONE_OM_RANGE_COMPACTION_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeoutMs = configuration.getTimeDuration( + OZONE_OM_RANGE_COMPACTION_SERVICE_TIMEOUT, + OZONE_OM_RANGE_COMPACTION_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + rangeCompactionService = new RangeCompactionService(configuration, ozoneManager, + serviceIntervalMs, serviceTimeoutMs); + rangeCompactionService.start(); + } + } + KeyProviderCryptoExtension getKMSProvider() { return kmsProvider; } @@ -446,6 +475,10 @@ public void stop() throws IOException { compactionService.shutdown(); compactionService = null; } + if (rangeCompactionService != null) { + rangeCompactionService.shutdown(); + rangeCompactionService = null; + } } private OmBucketInfo getBucketInfo(String volumeName, String bucketName) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index 04e0998219fa..c33e42a92704 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -914,6 +914,12 @@ public List listBuckets(final String volumeName, return bucketTable.cacheIterator(); } + @Override + public Iterator, CacheValue>> + getBucketIterator(String startKey) { + return bucketTable.cacheIterator(startKey); + } + @Override public TableIterator> getKeyIterator() throws IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java index 9894e8f5d6bf..f44551a0e477 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.om.codec; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.hdds.utils.TransactionInfo; @@ -28,6 +30,7 @@ import org.apache.hadoop.hdds.utils.db.StringCodec; import org.apache.hadoop.ozone.OzoneConsts; import 
org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo; import org.apache.hadoop.ozone.om.helpers.OmDBTenantState; @@ -341,6 +344,27 @@ public final class OMDBDefinition extends DBDefinition.WithMap { USER_TABLE_DEF, VOLUME_TABLE_DEF); + /** + * Mapping from table names to their corresponding bucket layouts. + * This is used to determine which type of compactor to use for each table. + */ + private static final Map TABLE_BUCKET_LAYOUT_MAP; + + static { + Map map = new HashMap<>(); + // FSO tables + map.put(FILE_TABLE_DEF.getName(), BucketLayout.FILE_SYSTEM_OPTIMIZED); + map.put(OPEN_FILE_TABLE_DEF.getName(), BucketLayout.FILE_SYSTEM_OPTIMIZED); + map.put(DIRECTORY_TABLE_DEF.getName(), BucketLayout.FILE_SYSTEM_OPTIMIZED); + map.put(DELETED_DIR_TABLE_DEF.getName(), BucketLayout.FILE_SYSTEM_OPTIMIZED); + // OBS/Legacy tables + map.put(KEY_TABLE_DEF.getName(), BucketLayout.OBJECT_STORE); + map.put(DELETED_TABLE_DEF.getName(), BucketLayout.OBJECT_STORE); + map.put(OPEN_KEY_TABLE_DEF.getName(), BucketLayout.OBJECT_STORE); + map.put(MULTIPART_INFO_TABLE_DEF.getName(), BucketLayout.OBJECT_STORE); + TABLE_BUCKET_LAYOUT_MAP = Collections.unmodifiableMap(map); + } + private static final OMDBDefinition INSTANCE = new OMDBDefinition(); public static OMDBDefinition get() { @@ -361,6 +385,15 @@ public String getLocationConfigKey() { return OMConfigKeys.OZONE_OM_DB_DIRS; } + /** + * Get the bucket layout for a given table name. + * @param tableName The name of the table + * @return The bucket layout for the table, or null if not found + */ + public static BucketLayout getBucketLayoutForTable(String tableName) { + return TABLE_BUCKET_LAYOUT_MAP.get(tableName); + } + public static List getAllColumnFamilies() { List columnFamilies = new ArrayList<>(); COLUMN_FAMILIES.values().forEach(cf -> { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/AbstractCompactor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/AbstractCompactor.java new file mode 100644 index 000000000000..0321d74584f9 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/AbstractCompactor.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.compaction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.KeyRange; +import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.TableProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base class for compactors that provides common functionality. + */ +public abstract class AbstractCompactor implements Compactor { + private static final Logger LOG = LoggerFactory.getLogger(AbstractCompactor.class); + + private OMMetadataManager metadataManager; + private DBStore dbStore; + private String tableName; + private BackgroundTaskQueue compactRangeQueue; + + private long maxCompactionEntries; + private int minTombstones; + private double tombstoneRatio; + private int rangesPerRun; + + protected AbstractCompactor(CompactorBuilder builder) { + this.tableName = builder.getTableName(); + this.maxCompactionEntries = builder.getMaxCompactionEntries(); + this.minTombstones = builder.getMinTombstones(); + this.rangesPerRun = builder.getRangesPerRun(); + this.tombstoneRatio = builder.getTombstoneRatio(); + this.metadataManager = builder.getMetadataManager(); + this.dbStore = builder.getDBStore(); + this.compactRangeQueue = builder.getCompactRangeQueue(); + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public void compactRange(KeyRange range) { + try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) { + LOG.info("Compacting range {} for table {}", range, tableName); + options.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce); + options.setExclusiveManualCompaction(true); + dbStore.compactTable(tableName, range.getStartKey(), range.getEndKey(), options); + } catch (Exception e) { + LOG.error("Failed to compact range {} for table {}", range, tableName, e); + } + } + + protected KeyRangeStats getRangeStats(KeyRange range) { + try { + Map properties = dbStore.getPropertiesOfTableInRange( + tableName, Collections.singletonList(range)); + + KeyRangeStats stats = new KeyRangeStats(0, 0); + for (TableProperties prop : properties.values()) { + stats.add(KeyRangeStats.fromTableProperties(prop)); + } + return stats; + } catch (Exception e) { + LOG.error("Failed to get range stats for range {} in table {}", range, tableName, e); + return new KeyRangeStats(0, 0); + } + } + + @Override + public List getRangesNeedingCompaction() { + List ranges = new ArrayList<>(); + collectRangesNeedingCompaction(ranges); + return ranges; + } + + protected abstract void collectRangesNeedingCompaction(List ranges); + + protected OMMetadataManager getMetadataManager() { + return metadataManager; + } + + protected long getMaxCompactionEntries() { + return maxCompactionEntries; + } + + protected int getRangesPerRun() { + return rangesPerRun; + } + + protected int getMinTombstones() { + return minTombstones; + } + + protected double getTombstoneRatio() { + return tombstoneRatio; + } + + protected DBStore getDBStore() { + return dbStore; + } + + protected void addRangeCompactionTask(KeyRange range) { + LOG.info("Adding range compaction task for range {} in 
table {}, compactRangeQueue size: {}", + range, tableName, compactRangeQueue.size()); + compactRangeQueue.add(new CompactionTask(range)); + LOG.info("Added range compaction task for range {} in table {}, compactRangeQueue size: {}", + range, tableName, compactRangeQueue.size()); + } + + private class CompactionTask implements BackgroundTask { + private final KeyRange range; + + CompactionTask(KeyRange range) { + this.range = range; + } + + @Override + public int getPriority() { + // TODO: Implement priority based on some criteria + return 0; + } + + @Override + public BackgroundTaskResult call() throws Exception { + LOG.debug("Running compaction for range {} in table {}", range, tableName); + + compactRange(range); + return () -> 1; + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/Compactor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/Compactor.java new file mode 100644 index 000000000000..fba9d0106e06 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/Compactor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.compaction; + +import java.util.List; +import org.apache.hadoop.hdds.utils.db.KeyRange; + +/** + * Interface for compactors that handle RocksDB compaction. + */ +public interface Compactor extends Runnable { + /** + * Get the table this compactor is responsible for. + */ + String getTableName(); + + /** + * Check if a range needs compaction based on its statistics. + */ + default boolean needsCompaction(KeyRangeStats stats, int minTombstones, double tombstoneRatio) { + if (stats.getNumDeletion() < minTombstones) { + return false; + } + return stats.getTombstoneRatio() >= tombstoneRatio; + } + + /** + * Compact a specific range. + */ + void compactRange(KeyRange range); + + /** + * Get the ranges that need compaction. + */ + List getRangesNeedingCompaction(); +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/CompactorBuilder.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/CompactorBuilder.java new file mode 100644 index 000000000000..5e674de13e85 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/CompactorBuilder.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.compaction; + +import java.io.IOException; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.codec.OMDBDefinition; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; + +/** + * Builder class for creating compactors based on table names. + */ +public class CompactorBuilder { + private DBStore db; + private BackgroundTaskQueue compactRangeQueue; + private String tableName; + private long maxCompactionEntries; + private int minTombstones; + private double tombstoneRatio; + private int rangesPerRun; + + private OMMetadataManager metadataManager; + + public CompactorBuilder setCompactRangeQueue(BackgroundTaskQueue compactRangeQueue) { + this.compactRangeQueue = compactRangeQueue; + return this; + } + + public CompactorBuilder setMetadataManager(OMMetadataManager metadataManager) { + this.metadataManager = metadataManager; + return this; + } + + public CompactorBuilder setDBStore(DBStore dbStore) { + this.db = dbStore; + return this; + } + + public CompactorBuilder setTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public CompactorBuilder setMaxCompactionEntries(long maxCompactionEntries) { + this.maxCompactionEntries = maxCompactionEntries; + return this; + } + + public CompactorBuilder setMinTombstones(int minTombstones) { + this.minTombstones = minTombstones; + return this; + } + + public CompactorBuilder setTombstoneRatio(double tombstoneRatio) { + this.tombstoneRatio = tombstoneRatio; + return this; + } + + public CompactorBuilder setRangesPerRun(int rangesPerRun) { + this.rangesPerRun = rangesPerRun; + return this; + } + + public Compactor build() throws IOException { + if (tableName == null) { + throw new IllegalArgumentException("Name and table must be set"); + } + + BucketLayout layout = OMDBDefinition.getBucketLayoutForTable(tableName); + if (layout == null) { + throw new IllegalArgumentException("No bucket layout mapping found for table: " + tableName); + } + + switch (layout) { + case FILE_SYSTEM_OPTIMIZED: + return new FSOTableCompactor(this); + case OBJECT_STORE: + return new OBSTableCompactor(this); + default: + throw new IllegalArgumentException("Unsupported bucket layout: " + layout); + } + } + + public String getTableName() { + return tableName; + } + + public long getMaxCompactionEntries() { + return maxCompactionEntries; + } + + public int getMinTombstones() { + return minTombstones; + } + + public double getTombstoneRatio() { + return tombstoneRatio; + } + + public DBStore getDBStore() { + return db; + } + + public OMMetadataManager getMetadataManager() { + return metadataManager; + } + + public BackgroundTaskQueue getCompactRangeQueue() { + return compactRangeQueue; + } + + public int getRangesPerRun() { + return rangesPerRun; + } +} diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/CompoundKeyRangeStats.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/CompoundKeyRangeStats.java new file mode 100644 index 000000000000..eb0eba4d075f --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/CompoundKeyRangeStats.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.compaction; + +import java.util.List; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.KeyRange; + +/** + * CompoundKeyRangeStats is a class that contains a list of KeyRangeStats and a + * compound KeyRangeStats. + */ +public class CompoundKeyRangeStats { + private KeyRangeStats compoundStats = null; + private List> keyRangeStatsList = null; + + public CompoundKeyRangeStats(List> keyRangeStatsList) { + if (keyRangeStatsList == null || keyRangeStatsList.isEmpty()) { + return; + } + this.keyRangeStatsList = keyRangeStatsList; + for (Pair entry : keyRangeStatsList) { + if (compoundStats == null) { + compoundStats = entry.getRight(); + } else { + compoundStats.add(entry.getRight()); + } + } + } + + public KeyRangeStats getCompoundStats() { + return compoundStats; + } + + public List> getKeyRangeStatsList() { + return keyRangeStatsList; + } + + public int size() { + return keyRangeStatsList.size(); + } + + public long getNumEntries() { + return compoundStats.getNumEntries(); + } + + public long getNumDeletion() { + return compoundStats.getNumDeletion(); + } + + public double getTombstoneRatio() { + return compoundStats.getTombstoneRatio(); + } + + public void add(CompoundKeyRangeStats other) { + compoundStats.add(other.getCompoundStats()); + keyRangeStatsList.addAll(other.getKeyRangeStatsList()); + } + + public boolean isEmpty() { + return keyRangeStatsList == null || keyRangeStatsList.isEmpty() || compoundStats == null; + } + + @Override + public String toString() { + return "CompoundKeyRangeStats{" + + "compoundStats=" + compoundStats + + ", keyRangeStatsList=" + keyRangeStatsList + + '}'; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/FSOTableCompactor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/FSOTableCompactor.java new file mode 100644 index 000000000000..e140cd4aa569 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/FSOTableCompactor.java @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.compaction; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.StringUtils; +import org.apache.hadoop.hdds.utils.db.KeyRange; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; +import org.rocksdb.LiveFileMetaData; +import org.rocksdb.TableProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compactor for FSO layout that compacts based on bucket and parent ID ranges. + * + * Unlike OBS compactor which uses bucket boundaries, FSO compactor uses a hierarchical + * approach based on parent-child relationships in the directory structure. + * + * Key features: + * 1. Parent ID-based splitting: Creates ranges based on directory hierarchy + * 2. Depth-aware compaction: Prioritizes shallow directories over deep ones + * 3. Directory-aware merging: Keeps related files/subdirectories together + * 4. Adaptive range sizing: Adjusts range size based on directory entry count + */ +public class FSOTableCompactor extends AbstractCompactor { + private static final Logger LOG = LoggerFactory.getLogger(FSOTableCompactor.class); + private static final String OM_KEY_PREFIX = "/"; + + // Pagination state for resuming compaction + private Long currentVolumeId; + private Long currentBucketId; + private Long currentParentId; + private String resumeKey; + + // Cache for directory hierarchy information + private final NavigableMap directoryCache = new TreeMap<>(); + + // Configuration for adaptive splitting + private static final int MAX_DIRECTORY_DEPTH = 10; + private static final double DEPTH_WEIGHT_FACTOR = 0.8; // Prioritize shallow directories + + public FSOTableCompactor(CompactorBuilder builder) { + super(builder); + } + + /** + * Represents cached metadata about a directory. 
+ */ + private static class DirectoryMetadata { + private final long objectId; + private final long parentId; + private final int depth; + private final String path; + private long entryCount; + private long tombstoneCount; + + DirectoryMetadata(long objectId, long parentId, int depth, String path) { + this.objectId = objectId; + this.parentId = parentId; + this.depth = depth; + this.path = path; + this.entryCount = 0; + this.tombstoneCount = 0; + } + + public long getObjectId() { + return objectId; + } + + public long getParentId() { + return parentId; + } + + public int getDepth() { + return depth; + } + + public String getPath() { + return path; + } + + public long getEntryCount() { + return entryCount; + } + + public long getTombstoneCount() { + return tombstoneCount; + } + } + + @Override + public void run() { + try { + LOG.info("Starting FSO range compaction scan for table {}", getTableName()); + List ranges = getRangesNeedingCompaction(); + LOG.info("Found {} ranges needing compaction for table {}, submitting to compaction queue", + ranges.size(), getTableName()); + for (KeyRange range : ranges) { + LOG.info("Submitting range [{}] for compaction", range); + addRangeCompactionTask(range); + } + LOG.info("Completed FSO range compaction scan for table {}", getTableName()); + } catch (Exception e) { + LOG.error("Failed to run FSO range compaction for table {}", getTableName(), e); + } + } + + @Override + protected void collectRangesNeedingCompaction(List ranges) { + LOG.info("Starting to collect FSO ranges needing compaction for table {}", getTableName()); + + try { + // First, build directory hierarchy cache + buildDirectoryCache(); + + // Process ranges based on directory hierarchy + for (int i = 0; i < getRangesPerRun(); i++) { + Pair rangeAndStats = prepareNextRange(); + if (rangeAndStats == null) { + LOG.info("No more ranges to process for table {}", getTableName()); + break; + } + + if (needsCompaction(rangeAndStats.getRight(), getMinTombstones(), getTombstoneRatio())) { + LOG.info("Range [{}] needs compaction with stats: {}", + rangeAndStats.getLeft(), rangeAndStats.getRight()); + ranges.add(rangeAndStats.getLeft()); + } else { + LOG.info("Range [{}] does not need compaction with stats: {}", + rangeAndStats.getLeft(), rangeAndStats.getRight()); + } + } + } catch (IOException e) { + LOG.error("Failed to collect FSO ranges for compaction for table {}", getTableName(), e); + } + + LOG.info("Collected {} FSO ranges needing compaction for table {}", ranges.size(), getTableName()); + } + + /** + * Build a cache of directory metadata for efficient range calculation. 
+ */ + private void buildDirectoryCache() throws IOException { + if (getTableName().equals(getMetadataManager().getDirectoryTable().getName())) { + LOG.info("Building directory cache for FSO compaction"); + + Table dirTable = getMetadataManager().getDirectoryTable(); + try (TableIterator> iter = + dirTable.iterator()) { + + int count = 0; + while (iter.hasNext() && count < 1000) { // Limit cache size + Table.KeyValue kv = iter.next(); + OmDirectoryInfo dirInfo = kv.getValue(); + + // Calculate depth by counting parent traversals + int depth = calculateDirectoryDepth(dirInfo); + DirectoryMetadata metadata = new DirectoryMetadata( + dirInfo.getObjectID(), + dirInfo.getParentObjectID(), + depth, + kv.getKey() + ); + + directoryCache.put(dirInfo.getObjectID(), metadata); + count++; + } + } + + LOG.info("Built directory cache with {} entries", directoryCache.size()); + } + } + + /** + * Calculate the depth of a directory in the hierarchy. + */ + private int calculateDirectoryDepth(OmDirectoryInfo dirInfo) { + int depth = 0; + long parentId = dirInfo.getParentObjectID(); + + // Traverse up the hierarchy until we reach bucket level + while (depth < MAX_DIRECTORY_DEPTH && directoryCache.containsKey(parentId)) { + DirectoryMetadata parent = directoryCache.get(parentId); + parentId = parent.parentId; + depth++; + } + + return depth; + } + + /** + * Prepare the next range for compaction based on FSO hierarchy. + */ + private Pair prepareNextRange() throws IOException { + LOG.info("Preparing next FSO range for compaction"); + + // Determine starting point + KeyRange currentRange; + if (resumeKey != null) { + // Resume from last split point + LOG.info("Resuming from last split point: {}", resumeKey); + String endKey = findNextParentBoundary(resumeKey); + currentRange = new KeyRange(resumeKey, endKey); + resumeKey = null; + } else { + // Find next parent ID range to process + currentRange = getNextParentIdRange(); + if (currentRange == null) { + return null; + } + } + + // Get stats and handle splitting if needed + return processRangeWithSplitting(currentRange); + } + + /** + * Get the next parent ID range to process. + */ + private KeyRange getNextParentIdRange() throws IOException { + LOG.info("Getting next parent ID range"); + + // If we have a current position, continue from there + if (currentVolumeId != null && currentBucketId != null && currentParentId != null) { + return getNextParentRange(currentVolumeId, currentBucketId, currentParentId); + } + + // Otherwise, start from the beginning + return getFirstParentRange(); + } + + /** + * Get the first parent range in the table. 
+ */ + private KeyRange getFirstParentRange() throws IOException { + // For FSO tables, we need to iterate through volume/bucket combinations + Iterator, CacheValue>> bucketIter = + getMetadataManager().getBucketIterator(); + + if (!bucketIter.hasNext()) { + LOG.info("No buckets found for FSO compaction"); + return null; + } + + Map.Entry, CacheValue> bucketEntry = bucketIter.next(); + OmBucketInfo bucketInfo = bucketEntry.getValue().getCacheValue(); + + currentVolumeId = getMetadataManager().getVolumeId(bucketInfo.getVolumeName()); + currentBucketId = bucketInfo.getObjectID(); + currentParentId = bucketInfo.getObjectID(); // Start with bucket as parent + + String startKey = buildFSOKey(currentVolumeId, currentBucketId, currentParentId, ""); + String endKey = buildFSOKey(currentVolumeId, currentBucketId, currentParentId + 1, ""); + + LOG.info("First parent range: volumeId={}, bucketId={}, parentId={}, range=[{} - {}]", + currentVolumeId, currentBucketId, currentParentId, startKey, endKey); + + return new KeyRange(startKey, endKey); + } + + /** + * Get the next parent range after the current one. + */ + private KeyRange getNextParentRange(long volumeId, long bucketId, long parentId) throws IOException { + // For simplicity, increment parent ID + // In production, this would involve more sophisticated directory traversal + currentParentId = parentId + 1; + + // Check if we need to move to next bucket + if (currentParentId > parentId + 1000) { // Arbitrary limit for demo + return moveToNextBucket(); + } + + String startKey = buildFSOKey(volumeId, bucketId, currentParentId, ""); + String endKey = buildFSOKey(volumeId, bucketId, currentParentId + 1, ""); + + LOG.info("Next parent range: volumeId={}, bucketId={}, parentId={}, range=[{} - {}]", + volumeId, bucketId, currentParentId, startKey, endKey); + + return new KeyRange(startKey, endKey); + } + + /** + * Move to the next bucket for compaction. + */ + private KeyRange moveToNextBucket() throws IOException { + // Reset state and find next bucket + currentParentId = null; + + Iterator, CacheValue>> bucketIter = + getMetadataManager().getBucketIterator(buildBucketKey(currentVolumeId, currentBucketId)); + + if (!bucketIter.hasNext()) { + LOG.info("No more buckets to process"); + currentVolumeId = null; + currentBucketId = null; + return null; + } + + Map.Entry, CacheValue> bucketEntry = bucketIter.next(); + OmBucketInfo bucketInfo = bucketEntry.getValue().getCacheValue(); + + currentVolumeId = getMetadataManager().getVolumeId(bucketInfo.getVolumeName()); + currentBucketId = bucketInfo.getObjectID(); + currentParentId = bucketInfo.getObjectID(); + + String startKey = buildFSOKey(currentVolumeId, currentBucketId, currentParentId, ""); + String endKey = buildFSOKey(currentVolumeId, currentBucketId, currentParentId + 1, ""); + + return new KeyRange(startKey, endKey); + } + + /** + * Process a range with potential splitting if it's too large. 
+ */ + private Pair processRangeWithSplitting(KeyRange range) throws IOException { + CompoundKeyRangeStats compoundStats = getCompoundKeyRangeStatsFromRange(range); + + if (compoundStats.isEmpty()) { + LOG.info("Range [{}] has no entries", range); + return Pair.of(range, new KeyRangeStats()); + } + + LOG.info("Range [{}] has {} entries", range, compoundStats.getNumEntries()); + + // Check if range needs splitting + if (compoundStats.getNumEntries() <= getMaxCompactionEntries()) { + return Pair.of(range, compoundStats.getCompoundStats()); + } + + // Split range using FSO-aware logic + LOG.info("Range [{}] exceeds max entries ({} > {}), applying FSO splitting", + range, compoundStats.getNumEntries(), getMaxCompactionEntries()); + + List> splitRanges = splitRangeByDirectoryBoundaries( + range, compoundStats, getMaxCompactionEntries()); + + if (splitRanges.isEmpty()) { + // Fallback to SST-based splitting + splitRanges = findFitRanges(compoundStats.getKeyRangeStatsList(), getMaxCompactionEntries()); + } + + if (splitRanges.isEmpty()) { + LOG.warn("Failed to split range [{}], returning original", range); + return Pair.of(range, compoundStats.getCompoundStats()); + } + + // Take the first split range and save resume point + Pair firstRange = squashRanges(splitRanges); + String nextBoundary = findNextParentBoundary(firstRange.getLeft().getEndKey()); + + if (range.getEndKey().compareTo(nextBoundary) > 0) { + resumeKey = nextBoundary; + LOG.info("Split range [{}] into [{}], will resume from {}", + range, firstRange.getLeft(), resumeKey); + } + + return firstRange; + } + + /** + * Split a range based on directory boundaries for better FSO alignment. + */ + private List> splitRangeByDirectoryBoundaries( + KeyRange range, CompoundKeyRangeStats stats, long maxEntries) { + + LOG.info("Attempting directory-aware splitting for range [{}]", range); + List> result = new ArrayList<>(); + + // Group SST files by directory boundaries + Map>> directoryGroups = new HashMap<>(); + + for (Pair sstRange : stats.getKeyRangeStatsList()) { + String dirBoundary = extractDirectoryBoundary(sstRange.getLeft().getStartKey()); + directoryGroups.computeIfAbsent(dirBoundary, k -> new ArrayList<>()).add(sstRange); + } + + // Build ranges respecting directory boundaries + long currentEntries = 0; + List> currentGroup = new ArrayList<>(); + + for (Map.Entry>> entry : directoryGroups.entrySet()) { + List> dirSsts = entry.getValue(); + long dirEntries = dirSsts.stream() + .mapToLong(p -> p.getRight().getNumEntries()) + .sum(); + + if (currentEntries > 0 && currentEntries + dirEntries > maxEntries) { + // Current group is full, save it + if (!currentGroup.isEmpty()) { + result.add(squashRanges(currentGroup)); + } + currentGroup = new ArrayList<>(); + currentEntries = 0; + } + + currentGroup.addAll(dirSsts); + currentEntries += dirEntries; + } + + // Add remaining group + if (!currentGroup.isEmpty() && currentEntries <= maxEntries) { + result.add(squashRanges(currentGroup)); + } + + LOG.info("Directory-aware splitting produced {} ranges", result.size()); + return result; + } + + /** + * Extract directory boundary from an FSO key. + */ + private String extractDirectoryBoundary(String key) { + // FSO key format: /volumeId/bucketId/parentId/name + String[] parts = key.split(OM_KEY_PREFIX); + if (parts.length >= 4) { + // Return up to parent ID to group by directory + return OM_KEY_PREFIX + parts[1] + OM_KEY_PREFIX + parts[2] + OM_KEY_PREFIX + parts[3]; + } + return key; + } + + /** + * Find the next parent boundary after the given key. 
+ */ + private String findNextParentBoundary(String key) { + // Extract parent ID and increment + String[] parts = key.split(OM_KEY_PREFIX); + if (parts.length >= 4) { + try { + long parentId = Long.parseLong(parts[3]); + return OM_KEY_PREFIX + parts[1] + OM_KEY_PREFIX + parts[2] + + OM_KEY_PREFIX + (parentId + 1) + OM_KEY_PREFIX; + } catch (NumberFormatException e) { + LOG.warn("Failed to parse parent ID from key: {}", key); + } + } + return StringUtils.getKeyPrefixUpperBound(key); + } + + /** + * Build an FSO key from components. + */ + private String buildFSOKey(long volumeId, long bucketId, long parentId, String name) { + return OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + + OM_KEY_PREFIX + parentId + OM_KEY_PREFIX + name; + } + + /** + * Build a bucket key for iteration. + */ + private String buildBucketKey(long volumeId, long bucketId) { + return OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId; + } + + /** + * Find ranges that fit within the max entries limit. + */ + private List> findFitRanges( + List> ranges, long maxEntries) { + + LOG.info("Finding ranges that fit within {} entries", maxEntries); + ranges.sort((a, b) -> a.getLeft().getStartKey().compareTo(b.getLeft().getStartKey())); + + List> out = new ArrayList<>(); + KeyRangeStats chunkStats = new KeyRangeStats(); + + for (Pair range : ranges) { + long n = range.getRight().getNumEntries(); + + if (n > maxEntries) { + LOG.warn("Range [{}] exceeds max entries ({} > {}), skipping", + range.getLeft(), n, maxEntries); + continue; + } + + if (chunkStats.getNumEntries() > 0 && chunkStats.getNumEntries() + n > maxEntries) { + LOG.info("Current chunk + range would exceed max entries, stopping"); + return out; + } + + out.add(range); + chunkStats.add(range.getRight()); + } + + return out; + } + + /** + * Squash multiple ranges into a single range. + */ + private Pair squashRanges(List> list) { + Preconditions.checkNotNull(list, "list is null"); + Preconditions.checkArgument(!list.isEmpty(), "list is empty"); + + String minKey = list.get(0).getLeft().getStartKey(); + String maxKey = list.get(0).getLeft().getEndKey(); + KeyRangeStats stats = new KeyRangeStats(); + + for (Pair range : list) { + minKey = StringUtils.min(minKey, range.getLeft().getStartKey()); + maxKey = StringUtils.max(maxKey, range.getLeft().getEndKey()); + stats.add(range.getRight()); + } + + return Pair.of(new KeyRange(minKey, maxKey), stats); + } + + /** + * Get compound statistics for a key range. 
+ */ + private CompoundKeyRangeStats getCompoundKeyRangeStatsFromRange(KeyRange range) throws IOException { + LOG.info("Getting compound stats for FSO range [{}]", range); + + List> keyRangeStatsList = new ArrayList<>(); + List liveFileMetaDataList = ((RDBStore)getDBStore()).getDb().getLiveFilesMetaData(); + Map fileMap = new HashMap<>(); + + for (LiveFileMetaData metaData : liveFileMetaDataList) { + fileMap.put(metaData.path() + metaData.fileName(), metaData); + } + + Map propsMap = getDBStore().getPropertiesOfTableInRange( + getTableName(), Collections.singletonList(range)); + + for (Map.Entry entry : propsMap.entrySet()) { + String filePath = entry.getKey(); + LiveFileMetaData meta = fileMap.get(filePath); + if (meta == null) { + LOG.warn("LiveFileMetaData not found for file {}", filePath); + continue; + } + + KeyRange keyRange = new KeyRange(meta.smallestKey(), meta.largestKey()); + KeyRangeStats stats = KeyRangeStats.fromTableProperties(entry.getValue()); + keyRangeStatsList.add(Pair.of(keyRange, stats)); + + // Update directory cache if available + updateDirectoryCacheWithStats(keyRange, stats); + } + + return new CompoundKeyRangeStats(keyRangeStatsList); + } + + /** + * Update directory cache with statistics from SST files. + */ + private void updateDirectoryCacheWithStats(KeyRange range, KeyRangeStats stats) { + String startKey = range.getStartKey(); + String[] parts = startKey.split(OM_KEY_PREFIX); + + if (parts.length >= 4) { + try { + long parentId = Long.parseLong(parts[3]); + DirectoryMetadata dirMeta = directoryCache.get(parentId); + if (dirMeta != null) { + dirMeta.entryCount += stats.getNumEntries(); + dirMeta.tombstoneCount += stats.getNumDeletion(); + } + } catch (NumberFormatException e) { + // Ignore parsing errors + } + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/KeyRangeStats.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/KeyRangeStats.java new file mode 100644 index 000000000000..8da09ccff4a1 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/KeyRangeStats.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.compaction; + +import org.rocksdb.TableProperties; + +/** + * Statistics for a key range. 
+ */ +public class KeyRangeStats { + private long numEntries; + private long numDeletion; + + public KeyRangeStats() { + this.numEntries = 0; + this.numDeletion = 0; + } + + public KeyRangeStats(long numEntries, long numDeletion) { + this.numEntries = numEntries; + this.numDeletion = numDeletion; + } + + public static KeyRangeStats fromTableProperties(TableProperties properties) { + return new KeyRangeStats( + properties.getNumEntries(), + properties.getNumDeletions()); + } + + public void add(KeyRangeStats other) { + this.numEntries += other.numEntries; + this.numDeletion += other.numDeletion; + } + + public long getNumEntries() { + return numEntries; + } + + public long getNumDeletion() { + return numDeletion; + } + + public double getTombstoneRatio() { + if (numEntries == 0) { + return 0.0; + } + return ((double) numDeletion / numEntries); + } + + @Override + public String toString() { + return "KeyRangeStats{" + + "numEntries=" + numEntries + + ", numDeletion=" + numDeletion + + ", tombstoneRatio=" + getTombstoneRatio() + + '}'; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/OBSTableCompactor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/OBSTableCompactor.java new file mode 100644 index 000000000000..a6a4d587763e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/OBSTableCompactor.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.compaction; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.StringUtils; +import org.apache.hadoop.hdds.utils.db.KeyRange; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.rocksdb.LiveFileMetaData; +import org.rocksdb.TableProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compactor for OBS and legacy layout that compacts based on bucket ranges. 
+ */
+public class OBSTableCompactor extends AbstractCompactor {
+  private static final Logger LOG = LoggerFactory.getLogger(OBSTableCompactor.class);
+
+  private String currentBucketUpperBound;
+  private String nextKey;
+
+  public OBSTableCompactor(CompactorBuilder builder) {
+    super(builder);
+  }
+
+  @Override
+  public void run() {
+    try {
+      LOG.info("Starting range compaction scan for table {}", getTableName());
+      List<KeyRange> ranges = getRangesNeedingCompaction();
+      LOG.info("Found {} ranges needing compaction for table {}, submitting to compaction queue",
+          ranges.size(), getTableName());
+      for (KeyRange range : ranges) {
+        LOG.info("Submitting range [{}] for compaction", range);
+        addRangeCompactionTask(range);
+      }
+      LOG.info("Completed range compaction scan for table {}", getTableName());
+    } catch (Exception e) {
+      LOG.error("Failed to run range compaction for table {}", getTableName(), e);
+    }
+  }
+
+  @Override
+  protected void collectRangesNeedingCompaction(List<KeyRange> ranges) {
+    LOG.info("Starting to collect ranges needing compaction for table {}", getTableName());
+    for (int i = 0; i < getRangesPerRun(); i++) {
+      try {
+        LOG.info("Preparing range {} of {} for table {}", i + 1, getRangesPerRun(), getTableName());
+        Pair<KeyRange, KeyRangeStats> preparedRange = prepareRanges();
+        if (preparedRange == null) {
+          LOG.info("No more ranges to prepare for table {}", getTableName());
+          break;
+        }
+        // TODO: merge consecutive ranges if they are not too big; note the case where the iterator has been reset.
+        // To be more specific:
+        // constraint 1: the temp start key and end key are either both null or both non-null; call them the temp range
+        // constraint 2: once the temp range is submitted, clear it
+        // if the range is null:
+        //   1. if the temp range is not null, submit and reset the temp range;
+        //      otherwise, do nothing
+        // if the range is a split range:
+        //   1. if the temp range is null, submit the range directly
+        //   2. if the temp range is not null and its accumulated numEntries plus the current range's numEntries
+        //      is greater than maxEntries, then submit **the temp range and the current range separately**
+        //   3. if the temp range is not null and its accumulated numEntries plus the current range's numEntries
+        //      is less than maxEntries, then submit **a single range composed of the temp range and the current range**
+        //      (note the difference between points 2 and 3)
+        // if the range is not a split range:
+        //   1. if the temp range is null, set its start key to the current range's start key, and its end key likewise
+        //   2. if the temp range is not null and its accumulated numEntries plus the current range's numEntries
+        //      is greater than maxEntries, then submit the temp range and set the temp range to the current range
+        //   3. if the temp range is not null and its accumulated numEntries plus the current range's numEntries
+        //      is less than maxEntries,
+        //      then **set the temp range to a range composed of the temp range and the current range**
+        //      (note the difference between points 2 and 3)
+        if (needsCompaction(preparedRange.getRight(), getMinTombstones(), getTombstoneRatio())) {
+          LOG.info("Range [{}] needs compaction with stats: {}", preparedRange.getLeft(), preparedRange.getRight());
+          ranges.add(preparedRange.getLeft());
+        } else {
+          LOG.info("Range [{}] does not need compaction with stats: {}", preparedRange.getLeft(),
+              preparedRange.getRight());
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to prepare ranges for compaction for table {}", getTableName(), e);
+      }
+    }
+    LOG.info("Collected {} ranges needing compaction for table {}", ranges.size(), getTableName());
+  }
+
+  /**
+   * Get the next bucket bound.
+   *
+   * @return the next bucket bound, or null if the end of the iterator has been reached
+   */
+  private KeyRange getNextBucketBound() {
+    LOG.info("Getting next bucket bound starting from {}", currentBucketUpperBound);
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> iterator = getMetadataManager()
+        .getBucketIterator(currentBucketUpperBound);
+    if (!iterator.hasNext()) {
+      LOG.info("No more bucket bounds found");
+      currentBucketUpperBound = null;
+      return null;
+    }
+
+    String bucketStartKey = iterator.next().getKey().getCacheKey(), bucketEndKey;
+    if (iterator.hasNext()) {
+      bucketEndKey = iterator.next().getKey().getCacheKey();
+      currentBucketUpperBound = bucketEndKey;
+      LOG.info("Found bucket range: [{} - {}]", bucketStartKey, bucketEndKey);
+    } else {
+      bucketEndKey = StringUtils.getKeyPrefixUpperBound(bucketStartKey);
+      currentBucketUpperBound = bucketEndKey;
+      LOG.info("Found last bucket range: [{} - {}]", bucketStartKey, bucketEndKey);
+    }
+    return new KeyRange(bucketStartKey, bucketEndKey);
+  }
+
+  /**
+   * Prepare ranges for compaction.
+ * + * @return the prepared ranges, the range and its stats in pair may be **combined** by multiple ranges + * @throws IOException if an error occurs + */ + private Pair prepareRanges() throws IOException { + LOG.info("Preparing ranges for compaction"); + + // Get the initial range to process + KeyRange currentRange; + if (nextKey != null) { + // Continue from last split point + LOG.info("Continuing from last split point: {}", nextKey); + currentRange = new KeyRange(nextKey, currentBucketUpperBound); + // clean up the nextKey + nextKey = null; + } else { + // Start from next bucket boundary + KeyRange bucketBound = getNextBucketBound(); + if (bucketBound == null) { + LOG.info("No more bucket bounds to process"); + return null; + } + currentRange = new KeyRange(bucketBound.getStartKey(), bucketBound.getEndKey()); + LOG.info("Starting new bucket range: [{}]", currentRange); + } + + // Get compound stats for the range + CompoundKeyRangeStats compoundStats = getCompoundKeyRangeStatsFromRange(currentRange); + if (compoundStats.isEmpty()) { + LOG.info("Range [{}] has no entries, returning empty range", currentRange); + return Pair.of(currentRange, new KeyRangeStats()); + } + LOG.info("Range [{}] has {} entries", currentRange, compoundStats.getNumEntries()); + + if (compoundStats.getNumEntries() <= getMaxCompactionEntries()) { + LOG.info("Range [{}] fits within max entries limit of {}", currentRange, getMaxCompactionEntries()); + return Pair.of(currentRange, compoundStats.getCompoundStats()); + } else { + LOG.info("Range [{}] exceeds max entries limit ({} > {}), splitting", + currentRange, compoundStats.getNumEntries(), getMaxCompactionEntries()); + List> splittedRanges = findFitRanges( + compoundStats.getKeyRangeStatsList(), getMaxCompactionEntries()); + if (splittedRanges == null || splittedRanges.isEmpty()) { + LOG.info("Failed to split range [{}] into valid sub-ranges, returning original range", currentRange); + return Pair.of(currentRange, compoundStats.getCompoundStats()); + } + Pair squashedRange = squashRanges(splittedRanges); + String squashedRangeEndKeyUpperBound = StringUtils.getKeyPrefixUpperBound(squashedRange.getLeft().getEndKey()); + if (currentRange.getEndKey().compareTo(squashedRangeEndKeyUpperBound) > 0) { + // if the squashed range is not the last range, then we need to split the range + nextKey = squashedRangeEndKeyUpperBound; + LOG.info("Split range [{}] into [{}] and will continue from {}", + currentRange, squashedRange.getLeft(), nextKey); + } else { + LOG.info("Split range [{}] into [{}] (final range)", currentRange, squashedRange.getLeft()); + } + return squashedRange; + } + } + + /** + * return a list of ranges that each range's numEntries ≤ maxEntries, + * and their sum of numEntries are not greater than maxEntries, too. 
+   */
+  private List<Pair<KeyRange, KeyRangeStats>> findFitRanges(
+      List<Pair<KeyRange, KeyRangeStats>> ranges, long maxEntries) {
+    LOG.info("Finding ranges that fit within {} entries", maxEntries);
+    ranges.sort((a, b) -> a.getLeft().getStartKey().compareTo(b.getLeft().getStartKey()));
+
+    List<Pair<KeyRange, KeyRangeStats>> out = new ArrayList<>();
+    KeyRangeStats chunkStats = new KeyRangeStats();
+
+    for (Pair<KeyRange, KeyRangeStats> range : ranges) {
+      long n = range.getRight().getNumEntries();
+      LOG.info("Processing range [{}] with {} entries", range.getLeft(), n);
+
+      // this single SST already exceeds the threshold
+      if (n > maxEntries) {
+        LOG.warn("Range [{}] exceeds max entries ({} > {}), skipping", range.getLeft(), n, maxEntries);
+        continue;
+      }
+
+      // adding this SST would overflow the threshold
+      if (chunkStats.getNumEntries() > 0 && chunkStats.getNumEntries() + n > maxEntries) {
+        LOG.info("Current chunk ({} entries) + range [{}] ({} entries) exceeds max entries ({}), stopping",
+            chunkStats.getNumEntries(), range.getLeft(), n, maxEntries);
+        return out;
+      }
+
+      /* add current SST into (possibly new) chunk */
+      out.add(range);
+      chunkStats.add(range.getRight());
+      LOG.info("Added range [{}] to chunk, total entries: {}", range.getLeft(), chunkStats.getNumEntries());
+    }
+
+    LOG.info("Found {} ranges that fit within {} entries", out.size(), maxEntries);
+    return out;
+  }
+
+  /**
+   * Squash the given SST ranges into a single KeyRange spanning from the smallest start key
+   * to the largest end key, together with their combined stats. The caller is expected to
+   * apply getKeyPrefixUpperBound() when an exclusive end key is required.
+   */
+  private Pair<KeyRange, KeyRangeStats> squashRanges(
+      List<Pair<KeyRange, KeyRangeStats>> list) {
+    LOG.info("Squashing {} ranges", list.size());
+    Preconditions.checkNotNull(list, "list is null");
+    Preconditions.checkArgument(!list.isEmpty(), "list is empty");
+
+    String minKey = list.get(0).getLeft().getStartKey(), maxKey = list.get(0).getLeft().getEndKey();
+    KeyRangeStats stats = new KeyRangeStats();
+    for (Pair<KeyRange, KeyRangeStats> range : list) {
+      minKey = StringUtils.min(minKey, range.getLeft().getStartKey());
+      maxKey = StringUtils.max(maxKey, range.getLeft().getEndKey());
+      stats.add(range.getRight());
+      LOG.info("Squashing range [{}] with {} entries", range.getLeft(), range.getRight().getNumEntries());
+    }
+    KeyRange squashedRange = new KeyRange(minKey, maxKey);
+    LOG.info("Squashed {} ranges into [{}] with {} total entries",
+        list.size(), squashedRange, stats.getNumEntries());
+    return Pair.of(squashedRange, stats);
+  }
+
+  private CompoundKeyRangeStats getCompoundKeyRangeStatsFromRange(KeyRange range) throws IOException {
+    LOG.info("Getting compound stats for range [{}]", range);
+    List<Pair<KeyRange, KeyRangeStats>> keyRangeStatsList = new ArrayList<>();
+    List<LiveFileMetaData> liveFileMetaDataList = ((RDBStore)getDBStore()).getDb().getLiveFilesMetaData();
+    Map<String, LiveFileMetaData> fileMap = new HashMap<>();
+    // https://github.com/facebook/rocksdb/blob/09cd25f76305f2110131f51068656ab392dc2bf5/db/version_set.cc#L1744-L1768
+    // https://github.com/facebook/rocksdb/blob/a00391c72996a5dbdd93a621dbc53719c13b05c4/file/filename.cc#L140-L150
+    LOG.info("Processing {} live files", liveFileMetaDataList.size());
+    for (LiveFileMetaData metaData : liveFileMetaDataList) {
+      fileMap.put(metaData.path() + metaData.fileName(), metaData);
+    }
+
+    LOG.info("Getting table properties for range [{}]", range);
+    Map<String, TableProperties> propsMap = getDBStore().getPropertiesOfTableInRange(getTableName(),
+        Collections.singletonList(range));
+    LOG.info("Found {} table properties in range", propsMap.size());
+
+    for (Map.Entry<String, TableProperties> entry : propsMap.entrySet()) {
+      String filePath = entry.getKey();
+      LiveFileMetaData meta = fileMap.get(filePath);
+      if (meta == null) {
+        LOG.warn("LiveFileMetaData not found for file {}", filePath);
+        continue;
+      }
+      KeyRange keyRange = new KeyRange(meta.smallestKey(), meta.largestKey());
+      KeyRangeStats stats = KeyRangeStats.fromTableProperties(entry.getValue());
+      keyRangeStatsList.add(Pair.of(keyRange, stats));
+      LOG.info("Added file {} with range [{}] and {} entries",
+          filePath, keyRange, stats.getNumEntries());
+    }
+
+    LOG.info("Key range stats list: {}", keyRangeStatsList);
+
+    CompoundKeyRangeStats result = new CompoundKeyRangeStats(keyRangeStatsList);
+    if (!result.isEmpty()) {
+      LOG.info("Compound stats for range [{}]: {} entries across {} files",
+          range, result.getNumEntries(), keyRangeStatsList.size());
+    } else {
+      LOG.info("Range [{}] has no entries", range);
+    }
+    return result;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/package-info.java
new file mode 100644
index 000000000000..51dc70428b12
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/compaction/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Compactors and helpers for the OM range compaction service, which compacts
+ * key ranges with a high tombstone density.
+ */
+package org.apache.hadoop.ozone.om.compaction;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/RangeCompactionService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/RangeCompactionService.java
new file mode 100644
index 000000000000..d4b4bc1e4757
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/RangeCompactionService.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.om.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.compaction.Compactor; +import org.apache.hadoop.ozone.om.compaction.CompactorBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages all compactors and schedules their execution. + */ +public class RangeCompactionService extends BackgroundService { + private static final Logger LOG = LoggerFactory.getLogger(RangeCompactionService.class); + private static final int THREAD_POOL_SIZE = 1; + + private final OMMetadataManager metadataManager; + private final List compactors; + private final Map compactorExecutors; + private final long checkIntervalMs; + private final long maxCompactionEntries; + private final int rangesPerRun; + private final int minTombstones; + private final double tombstoneRatio; + + private final BackgroundTaskQueue tasks; + + public RangeCompactionService(OzoneConfiguration conf, OzoneManager ozoneManager, long intervalMs, + long timeoutMs) { + super("RangeCompactionService", intervalMs, TimeUnit.MILLISECONDS, THREAD_POOL_SIZE, timeoutMs, + ozoneManager.getThreadNamePrefix()); + this.metadataManager = ozoneManager.getMetadataManager(); + this.checkIntervalMs = intervalMs; + this.maxCompactionEntries = conf.getLong(OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_MAX_COMPACTION_ENTRIES, + OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_MAX_COMPACTION_ENTRIES_DEFAULT); + this.rangesPerRun = conf.getInt(OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_RANGES_PER_RUN, + OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_RANGES_PER_RUN_DEFAULT); + this.minTombstones = conf.getInt(OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_MIN_TOMBSTONES, + OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_MIN_TOMBSTONES_DEFAULT); + this.tombstoneRatio = conf.getDouble(OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_TOMBSTONE_RATIO, + OMConfigKeys.OZONE_OM_RANGE_COMPACTION_SERVICE_TOMBSTONE_RATIO_DEFAULT); + this.compactors = new ArrayList<>(); + this.compactorExecutors = new ConcurrentHashMap<>(); + + this.tasks = new BackgroundTaskQueue(); + initializeCompactors(); + } + + @Override + public BackgroundTaskQueue getTasks() { + return tasks; + } + + @Override + public synchronized void start() { + super.start(); + scheduleCompactionChecks(); + } + + public void stop() { + for (Map.Entry entry : compactorExecutors.entrySet()) { + ScheduledExecutorService executor = entry.getValue(); + executor.shutdown(); + try { + if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + compactorExecutors.clear(); + } + + private void initializeCompactors() { + // Initialize compactors using the builder pattern + CompactorBuilder builder = new CompactorBuilder() + .setCompactRangeQueue(tasks) + .setMetadataManager(metadataManager) + 
.setDBStore(metadataManager.getStore()) + .setMaxCompactionEntries(maxCompactionEntries) + .setMinTombstones(minTombstones) + .setTombstoneRatio(tombstoneRatio) + .setRangesPerRun(rangesPerRun); + + // Initialize compactors for OBS and legacy layout + try { + compactors.add(builder.setTableName("keyTable") + .build()); + compactors.add(builder.setTableName("deletedTable") + .build()); + + // Initialize compactors for FSO layout + compactors.add(builder.setTableName("directoryTable") + .build()); + compactors.add(builder.setTableName("fileTable") + .build()); + compactors.add(builder.setTableName("deletedDirectoryTable") + .build()); + } catch (IOException e) { + LOG.error("Failed to initialize compactors", e); + throw new RuntimeException("Failed to initialize compactors", e); + } + } + + private void scheduleCompactionChecks() { + for (Compactor compactor : compactors) { + String tableName = compactor.getTableName(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + compactorExecutors.put(tableName, executor); + + LOG.info("Scheduling range compaction compactor for table {}", tableName); + executor.scheduleWithFixedDelay( + compactor::run, + checkIntervalMs, // TODO: randomize the start time + checkIntervalMs, + TimeUnit.MILLISECONDS); + } + } +} + diff --git a/pom.xml b/pom.xml index fd261484dff6..2a13ec4c96a2 100644 --- a/pom.xml +++ b/pom.xml @@ -200,7 +200,7 @@ 0.10.2 1.2.26 2.6.1 - 7.7.3 + 7.7.3.2 3.1.0 bash 2.0.17 @@ -1937,6 +1937,7 @@ org.rocksdb.InfoLogLevel org.rocksdb.OptionsUtil org.rocksdb.RocksDBException + org.rocksdb.Range org.rocksdb.StatsLevel org.rocksdb.TableProperties org.rocksdb.TransactionLogIterator.BatchResult @@ -2476,6 +2477,10 @@ apache.snapshots.https https://repository.apache.org/content/repositories/snapshots + + github + https://maven.pkg.github.com/peterxcli/rocksdb +
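
Reviewer note: the sketch below is a standalone illustration, not part of the patch. It distils the range-selection heuristic both compactors share: collect per-SST entry and tombstone counts, squash neighbouring SST key ranges until the configured entry cap would overflow, and flag the squashed range for compaction only when the tombstone thresholds are met. The class name, key strings, and the assumption that needsCompaction() requires both the minimum-tombstone count and the tombstone ratio are illustrative, since AbstractCompactor is not shown in this diff.

import java.util.ArrayList;
import java.util.List;

public class RangeSelectionSketch {

  /** Minimal stand-in for one SST file's KeyRange + KeyRangeStats pair. */
  static final class SstStats {
    final String startKey;
    final String endKey;
    final long numEntries;
    final long numDeletions;

    SstStats(String startKey, String endKey, long numEntries, long numDeletions) {
      this.startKey = startKey;
      this.endKey = endKey;
      this.numEntries = numEntries;
      this.numDeletions = numDeletions;
    }
  }

  /** Assumed needsCompaction() semantics: both the absolute and the relative tombstone thresholds must hold. */
  static boolean needsCompaction(long entries, long deletions, int minTombstones, double tombstoneRatio) {
    if (entries == 0) {
      return false;
    }
    return deletions >= minTombstones && (double) deletions / entries >= tombstoneRatio;
  }

  /** Simplified findFitRanges() + squashRanges(): take SSTs in key order until the entry cap would overflow. */
  static String[] selectRange(List<SstStats> ssts, long maxEntries, int minTombstones, double tombstoneRatio) {
    ssts.sort((a, b) -> a.startKey.compareTo(b.startKey));
    long entries = 0;
    long deletions = 0;
    String minKey = null;
    String maxKey = null;
    for (SstStats sst : ssts) {
      if (sst.numEntries > maxEntries) {
        continue;                        // a single oversized SST is skipped
      }
      if (entries > 0 && entries + sst.numEntries > maxEntries) {
        break;                           // adding this SST would overflow the cap
      }
      entries += sst.numEntries;
      deletions += sst.numDeletions;
      minKey = minKey == null || sst.startKey.compareTo(minKey) < 0 ? sst.startKey : minKey;
      maxKey = maxKey == null || sst.endKey.compareTo(maxKey) > 0 ? sst.endKey : maxKey;
    }
    return needsCompaction(entries, deletions, minTombstones, tombstoneRatio)
        ? new String[] {minKey, maxKey} : null;
  }

  public static void main(String[] args) {
    List<SstStats> ssts = new ArrayList<>();
    ssts.add(new SstStats("/vol/bucket/a", "/vol/bucket/f", 400_000, 150_000));
    ssts.add(new SstStats("/vol/bucket/f", "/vol/bucket/m", 500_000, 200_000));
    ssts.add(new SstStats("/vol/bucket/m", "/vol/bucket/z", 800_000, 10_000));
    // With thresholds similar to the defaults (1M max entries, 10k min tombstones, 0.3 ratio),
    // the first two SSTs are squashed into one range and flagged for compaction.
    String[] range = selectRange(ssts, 1_000_000, 10_000, 0.3);
    System.out.println(range == null ? "no compaction needed" : range[0] + " .. " + range[1]);
  }
}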