Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,11 @@ public static String getLexicographicallyHigherString(String val) {
charVal[lastIdx] += 1;
return String.valueOf(charVal);
}

/**
 * Returns the first {@code n} characters of {@code str}.
 *
 * Contract (intentional, per review): when {@code str} is {@code null} it is
 * returned as-is, and when it is shorter than {@code n} the whole string is
 * returned unchanged.
 *
 * @param str input string, may be null.
 * @param n   number of leading characters to keep.
 * @return the leading {@code n} characters, or {@code str} itself when it is
 *         null or shorter than {@code n}.
 */
public static String getFirstNChars(String str, int n) {
  // Truncate only when there is actually something longer than n to cut.
  if (str != null && str.length() >= n) {
    return str.substring(0, n);
  }
  return str;
}
Comment on lines +127 to +132
Copy link

Copilot AI Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method returns the original string when str.length() < n, but should return the full string only when it's shorter. When str is null, returning null may cause NullPointerExceptions in callers. Consider returning an empty string for null input or documenting this behavior clearly.

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fine.

}
Original file line number Diff line number Diff line change
Expand Up @@ -854,18 +854,14 @@ private int getLastLevel() throws RocksDatabaseException {
/**
* Deletes sst files which do not correspond to prefix
* for given table.
* @param prefixPairs a map of TableName to prefixUsed.
* @param prefixInfo a map of TableName to prefixUsed.
*/
public void deleteFilesNotMatchingPrefix(Map<String, String> prefixPairs) throws RocksDatabaseException {
public void deleteFilesNotMatchingPrefix(TablePrefixInfo prefixInfo) throws RocksDatabaseException {
try (UncheckedAutoCloseable ignored = acquire()) {
for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
String sstFileColumnFamily = StringUtils.bytes2String(liveFileMetaData.columnFamilyName());
int lastLevel = getLastLevel();

if (!prefixPairs.containsKey(sstFileColumnFamily)) {
continue;
}

// RocksDB #deleteFile API allows only to delete the last level of
// SST Files. Any level < last level won't get deleted and
// only last file of level 0 can be deleted
Expand All @@ -876,7 +872,7 @@ public void deleteFilesNotMatchingPrefix(Map<String, String> prefixPairs) throws
continue;
}

String prefixForColumnFamily = prefixPairs.get(sstFileColumnFamily);
String prefixForColumnFamily = prefixInfo.getTablePrefix(sstFileColumnFamily);
String firstDbKey = StringUtils.bytes2String(liveFileMetaData.smallestKey());
String lastDbKey = StringUtils.bytes2String(liveFileMetaData.largestKey());
boolean isKeyWithPrefixPresent = RocksDiffUtils.isKeyWithPrefixPresent(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* Encapsulates a store's prefix info corresponding to tables in a db.
*/
/**
 * Encapsulates a store's prefix info corresponding to tables in a db.
 * Instances are immutable: the supplied map is defensively copied at
 * construction time, so later changes to the caller's map do not leak in.
 */
public class TablePrefixInfo {
  // Table (column family) name -> key prefix used by that table.
  private final Map<String, String> tablePrefixes;

  /**
   * @param tablePrefixes map of table name to prefix; must not be null.
   */
  public TablePrefixInfo(Map<String, String> tablePrefixes) {
    // Defensive copy: unmodifiableMap alone is only a read-only VIEW of the
    // caller's map, which the caller could still mutate afterwards.
    this.tablePrefixes = Collections.unmodifiableMap(new HashMap<>(tablePrefixes));
  }

  /**
   * Returns the prefix registered for the given table, or the empty string
   * when the table is unknown.
   */
  public String getTablePrefix(String tableName) {
    return tablePrefixes.getOrDefault(tableName, "");
  }

  /** Number of tables with a registered prefix. */
  public int size() {
    return tablePrefixes.size();
  }

  /** True when no table prefixes are present. */
  public boolean isEmpty() {
    return tablePrefixes.isEmpty();
  }

  /** Unmodifiable set of table names that have a registered prefix. */
  public Set<String> getTableNames() {
    return tablePrefixes.keySet();
  }

  @Override
  public String toString() {
    return "TablePrefixInfo{" +
        "tablePrefixes=" + tablePrefixes +
        '}';
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Utility package for RocksDB.
*/
package org.apache.hadoop.hdds.utils.db;
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ozone.rocksdb.util;

import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -43,21 +42,20 @@ public final class RdbUtil {
private RdbUtil() { }

public static List<LiveFileMetaData> getLiveSSTFilesForCFs(
final ManagedRocksDB rocksDB, List<String> cfs) {
final Set<String> cfSet = Sets.newHashSet(cfs);
final ManagedRocksDB rocksDB, Set<String> cfs) {
return rocksDB.get().getLiveFilesMetaData().stream()
.filter(lfm -> cfSet.contains(StringUtils.bytes2String(lfm.columnFamilyName())))
.filter(lfm -> cfs.contains(StringUtils.bytes2String(lfm.columnFamilyName())))
.collect(Collectors.toList());
}

public static Set<String> getSSTFilesForComparison(
final ManagedRocksDB rocksDB, List<String> cfs) {
final ManagedRocksDB rocksDB, Set<String> cfs) {
return getLiveSSTFilesForCFs(rocksDB, cfs).stream()
.map(lfm -> new File(lfm.path(), lfm.fileName()).getPath())
.collect(Collectors.toCollection(HashSet::new));
}

public static Map<Object, String> getSSTFilesWithInodesForComparison(final ManagedRocksDB rocksDB, List<String> cfs)
public static Map<Object, String> getSSTFilesWithInodesForComparison(final ManagedRocksDB rocksDB, Set<String> cfs)
throws IOException {
List<LiveFileMetaData> liveSSTFilesForCFs = getLiveSSTFilesForCFs(rocksDB, cfs);
Map<Object, String> inodeToSstMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.ozone.rocksdiff;

import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;

/**
Expand All @@ -29,17 +29,17 @@ public class DifferSnapshotInfo {
private final UUID snapshotId;
private final long snapshotGeneration;

private final Map<String, String> tablePrefixes;
private final TablePrefixInfo tablePrefixes;

private final ManagedRocksDB rocksDB;

/**
 * @param db path to the snapshot's RocksDB instance.
 * @param id snapshot UUID.
 * @param gen snapshot generation.
 * @param tablePrefixInfo per-table key prefix info for this snapshot.
 * @param rocksDB open handle to the snapshot's RocksDB instance.
 */
public DifferSnapshotInfo(String db, UUID id, long gen,
    TablePrefixInfo tablePrefixInfo,
    ManagedRocksDB rocksDB) {
  dbPath = db;
  snapshotId = id;
  snapshotGeneration = gen;
  tablePrefixes = tablePrefixInfo;
  this.rocksDB = rocksDB;
}

Expand All @@ -55,7 +55,7 @@ public long getSnapshotGeneration() {
return snapshotGeneration;
}

public Map<String, String> getTablePrefixes() {
public TablePrefixInfo getTablePrefixes() {
return tablePrefixes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.graph.MutableGraph;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.BufferedWriter;
Expand Down Expand Up @@ -620,12 +621,13 @@ private String trimSSTFilename(String filename) {
* Read the current Live manifest for a given RocksDB instance (Active or
* Checkpoint).
* @param rocksDB open rocksDB instance.
* @param tableFilter set of column-family/table names to include when collecting live SSTs.
* @return a list of SST files (without extension) in the DB.
*/
public Set<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB) {
public Set<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB, Set<String> tableFilter) {
HashSet<String> liveFiles = new HashSet<>();

final List<String> cfs = Arrays.asList(
final Set<String> cfs = Sets.newHashSet(
org.apache.hadoop.hdds.StringUtils.bytes2String(
RocksDB.DEFAULT_COLUMN_FAMILY), "keyTable", "directoryTable",
"fileTable");
Expand All @@ -635,6 +637,9 @@ public Set<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB) {
RdbUtil.getLiveSSTFilesForCFs(rocksDB, cfs);
LOG.debug("SST File Metadata for DB: " + rocksDB.get().getName());
for (LiveFileMetaData m : liveFileMetaDataList) {
if (!tableFilter.contains(StringUtils.bytes2String(m.columnFamilyName()))) {
continue;
}
LOG.debug("File: {}, Level: {}", m.fileName(), m.level());
final String trimmedFilename = trimSSTFilename(m.fileName());
liveFiles.add(trimmedFilename);
Expand Down Expand Up @@ -818,17 +823,18 @@ private String getSSTFullPath(String sstFilenameWithoutExtension,
*
* @param src source snapshot
* @param dest destination snapshot
* @param tablesToLookup set of table (column-family) names used to restrict which SST files to return.
* @param sstFilesDirForSnapDiffJob dir to create hardlinks for SST files
* for snapDiff job.
* @return A list of SST files without extension.
* e.g. ["/path/to/sstBackupDir/000050.sst",
* "/path/to/sstBackupDir/000060.sst"]
*/
public synchronized Optional<List<String>> getSSTDiffListWithFullPath(DifferSnapshotInfo src,
DifferSnapshotInfo dest,
DifferSnapshotInfo dest, Set<String> tablesToLookup,
String sstFilesDirForSnapDiffJob) {

Optional<List<String>> sstDiffList = getSSTDiffList(src, dest);
Optional<List<String>> sstDiffList = getSSTDiffList(src, dest, tablesToLookup);

return sstDiffList.map(diffList -> diffList.stream()
.map(
Expand All @@ -852,15 +858,17 @@ public synchronized Optional<List<String>> getSSTDiffListWithFullPath(DifferSnap
*
* @param src source snapshot
* @param dest destination snapshot
* @param tablesToLookup set of column-family (table) names to include when reading SST files;
* must be non-null.
* @return A list of SST files without extension. e.g. ["000050", "000060"]
*/
public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo src,
DifferSnapshotInfo dest) {
DifferSnapshotInfo dest, Set<String> tablesToLookup) {

// TODO: Reject or swap if dest is taken after src, once snapshot chain
// integration is done.
Set<String> srcSnapFiles = readRocksDBLiveFiles(src.getRocksDB());
Set<String> destSnapFiles = readRocksDBLiveFiles(dest.getRocksDB());
Set<String> srcSnapFiles = readRocksDBLiveFiles(src.getRocksDB(), tablesToLookup);
Set<String> destSnapFiles = readRocksDBLiveFiles(dest.getRocksDB(), tablesToLookup);

Set<String> fwdDAGSameFiles = new HashSet<>();
Set<String> fwdDAGDifferentFiles = new HashSet<>();
Expand Down Expand Up @@ -896,9 +904,9 @@ public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo src
}
}

if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
if (src.getTablePrefixes() != null && src.getTablePrefixes().size() != 0) {
RocksDiffUtils.filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes(),
compactionDag.getCompactionMap(), src.getRocksDB(), dest.getRocksDB());
compactionDag.getCompactionMap(), tablesToLookup, src.getRocksDB(), dest.getRocksDB());
}
return Optional.of(new ArrayList<>(fwdDAGDifferentFiles));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@

package org.apache.ozone.rocksdiff;

import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.hdds.StringUtils.getFirstNChars;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.rocksdb.util.SstFileInfo;
Expand All @@ -49,41 +48,26 @@ private RocksDiffUtils() {
/**
 * Checks whether a key with the given column-family prefix could fall
 * inside the [firstDbKey, lastDbKey] range of an SST file.
 *
 * Only the leading {@code prefix.length()} characters of each range
 * boundary are compared, so the file is relevant when
 * firstKeyPrefix <= prefix <= endKeyPrefix lexicographically.
 *
 * @param prefixForColumnFamily key prefix for the table being diffed.
 * @param firstDbKey smallest key in the SST file.
 * @param lastDbKey largest key in the SST file.
 * @return true when the prefix range overlaps the file's key range.
 */
public static boolean isKeyWithPrefixPresent(String prefixForColumnFamily,
    String firstDbKey,
    String lastDbKey) {
  String firstKeyPrefix = getFirstNChars(firstDbKey, prefixForColumnFamily.length());
  String endKeyPrefix = getFirstNChars(lastDbKey, prefixForColumnFamily.length());
  return firstKeyPrefix.compareTo(prefixForColumnFamily) <= 0
      && prefixForColumnFamily.compareTo(endKeyPrefix) <= 0;
}

public static String constructBucketKey(String keyName) {
if (!keyName.startsWith(OM_KEY_PREFIX)) {
keyName = OM_KEY_PREFIX.concat(keyName);
}
String[] elements = keyName.split(OM_KEY_PREFIX);
String volume = elements[1];
String bucket = elements[2];
StringBuilder builder =
new StringBuilder().append(OM_KEY_PREFIX).append(volume);

if (StringUtils.isNotBlank(bucket)) {
builder.append(OM_KEY_PREFIX).append(bucket);
}
builder.append(OM_KEY_PREFIX);
return builder.toString();
}

/**
 * Filters {@code inputFiles} down to SST files relevant to the given table
 * prefixes and column families; convenience overload with no pre-existing
 * compaction nodes.
 *
 * @param inputFiles set of SST file names, filtered in place.
 * @param tablePrefixInfo per-table key prefix info.
 * @param columnFamiliesToLookup column families to include.
 * @param dbs RocksDB instances consulted for SST file metadata.
 */
public static void filterRelevantSstFiles(Set<String> inputFiles,
    TablePrefixInfo tablePrefixInfo,
    Set<String> columnFamiliesToLookup,
    ManagedRocksDB... dbs) {
  filterRelevantSstFiles(inputFiles, tablePrefixInfo, Collections.emptyMap(), columnFamiliesToLookup, dbs);
}

/**
* Filter sst files based on prefixes.
*/
public static void filterRelevantSstFiles(Set<String> inputFiles,
Map<String, String> tableToPrefixMap,
TablePrefixInfo tablePrefixInfo,
Map<String, CompactionNode> preExistingCompactionNodes,
Set<String> columnFamiliesToLookup,
ManagedRocksDB... dbs) {
Map<String, LiveFileMetaData> liveFileMetaDataMap = new HashMap<>();
int dbIdx = 0;
Expand All @@ -100,41 +84,38 @@ public static void filterRelevantSstFiles(Set<String> inputFiles,
compactionNode = new CompactionNode(new CompactionFileInfo.Builder(filename)
.setValues(liveFileMetaDataMap.get(filename)).build());
}
if (shouldSkipNode(compactionNode, tableToPrefixMap)) {
if (shouldSkipNode(compactionNode, tablePrefixInfo, columnFamiliesToLookup)) {
fileIterator.remove();
}
}
}

@VisibleForTesting
static boolean shouldSkipNode(SstFileInfo node,
Map<String, String> columnFamilyToPrefixMap) {
static boolean shouldSkipNode(SstFileInfo node, TablePrefixInfo tablePrefixInfo, Set<String> columnFamiliesToLookup) {
// This is for backward compatibility. Before the compaction log table
// migration, startKey, endKey and columnFamily information is not persisted
// in compaction log files.
// Also for the scenario when there is an exception in reading SST files
// for the file node.
if (node.getStartKey() == null || node.getEndKey() == null ||
node.getColumnFamily() == null) {
if (node.getStartKey() == null || node.getEndKey() == null || node.getColumnFamily() == null) {
LOG.debug("Compaction node with fileName: {} doesn't have startKey, " +
"endKey and columnFamily details.", node.getFileName());
return false;
}

if (MapUtils.isEmpty(columnFamilyToPrefixMap)) {
LOG.debug("Provided columnFamilyToPrefixMap is null or empty.");
if (tablePrefixInfo.size() == 0) {
LOG.debug("Provided tablePrefixInfo is null or empty.");
return false;
}

if (!columnFamilyToPrefixMap.containsKey(node.getColumnFamily())) {
if (!columnFamiliesToLookup.contains(node.getColumnFamily())) {
LOG.debug("SstFile node: {} is for columnFamily: {} while filter map " +
"contains columnFamilies: {}.", node.getFileName(),
node.getColumnFamily(), columnFamilyToPrefixMap.keySet());
node.getColumnFamily(), tablePrefixInfo);
return true;
}

String keyPrefix = columnFamilyToPrefixMap.get(node.getColumnFamily());
return !isKeyWithPrefixPresent(keyPrefix, node.getStartKey(),
node.getEndKey());
String keyPrefix = tablePrefixInfo.getTablePrefix(node.getColumnFamily());
return !isKeyWithPrefixPresent(keyPrefix, node.getStartKey(), node.getEndKey());
}
}
Loading