Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ private ReconConstants() {
public static final String RECON_SCM_SNAPSHOT_DB = "scm.snapshot.db";

// By default, limit the number of results returned

/**
* The maximum number of top disk usage records to return in a /du response.
*/
public static final int DISK_USAGE_TOP_RECORDS_LIMIT = 30;
public static final String DEFAULT_OPEN_KEY_INCLUDE_NON_FSO = "false";
public static final String DEFAULT_OPEN_KEY_INCLUDE_FSO = "false";
public static final String DEFAULT_FETCH_COUNT = "1000";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import com.google.inject.Singleton;
Expand Down Expand Up @@ -59,6 +60,7 @@
import static org.jooq.impl.DSL.select;
import static org.jooq.impl.DSL.using;

import org.apache.hadoop.ozone.recon.api.types.DUResponse;
import org.apache.hadoop.ozone.recon.scm.ReconContainerReportQueue;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
Expand Down Expand Up @@ -322,6 +324,33 @@ public static void upsertGlobalStatsTable(Configuration sqlConfiguration,
}
}

/**
* Sorts a list of DiskUsage objects in descending order by size using parallel sorting and
* returns the top N records as specified by the limit.
*
* This method is optimized for large datasets and utilizes parallel processing to efficiently
* sort and retrieve the top N largest records by size. It's especially useful for reducing
* processing time and memory usage when only a subset of sorted records is needed.
*
* Advantages of this approach include:
* - Efficient handling of large datasets by leveraging multi-core processors.
* - Reduction in memory usage and improvement in processing time by limiting the
* number of returned records.
* - Scalability and easy integration with existing systems.
*
* @param diskUsageList the list of DiskUsage objects to be sorted.
* @param limit the maximum number of DiskUsage objects to return.
* @return a list of the top N DiskUsage objects sorted in descending order by size,
* where N is the specified limit.
*/
public static List<DUResponse.DiskUsage> sortDiskUsageDescendingWithLimit(
List<DUResponse.DiskUsage> diskUsageList, int limit) {
return diskUsageList.parallelStream()
.sorted((du1, du2) -> Long.compare(du2.getSize(), du1.getSize()))
.limit(limit)
.collect(Collectors.toList());
}

public static long getFileSizeUpperBound(long fileSize) {
if (fileSize >= ReconConstants.MAX_FILE_SIZE_UPPER_BOUND) {
return Long.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,18 @@ public Response getBasicInfo(
* @param path request path
* @param listFile show subpath/disk usage for each key
* @param withReplica count actual DU with replication
* @param sortSubpaths determines whether to sort the subpaths by their sizes in descending order
* and returns the N largest subpaths based on the configuration value DISK_USAGE_TOP_RECORDS_LIMIT.
* @return DU response
* @throws IOException
*/
@GET
@Path("/du")
@SuppressWarnings("methodlength")
public Response getDiskUsage(@QueryParam("path") String path,
@DefaultValue("false")
@QueryParam("files") boolean listFile,
@DefaultValue("false")
@QueryParam("replica") boolean withReplica)
@DefaultValue("false") @QueryParam("files") boolean listFile,
@DefaultValue("false") @QueryParam("replica") boolean withReplica,
@DefaultValue("true") @QueryParam("sortSubPaths") boolean sortSubpaths)
throws IOException {
if (path == null || path.length() == 0) {
return Response.status(Response.Status.BAD_REQUEST).build();
Expand All @@ -127,8 +128,7 @@ public Response getDiskUsage(@QueryParam("path") String path,
reconNamespaceSummaryManager,
omMetadataManager, reconSCM, path);

duResponse = handler.getDuResponse(
listFile, withReplica);
duResponse = handler.getDuResponse(listFile, withReplica, sortSubpaths);

return Response.ok(duResponse).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import java.util.List;
import java.util.Set;

import static org.apache.hadoop.ozone.recon.ReconConstants.DISK_USAGE_TOP_RECORDS_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconUtils.sortDiskUsageDescendingWithLimit;

/**
* Class for handling bucket entity type.
*/
Expand Down Expand Up @@ -87,7 +90,7 @@ private BucketObjectDBInfo getBucketObjDbInfo(String[] names)

@Override
public DUResponse getDuResponse(
boolean listFile, boolean withReplica)
boolean listFile, boolean withReplica, boolean sortSubpaths)
throws IOException {
DUResponse duResponse = new DUResponse();
duResponse.setPath(getNormalizedPath());
Expand Down Expand Up @@ -142,7 +145,15 @@ public DUResponse getDuResponse(
}
duResponse.setCount(dirDUData.size());
duResponse.setSize(bucketDataSize);

if (sortSubpaths) {
// Parallel sort directory/files DU data in descending order of size and returns the top N elements.
dirDUData = sortDiskUsageDescendingWithLimit(dirDUData,
DISK_USAGE_TOP_RECORDS_LIMIT);
}

duResponse.setDuData(dirDUData);

return duResponse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import java.util.List;
import java.util.Set;

import static org.apache.hadoop.ozone.recon.ReconConstants.DISK_USAGE_TOP_RECORDS_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconUtils.sortDiskUsageDescendingWithLimit;

/**
* Class for handling directory entity type.
*/
Expand Down Expand Up @@ -80,7 +83,7 @@ private ObjectDBInfo getDirectoryObjDbInfo(String[] names)

@Override
public DUResponse getDuResponse(
boolean listFile, boolean withReplica)
boolean listFile, boolean withReplica, boolean sortSubPaths)
throws IOException {
DUResponse duResponse = new DUResponse();
duResponse.setPath(getNormalizedPath());
Expand Down Expand Up @@ -154,6 +157,13 @@ public DUResponse getDuResponse(
}
duResponse.setCount(subdirDUData.size());
duResponse.setSize(dirDataSize);

if (sortSubPaths) {
// Parallel sort subdirDUData in descending order of size and returns the top N elements.
subdirDUData = sortDiskUsageDescendingWithLimit(subdirDUData,
DISK_USAGE_TOP_RECORDS_LIMIT);
}

duResponse.setDuData(subdirDUData);

return duResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public abstract NamespaceSummaryResponse getSummaryResponse()
throws IOException;

public abstract DUResponse getDuResponse(
boolean listFile, boolean withReplica)
boolean listFile, boolean withReplica, boolean sort)
throws IOException;

public abstract QuotaUsageResponse getQuotaResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private ObjectDBInfo getKeyDbObjectInfo(String[] names)

@Override
public DUResponse getDuResponse(
boolean listFile, boolean withReplica)
boolean listFile, boolean withReplica, boolean sort)
throws IOException {
DUResponse duResponse = new DUResponse();
duResponse.setPath(getNormalizedPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.ozone.recon.ReconConstants.DISK_USAGE_TOP_RECORDS_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconUtils.sortDiskUsageDescendingWithLimit;

/**
* Class for handling root entity type.
*/
Expand Down Expand Up @@ -88,7 +91,7 @@ private ObjectDBInfo getPrefixObjDbInfo()

@Override
public DUResponse getDuResponse(
boolean listFile, boolean withReplica)
boolean listFile, boolean withReplica, boolean sortSubPaths)
throws IOException {
DUResponse duResponse = new DUResponse();
duResponse.setPath(getNormalizedPath());
Expand Down Expand Up @@ -137,6 +140,13 @@ public DUResponse getDuResponse(
duResponse.setSizeWithReplica(totalDataSizeWithReplica);
}
duResponse.setSize(totalDataSize);

if (sortSubPaths) {
// Parallel sort volumeDuData in descending order of size and returns the top N elements.
volumeDuData = sortDiskUsageDescendingWithLimit(volumeDuData,
DISK_USAGE_TOP_RECORDS_LIMIT);
}

duResponse.setDuData(volumeDuData);

return duResponse;
Expand All @@ -148,7 +158,8 @@ public QuotaUsageResponse getQuotaResponse()
QuotaUsageResponse quotaUsageResponse = new QuotaUsageResponse();
SCMNodeStat stats = getReconSCM().getScmNodeManager().getStats();
long quotaInBytes = stats.getCapacity().get();
long quotaUsedInBytes = getDuResponse(true, true).getSizeWithReplica();
long quotaUsedInBytes =
getDuResponse(true, true, false).getSizeWithReplica();
quotaUsageResponse.setQuota(quotaInBytes);
quotaUsageResponse.setQuotaUsed(quotaUsedInBytes);
return quotaUsageResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public NamespaceSummaryResponse getSummaryResponse()

@Override
public DUResponse getDuResponse(
boolean listFile, boolean withReplica)
boolean listFile, boolean withReplica, boolean sort)
throws IOException {
DUResponse duResponse = new DUResponse();
duResponse.setStatus(ResponseStatus.PATH_NOT_FOUND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
import java.util.ArrayList;
import java.util.List;


import static org.apache.hadoop.ozone.recon.ReconConstants.DISK_USAGE_TOP_RECORDS_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconUtils.sortDiskUsageDescendingWithLimit;

/**
* Class for handling volume entity type.
*/
Expand Down Expand Up @@ -92,7 +96,7 @@ private VolumeObjectDBInfo getVolumeObjDbInfo(String[] names)

@Override
public DUResponse getDuResponse(
boolean listFile, boolean withReplica)
boolean listFile, boolean withReplica, boolean sortSubPaths)
throws IOException {
DUResponse duResponse = new DUResponse();
duResponse.setPath(getNormalizedPath());
Expand Down Expand Up @@ -131,6 +135,13 @@ public DUResponse getDuResponse(
duResponse.setSizeWithReplica(volDataSizeWithReplica);
}
duResponse.setSize(volDataSize);

if (sortSubPaths) {
// Parallel sort bucketDuData in descending order of size and returns the top N elements.
bucketDuData = sortDiskUsageDescendingWithLimit(bucketDuData,
DISK_USAGE_TOP_RECORDS_LIMIT);
}

duResponse.setDuData(bucketDuData);
return duResponse;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private long getEntitySize(String path) throws IOException {
EntityHandler.getEntityHandler(reconNamespaceSummaryManager,
omMetadataManager, reconSCM, path);
if (null != entityHandler) {
DUResponse duResponse = entityHandler.getDuResponse(false, false);
DUResponse duResponse = entityHandler.getDuResponse(false, false, false);
if (null != duResponse && duResponse.getStatus() == ResponseStatus.OK) {
return duResponse.getSize();
}
Expand Down
Loading