Merged
Commits (22)
b858f85
HDDS-11205. Implement a search feature for users to locate keys pendi…
ArafatKhan2198 Jul 19, 2024
f9a96c5
Fixed a bug and improved the tests
ArafatKhan2198 Sep 30, 2024
26223d0
Refactored some code
ArafatKhan2198 Oct 1, 2024
abb9d65
Improved the java doc
ArafatKhan2198 Oct 1, 2024
abca3c9
Fixed checkstyle issues
ArafatKhan2198 Oct 4, 2024
2af1de1
Fixed a dead store variable
ArafatKhan2198 Oct 6, 2024
ab9765a
Refactored key search logic to OMDBInsightEndpoint and enhanced with…
ArafatKhan2198 Oct 10, 2024
53d8126
Fixed checkstyle issues
ArafatKhan2198 Oct 10, 2024
28c4536
Made sure all the Insight endpoints utilise one method for extracting…
ArafatKhan2198 Oct 10, 2024
1df317d
Made final review changes
ArafatKhan2198 Oct 10, 2024
5d9d3c2
Fixed failing test
ArafatKhan2198 Oct 10, 2024
9ec2c32
Fixed java doc error
ArafatKhan2198 Oct 11, 2024
384ce67
Merge branch 'master' into HDDS-11205
ArafatKhan2198 Oct 11, 2024
28b0b00
Fixed possible java doc comment
ArafatKhan2198 Oct 11, 2024
16b2c36
Fixed final java doc problem
ArafatKhan2198 Oct 14, 2024
da82a51
Fixed checkstyle issue
ArafatKhan2198 Oct 14, 2024
86c9f46
Final review refactoring
ArafatKhan2198 Oct 16, 2024
2338c5e
Fixed checkstyle issues
ArafatKhan2198 Oct 16, 2024
f688dac
Removed unnecessary blank check
ArafatKhan2198 Oct 16, 2024
96a7f79
Reverted back the blank check
ArafatKhan2198 Oct 16, 2024
005850a
Done with the null check
ArafatKhan2198 Oct 18, 2024
510b307
Fixed the checkstyle and changed the status code
ArafatKhan2198 Oct 18, 2024
ReconConstants.java
@@ -43,20 +43,20 @@ private ReconConstants() {
public static final int DISK_USAGE_TOP_RECORDS_LIMIT = 30;
public static final String DEFAULT_OPEN_KEY_INCLUDE_NON_FSO = "false";
public static final String DEFAULT_OPEN_KEY_INCLUDE_FSO = "false";
public static final String DEFAULT_START_PREFIX = "/";
public static final String DEFAULT_FETCH_COUNT = "1000";
public static final String DEFAULT_KEY_SIZE = "0";
public static final String DEFAULT_BATCH_NUMBER = "1";
public static final String RECON_QUERY_BATCH_PARAM = "batchNum";
public static final String RECON_QUERY_PREVKEY = "prevKey";
public static final String RECON_QUERY_START_PREFIX = "startPrefix";
public static final String RECON_OPEN_KEY_INCLUDE_NON_FSO = "includeNonFso";
public static final String RECON_OPEN_KEY_INCLUDE_FSO = "includeFso";
public static final String RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT = "1000";
public static final String RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY = "";
public static final String RECON_OM_INSIGHTS_DEFAULT_START_PREFIX = "/";
public static final String RECON_OM_INSIGHTS_DEFAULT_SEARCH_LIMIT = "1000";
public static final String RECON_OM_INSIGHTS_DEFAULT_SEARCH_PREV_KEY = "";
public static final String RECON_QUERY_FILTER = "missingIn";
public static final String PREV_CONTAINER_ID_DEFAULT_VALUE = "0";
public static final String PREV_DELETED_BLOCKS_TRANSACTION_ID_DEFAULT_VALUE =
"0";
public static final String PREV_DELETED_BLOCKS_TRANSACTION_ID_DEFAULT_VALUE = "0";
// Only include containers that are missing in OM by default
public static final String DEFAULT_FILTER_FOR_MISSING_CONTAINERS = "SCM";
public static final String RECON_QUERY_LIMIT = "limit";
ReconUtils.java
@@ -32,6 +32,8 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.List;
import java.util.TimeZone;
import java.util.Date;
@@ -54,6 +56,8 @@
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;

@@ -596,6 +600,70 @@ public static long convertToEpochMillis(String dateString, String dateFormat, Ti
}
}

/**
* Retrieves keys from the specified table using pagination (prevKey) and
* prefix filtering (startPrefix), enabling efficient key retrieval.
*
* The method handles the following cases:
*
* 1. prevKey provided, startPrefix empty:
* - Seeks to prevKey, skips it, and returns subsequent records up to the limit.
*
* 2. prevKey empty, startPrefix empty:
* - Iterates from the beginning of the table, retrieving records up to the limit.
*
* 3. startPrefix provided, prevKey empty:
* - Seeks to the first key matching startPrefix and returns matching keys up to the limit.
*
* 4. startPrefix provided, prevKey provided:
* - Seeks to prevKey, skips it, and returns subsequent keys that match startPrefix, up to the limit.
*
* A limit of 0 retrieves all matching keys.
*
* @param <T> the value type of the table
* @param table the table to iterate over
* @param startPrefix the prefix that returned keys must match; empty disables prefix filtering
* @param limit the maximum number of keys to return; 0 means unlimited
* @param prevKey the pagination key; iteration resumes after this key, empty starts at the first record
* @return an insertion-ordered map of matching keys and their values
* @throws IOException if iterating the table fails
*/
public static <T> Map<String, T> extractKeysFromTable(
Table<String, T> table, String startPrefix, int limit, String prevKey)
throws IOException {

Map<String, T> matchedKeys = new LinkedHashMap<>();
try (TableIterator<String, ? extends Table.KeyValue<String, T>> keyIter = table.iterator()) {

// Scenario 1 & 4: prevKey is provided (whether startPrefix is empty or not)
if (!prevKey.isEmpty()) {
Table.KeyValue<String, T> seekedRecord = keyIter.seek(prevKey);
// Skip the seeked record only if it is exactly prevKey; if prevKey is no
// longer present, the iterator already points at the next record to return.
if (seekedRecord != null && seekedRecord.getKey().equals(prevKey)) {
keyIter.next();
}
} else if (!startPrefix.isEmpty()) {
// Scenario 3: startPrefix is provided but prevKey is empty, so seek to startPrefix
keyIter.seek(startPrefix);
}
// Scenario 2: Both startPrefix and prevKey are empty (iterate from the start of the table)
// No seeking needed; just start iterating from the first record in the table
// This is implicit in the following loop, as the iterator will start from the beginning

// Iterate through the keys while adhering to the limit (if the limit is not zero)
while (keyIter.hasNext() && (limit == 0 || matchedKeys.size() < limit)) {
Table.KeyValue<String, T> entry = keyIter.next();
String dbKey = entry.getKey();

// Scenario 3 & 4: If startPrefix is provided, ensure the key matches startPrefix
if (!startPrefix.isEmpty() && !dbKey.startsWith(startPrefix)) {
break; // If the key no longer matches the prefix, exit the loop
}

// Add the valid key-value pair to the results
matchedKeys.put(dbKey, entry.getValue());
}
} catch (IOException exception) {
log.error("Error retrieving keys from table for path: {}", startPrefix, exception);
throw exception;
}
return matchedKeys;
}

/**
* Finds all subdirectories under a parent directory in an FSO bucket. It builds
* a list of paths for these subdirectories. These sub-directories are then used
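A minimal usage sketch for the new extractKeysFromTable helper (illustrative only, not part of the patch; assumes a Recon OM metadata manager is in scope):

Map<String, RepeatedOmKeyInfo> page = extractKeysFromTable(
    omMetadataManager.getDeletedTable(), // Table<String, RepeatedOmKeyInfo>
    "/vol1/bucket1", // startPrefix: only keys under this bucket
    1000,            // limit: 0 would return all matching keys
    "");             // prevKey: empty on the first page
// The returned LinkedHashMap preserves insertion order, so the last key of
// this page can be passed as prevKey to fetch the next page.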
OMDBInsightEndpoint.java
@@ -66,19 +66,25 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_KEY_SIZE;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_FSO;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_NON_FSO;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_FSO;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_NON_FSO;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_START_PREFIX;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_KEY_SIZE;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;
import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createBadRequestResponse;
import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createInternalServerErrorResponse;
import static org.apache.hadoop.ozone.recon.ReconResponseUtils.noMatchedKeysResponse;
import static org.apache.hadoop.ozone.recon.ReconUtils.extractKeysFromTable;
import static org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler;
import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath;
import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequestPath;
@@ -211,15 +217,15 @@ public Response getOpenKeyInfo(
keyIter = openKeyTable.iterator()) {
boolean skipPrevKey = false;
String seekKey = prevKey;
if (!skipPrevKeyDone && StringUtils.isNotBlank(prevKey)) {
if (!skipPrevKeyDone && isNotBlank(prevKey)) {
skipPrevKey = true;
Table.KeyValue<String, OmKeyInfo> seekKeyValue =
keyIter.seek(seekKey);
// check if RocksDB was able to seek correctly to the given key prefix
// if not, then return empty result
// In case of an empty prevKeyPrefix, all the keys are returned
if (seekKeyValue == null ||
(StringUtils.isNotBlank(prevKey) &&
(isNotBlank(prevKey) &&
!seekKeyValue.getKey().equals(prevKey))) {
continue;
}
@@ -339,62 +345,6 @@ private Long getValueFromId(GlobalStats record) {
return record != null ? record.getValue() : 0L;
}

private void getPendingForDeletionKeyInfo(
int limit,
String prevKey,
KeyInsightInfoResponse deletedKeyAndDirInsightInfo) {
List<RepeatedOmKeyInfo> repeatedOmKeyInfoList =
deletedKeyAndDirInsightInfo.getRepeatedOmKeyInfoList();
Table<String, RepeatedOmKeyInfo> deletedTable =
omMetadataManager.getDeletedTable();
try (
TableIterator<String, ? extends Table.KeyValue<String,
RepeatedOmKeyInfo>>
keyIter = deletedTable.iterator()) {
boolean skipPrevKey = false;
String seekKey = prevKey;
String lastKey = "";
if (StringUtils.isNotBlank(prevKey)) {
skipPrevKey = true;
Table.KeyValue<String, RepeatedOmKeyInfo> seekKeyValue =
keyIter.seek(seekKey);
// check if RocksDB was able to seek correctly to the given key prefix
// if not, then return empty result
// In case of an empty prevKeyPrefix, all the keys are returned
if (seekKeyValue == null ||
(StringUtils.isNotBlank(prevKey) &&
!seekKeyValue.getKey().equals(prevKey))) {
return;
}
}
while (keyIter.hasNext()) {
Table.KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
String key = kv.getKey();
lastKey = key;
RepeatedOmKeyInfo repeatedOmKeyInfo = kv.getValue();
// skip the prev key if prev key is present
if (skipPrevKey && key.equals(prevKey)) {
continue;
}
updateReplicatedAndUnReplicatedTotal(deletedKeyAndDirInsightInfo,
repeatedOmKeyInfo);
repeatedOmKeyInfoList.add(repeatedOmKeyInfo);
if ((repeatedOmKeyInfoList.size()) == limit) {
break;
}
}
deletedKeyAndDirInsightInfo.setLastKey(lastKey);
} catch (IOException ex) {
throw new WebApplicationException(ex,
Response.Status.INTERNAL_SERVER_ERROR);
} catch (IllegalArgumentException e) {
throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
} catch (Exception ex) {
throw new WebApplicationException(ex,
Response.Status.INTERNAL_SERVER_ERROR);
}
}

/** Retrieves the summary of deleted keys.
*
* This method calculates and returns a summary of deleted keys.
@@ -428,6 +378,7 @@ public Response getDeletedKeySummary() {
* limit - limits the number of keys/files returned.
* prevKey - E.g. /vol1/bucket1/key1; keys are skipped until the iterator
* has seeked past the given prevKey.
* startPrefix - E.g. /vol1/bucket1, this will return keys matching this prefix.
* Sample API Response:
* {
* "lastKey": "vol1/bucket1/key1",
@@ -476,17 +427,95 @@ public Response getDeletedKeySummary() {
@GET
@Path("/deletePending")
public Response getDeletedKeyInfo(
@DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT)
int limit,
@DefaultValue(StringUtils.EMPTY) @QueryParam(RECON_QUERY_PREVKEY)
String prevKey) {
KeyInsightInfoResponse
deletedKeyInsightInfo = new KeyInsightInfoResponse();
getPendingForDeletionKeyInfo(limit, prevKey,
deletedKeyInsightInfo);
@DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT) int limit,
@DefaultValue(StringUtils.EMPTY) @QueryParam(RECON_QUERY_PREVKEY) String prevKey,
@DefaultValue(StringUtils.EMPTY) @QueryParam(RECON_QUERY_START_PREFIX) String startPrefix) {

// Initialize the response object to hold the key information
KeyInsightInfoResponse deletedKeyInsightInfo = new KeyInsightInfoResponse();

// Ensure the limit is non-negative
limit = Math.max(0, limit);

boolean keysFound = false;

try {
// Validate the prefix when one is supplied; otherwise search the whole table.
if (isNotBlank(startPrefix) && !validateStartPrefix(startPrefix)) {
return createBadRequestResponse("Invalid startPrefix: Path must be at the bucket level or deeper.");
}
keysFound = getPendingForDeletionKeyInfo(limit, prevKey,
isNotBlank(startPrefix) ? startPrefix : "", deletedKeyInsightInfo);
} catch (IllegalArgumentException e) {
LOG.debug("Invalid startPrefix provided: {}", startPrefix, e);
return createBadRequestResponse("Invalid startPrefix: " + e.getMessage());
} catch (IOException e) {
LOG.debug("I/O error while searching deleted keys in OM DB", e);
return createInternalServerErrorResponse("Error searching deleted keys in OM DB: " + e.getMessage());
} catch (Exception e) {
LOG.debug("Unexpected error occurred while searching deleted keys", e);
return createInternalServerErrorResponse("Unexpected error: " + e.getMessage());
}

if (!keysFound) {
return noMatchedKeysResponse("");
}

return Response.ok(deletedKeyInsightInfo).build();
}
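
// Example request (path prefix assumed; it depends on where Recon mounts
// this endpoint, e.g. under /api/v1/keys):
//   GET /api/v1/keys/deletePending?limit=100&startPrefix=/vol1/bucket1
// Subsequent pages pass the previous response's "lastKey" value as prevKey.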

/**
* Retrieves keys pending deletion that match the given startPrefix.
*
* @param limit The maximum number of records to return.
* @param prevKey Pagination key; iteration resumes after this key.
* @param startPrefix The search prefix; empty matches all keys.
* @param deletedKeyInsightInfo The response object to populate.
* @return true if at least one matching key was found.
* @throws IOException if reading the deleted table fails.
*/
private boolean getPendingForDeletionKeyInfo(
int limit, String prevKey, String startPrefix,
KeyInsightInfoResponse deletedKeyInsightInfo) throws IOException {

long replicatedTotal = 0;
long unreplicatedTotal = 0;
boolean keysFound = false;
String lastKey = null;

// Search for deleted keys in DeletedTable
Table<String, RepeatedOmKeyInfo> deletedTable = omMetadataManager.getDeletedTable();
Map<String, RepeatedOmKeyInfo> deletedKeys =
extractKeysFromTable(deletedTable, startPrefix, limit, prevKey);

// Iterate over the retrieved keys and populate the response
for (Map.Entry<String, RepeatedOmKeyInfo> entry : deletedKeys.entrySet()) {
keysFound = true;
RepeatedOmKeyInfo repeatedOmKeyInfo = entry.getValue();

// We know each RepeatedOmKeyInfo has just one OmKeyInfo object
OmKeyInfo keyInfo = repeatedOmKeyInfo.getOmKeyInfoList().get(0);

// Add the key directly to the list without classification
deletedKeyInsightInfo.getRepeatedOmKeyInfoList().add(repeatedOmKeyInfo);

replicatedTotal += keyInfo.getReplicatedSize();
unreplicatedTotal += keyInfo.getDataSize();

lastKey = entry.getKey(); // Update lastKey
}

// Set the aggregated totals in the response
deletedKeyInsightInfo.setReplicatedDataSize(replicatedTotal);
deletedKeyInsightInfo.setUnreplicatedDataSize(unreplicatedTotal);
deletedKeyInsightInfo.setLastKey(lastKey);

return keysFound;
}

/**
* Creates a keys summary for deleted keys and updates the provided
* keysSummary map. Calculates the total number of deleted keys, replicated
@@ -526,15 +555,15 @@ private void getPendingForDeletionDirInfo(
boolean skipPrevKey = false;
String seekKey = prevKey;
String lastKey = "";
if (StringUtils.isNotBlank(prevKey)) {
if (isNotBlank(prevKey)) {
skipPrevKey = true;
Table.KeyValue<String, OmKeyInfo> seekKeyValue =
keyIter.seek(seekKey);
// check if RocksDB was able to seek correctly to the given key prefix
// if not, then return empty result
// In case of an empty prevKeyPrefix, all the keys are returned
if (seekKeyValue == null ||
(StringUtils.isNotBlank(prevKey) &&
(isNotBlank(prevKey) &&
!seekKeyValue.getKey().equals(prevKey))) {
return;
}
@@ -1161,7 +1190,7 @@ private Map<String, OmKeyInfo> retrieveKeysFromTable(
try (
TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter = table.iterator()) {

if (!paramInfo.isSkipPrevKeyDone() && StringUtils.isNotBlank(seekKey)) {
if (!paramInfo.isSkipPrevKeyDone() && isNotBlank(seekKey)) {
skipPrevKey = true;
Table.KeyValue<String, OmKeyInfo> seekKeyValue =
keyIter.seek(seekKey);
@@ -1277,17 +1306,18 @@ private void createSummaryForDeletedDirectories(
dirSummary.put("totalDeletedDirectories", deletedDirCount);
}

private void updateReplicatedAndUnReplicatedTotal(
KeyInsightInfoResponse deletedKeyAndDirInsightInfo,
RepeatedOmKeyInfo repeatedOmKeyInfo) {
repeatedOmKeyInfo.getOmKeyInfoList().forEach(omKeyInfo -> {
deletedKeyAndDirInsightInfo.setUnreplicatedDataSize(
deletedKeyAndDirInsightInfo.getUnreplicatedDataSize() +
omKeyInfo.getDataSize());
deletedKeyAndDirInsightInfo.setReplicatedDataSize(
deletedKeyAndDirInsightInfo.getReplicatedDataSize() +
omKeyInfo.getReplicatedSize());
});
private boolean validateStartPrefix(String startPrefix) {

// Ensure startPrefix starts with '/' for non-empty values
startPrefix = startPrefix.startsWith("/") ? startPrefix : "/" + startPrefix;

// Split the path to ensure it's at least at the bucket level (volume/bucket).
String[] pathComponents = startPrefix.split("/");
if (pathComponents.length < 3 || pathComponents[2].isEmpty()) {
return false; // Invalid if not at bucket level or deeper
}

return true;
}
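
// Illustrative inputs for the check above: "/vol1/bucket1" and
// "vol1/bucket1/dir1" pass the bucket-level validation, while "/" and
// "/vol1" are rejected.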

private String createPath(OmKeyInfo omKeyInfo) {