Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand Down Expand Up @@ -169,8 +170,8 @@ public void testMissingContainerDownNode() throws Exception {
List<UnhealthyContainers> allMissingContainers =
reconContainerManager.getContainerSchemaManager()
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, 1000);
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING, 0L,
Optional.empty(), 1000);
return (allMissingContainers.size() == 1);
});

Expand All @@ -181,7 +182,7 @@ public void testMissingContainerDownNode() throws Exception {
reconContainerManager.getContainerSchemaManager()
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, 1000);
0L, Optional.empty(), 1000);
return (allMissingContainers.isEmpty());
});
IOUtils.closeQuietly(client);
Expand Down Expand Up @@ -247,7 +248,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.
EMPTY_MISSING,
0, 1000);
0L, Optional.empty(), 1000);

// Check if EMPTY_MISSING containers are not added to the DB and their count is logged
Map<ContainerSchemaDefinition.UnHealthyContainerStates, Map<String, Long>>
Expand Down Expand Up @@ -275,7 +276,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
reconContainerManager.getContainerSchemaManager()
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, 1000);
0L, Optional.empty(), 1000);
return (allMissingContainers.size() == 1);
});

Expand All @@ -285,7 +286,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.
EMPTY_MISSING,
0, 1000);
0L, Optional.empty(), 1000);


Map<ContainerSchemaDefinition.UnHealthyContainerStates, Map<String, Long>>
Expand Down Expand Up @@ -315,7 +316,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.
EMPTY_MISSING,
0, 1000);
0L, Optional.empty(), 1000);

Map<ContainerSchemaDefinition.UnHealthyContainerStates, Map<String, Long>>
unhealthyContainerStateStatsMap = reconScm.getContainerHealthTask()
Expand All @@ -335,7 +336,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
reconContainerManager.getContainerSchemaManager()
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, 1000);
0L, Optional.empty(), 1000);
return (allMissingContainers.isEmpty());
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@
*/
@Singleton
public class ContainerSchemaDefinition implements ReconSchemaDefinition {
private static final Logger LOG = LoggerFactory.getLogger(ContainerSchemaDefinition.class);

public static final String UNHEALTHY_CONTAINERS_TABLE_NAME =
"UNHEALTHY_CONTAINERS";
private static final Logger LOG =
LoggerFactory.getLogger(ContainerSchemaDefinition.class);

private static final String CONTAINER_ID = "container_id";
private static final String CONTAINER_STATE = "container_state";
Expand Down Expand Up @@ -78,9 +77,12 @@ private void createUnhealthyContainersTable() {
.constraint(DSL.constraint("pk_container_id")
.primaryKey(CONTAINER_ID, CONTAINER_STATE))
.constraint(DSL.constraint(UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1")
.check(field(name("container_state"))
.check(field(name(CONTAINER_STATE))
.in(UnHealthyContainerStates.values())))
.execute();
dslContext.createIndex("idx_container_state")
.on(DSL.table(UNHEALTHY_CONTAINERS_TABLE_NAME), DSL.field(name(CONTAINER_STATE)))
.execute();
}

public DSLContext getDSLContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public final class ReconConstants {
public static final String RECON_QUERY_BATCH_PARAM = "batchNum";
public static final String RECON_QUERY_PREVKEY = "prevKey";
public static final String RECON_QUERY_START_PREFIX = "startPrefix";
public static final String RECON_QUERY_MAX_CONTAINER_ID = "maxContainerId";
public static final String RECON_QUERY_MIN_CONTAINER_ID = "minContainerId";
public static final String RECON_OPEN_KEY_INCLUDE_NON_FSO = "includeNonFso";
public static final String RECON_OPEN_KEY_INCLUDE_FSO = "includeFso";
public static final String RECON_OM_INSIGHTS_DEFAULT_START_PREFIX = "/";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.VOLUME_TABLE;

import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -99,7 +100,7 @@ public Response getClusterState() {
List<UnhealthyContainers> missingContainers = containerHealthSchemaManager
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, MISSING_CONTAINER_COUNT_LIMIT);
0L, Optional.empty(), MISSING_CONTAINER_COUNT_LIMIT);

containerStateCounts.setMissingContainerCount(
missingContainers.size() == MISSING_CONTAINER_COUNT_LIMIT ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.hadoop.ozone.recon.api;

import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_BATCH_NUMBER;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FILTER_FOR_MISSING_CONTAINERS;
import static org.apache.hadoop.ozone.recon.ReconConstants.PREV_CONTAINER_ID_DEFAULT_VALUE;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_BATCH_PARAM;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_FILTER;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_MAX_CONTAINER_ID;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_MIN_CONTAINER_ID;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;

import java.io.IOException;
Expand All @@ -35,6 +35,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
Expand Down Expand Up @@ -339,7 +340,7 @@ public Response getMissingContainers(
) {
List<MissingContainerMetadata> missingContainers = new ArrayList<>();
containerHealthSchemaManager.getUnhealthyContainers(
UnHealthyContainerStates.MISSING, 0, limit)
UnHealthyContainerStates.MISSING, 0L, Optional.empty(), limit)
.forEach(container -> {
long containerID = container.getContainerId();
try {
Expand Down Expand Up @@ -373,9 +374,12 @@ public Response getMissingContainers(
* eg UNDER_REPLICATED, MIS_REPLICATED, OVER_REPLICATED or
* MISSING. Passing null returns all containers.
* @param limit The limit of unhealthy containers to return.
* @param batchNum The batch number (like "page number") of results to return.
* Passing 1, will return records 1 to limit. 2 will return
* limit + 1 to 2 * limit, etc.
* @param maxContainerId Upper bound for container IDs to include (exclusive).
* When specified, returns containers with IDs less than this value
* in descending order. Use for backward pagination.
* @param minContainerId Lower bound for container IDs to include (exclusive).
* When maxContainerId is not specified, returns containers with IDs
* greater than this value in ascending order. Use for forward pagination.
* @return {@link Response}
*/
@GET
Expand All @@ -384,10 +388,11 @@ public Response getUnhealthyContainers(
@PathParam("state") String state,
@DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT)
int limit,
@DefaultValue(DEFAULT_BATCH_NUMBER)
@QueryParam(RECON_QUERY_BATCH_PARAM) int batchNum) {
int offset = Math.max(((batchNum - 1) * limit), 0);

@DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_MAX_CONTAINER_ID) long maxContainerId,
@DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_MIN_CONTAINER_ID) long minContainerId) {
Optional<Long> maxContainerIdOpt = maxContainerId > 0 ? Optional.of(maxContainerId) : Optional.empty();
List<UnhealthyContainerMetadata> unhealthyMeta = new ArrayList<>();
List<UnhealthyContainersSummary> summary;
try {
Expand All @@ -401,7 +406,7 @@ public Response getUnhealthyContainers(

summary = containerHealthSchemaManager.getUnhealthyContainersSummary();
List<UnhealthyContainers> containers = containerHealthSchemaManager
.getUnhealthyContainers(internalState, offset, limit);
.getUnhealthyContainers(internalState, minContainerId, maxContainerIdOpt, limit);

// Filtering out EMPTY_MISSING and NEGATIVE_SIZE containers from the response.
// These container states are not being inserted into the database as they represent
Expand Down Expand Up @@ -434,6 +439,12 @@ public Response getUnhealthyContainers(

UnhealthyContainersResponse response =
new UnhealthyContainersResponse(unhealthyMeta);
if (!unhealthyMeta.isEmpty()) {
response.setFirstKey(unhealthyMeta.stream().map(UnhealthyContainerMetadata::getContainerID)
.min(Long::compareTo).orElse(0L));
response.setLastKey(unhealthyMeta.stream().map(UnhealthyContainerMetadata::getContainerID)
.max(Long::compareTo).orElse(0L));
}
for (UnhealthyContainersSummary s : summary) {
response.setSummaryCount(s.getContainerState(), s.getCount());
}
Expand All @@ -445,19 +456,24 @@ public Response getUnhealthyContainers(
* {@link org.apache.hadoop.ozone.recon.api.types.UnhealthyContainerMetadata}
* for all unhealthy containers.
* @param limit The limit of unhealthy containers to return.
* @param batchNum The batch number (like "page number") of results to return.
* Passing 1, will return records 1 to limit. 2 will return
* limit + 1 to 2 * limit, etc.
* @param maxContainerId Upper bound for container IDs to include (exclusive).
* When specified, returns containers with IDs less than this value
* in descending order. Use for backward pagination.
* @param minContainerId Lower bound for container IDs to include (exclusive).
* When maxContainerId is not specified, returns containers with IDs
* greater than this value in ascending order. Use for forward pagination.
* @return {@link Response}
*/
@GET
@Path("/unhealthy")
public Response getUnhealthyContainers(
@DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT)
int limit,
@DefaultValue(DEFAULT_BATCH_NUMBER)
@QueryParam(RECON_QUERY_BATCH_PARAM) int batchNum) {
return getUnhealthyContainers(null, limit, batchNum);
@DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_MAX_CONTAINER_ID) long maxContainerId,
@DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_MIN_CONTAINER_ID) long minContainerId) {
return getUnhealthyContainers(null, limit, maxContainerId, minContainerId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ public class UnhealthyContainersResponse {
@JsonProperty("replicaMismatchCount")
private long replicaMismatchCount = 0;

/**
 * The smallest container ID in the current response batch.
 * Defaults to 0 until a caller sets it.
 * NOTE(review): presumably clients pass this back as the exclusive
 * maxContainerId to fetch the preceding page - confirm against the UI.
 */
@JsonProperty("firstKey")
private long firstKey = 0;


/**
 * The largest container ID in the current response batch.
 * Defaults to 0 until a caller sets it.
 * NOTE(review): presumably clients pass this back as the exclusive
 * minContainerId to fetch the following page - confirm against the UI.
 */
@JsonProperty("lastKey")
private long lastKey = 0;



/**
* A collection of unhealthy containers.
*/
Expand Down Expand Up @@ -108,7 +125,23 @@ public long getReplicaMismatchCount() {
return replicaMismatchCount;
}

/**
 * @return the value currently stored in {@code lastKey}.
 */
public long getLastKey() {
  return this.lastKey;
}

/**
 * @return the value currently stored in {@code firstKey}.
 */
public long getFirstKey() {
  return this.firstKey;
}

/**
 * @return the collection of unhealthy container metadata held by this
 *         response (the same instance, not a copy).
 */
public Collection<UnhealthyContainerMetadata> getContainers() {
  return this.containers;
}

/**
 * Stores the given value in {@code firstKey}.
 *
 * @param firstKey the new value for {@code firstKey}
 */
public void setFirstKey(long firstKey) {
  this.firstKey = firstKey;
}

/**
 * Stores the given value in {@code lastKey}.
 *
 * @param lastKey the new value for {@code lastKey}
 */
public void setLastKey(long lastKey) {
  this.lastKey = lastKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@
import static org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS;
import static org.jooq.impl.DSL.count;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.sql.Connection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersSummary;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
import org.apache.ozone.recon.schema.generated.tables.daos.UnhealthyContainersDao;
import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers;
import org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
import org.jooq.Condition;
import org.jooq.Cursor;
import org.jooq.DSLContext;
import org.jooq.OrderField;
import org.jooq.Record;
import org.jooq.SelectQuery;
import org.jooq.exception.DataAccessException;
Expand Down Expand Up @@ -67,32 +73,47 @@ public ContainerHealthSchemaManager(
* matching the given state will be returned.
* @param state Return only containers in this state, or all containers if
* null
* @param offset The starting record to return in the result set. The first
* record is at zero.
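* @param minContainerId exclusive lower bound on container ID; only applied
*                       when maxContainerId is empty or not positive
* @param maxContainerId exclusive upper bound on container ID; when present
*                       and positive it takes precedence over minContainerId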
* @param limit The total records to return
* @return List of unhealthy containers.
*/
public List<UnhealthyContainers> getUnhealthyContainers(
UnHealthyContainerStates state, int offset, int limit) {
UnHealthyContainerStates state, Long minContainerId, Optional<Long> maxContainerId, int limit) {
DSLContext dslContext = containerSchemaDefinition.getDSLContext();
SelectQuery<Record> query = dslContext.selectQuery();
query.addFrom(UNHEALTHY_CONTAINERS);
Condition containerCondition;
OrderField[] orderField;
if (maxContainerId.isPresent() && maxContainerId.get() > 0) {
containerCondition = UNHEALTHY_CONTAINERS.CONTAINER_ID.lessThan(maxContainerId.get());
orderField = new OrderField[]{UNHEALTHY_CONTAINERS.CONTAINER_ID.desc(),
UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc()};
} else {
containerCondition = UNHEALTHY_CONTAINERS.CONTAINER_ID.greaterThan(minContainerId);
orderField = new OrderField[]{UNHEALTHY_CONTAINERS.CONTAINER_ID.asc(),
UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc()};
}
if (state != null) {
if (state.equals(ALL_REPLICAS_BAD)) {
query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_STATE
.eq(UNDER_REPLICATED.toString()));
query.addConditions(containerCondition.and(UNHEALTHY_CONTAINERS.CONTAINER_STATE
.eq(UNDER_REPLICATED.toString())));
query.addConditions(UNHEALTHY_CONTAINERS.ACTUAL_REPLICA_COUNT.eq(0));
} else {
query.addConditions(
UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()));
query.addConditions(containerCondition.and(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString())));
}
} else {
// CRITICAL FIX: Apply pagination condition even when state is null
// This ensures proper pagination for the "get all unhealthy containers" use case
query.addConditions(containerCondition);
}
query.addOrderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc(),
UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc());
query.addOffset(offset);

query.addOrderBy(orderField);
query.addLimit(limit);

return query.fetchInto(UnhealthyContainers.class);
return query.fetchInto(UnhealthyContainers.class).stream()
.sorted(Comparator.comparingLong(UnhealthyContainers::getContainerId))
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -151,4 +172,19 @@ public void insertUnhealthyContainerRecords(List<UnhealthyContainers> recs) {
}
}

/**
 * Deletes every row from the unhealthy containers table. This is primarily
 * used for testing to ensure clean state between tests.
 *
 * <p>Best effort: a failure is logged and not rethrown.
 */
@VisibleForTesting
public void clearAllUnhealthyContainerRecords() {
  DSLContext dslContext = containerSchemaDefinition.getDSLContext();
  try {
    dslContext.deleteFrom(UNHEALTHY_CONTAINERS).execute();
    LOG.info("Cleared all unhealthy container records");
  } catch (Exception e) {
    // Log at ERROR rather than INFO so a failed cleanup, which leaves stale
    // rows behind, is not hidden among routine messages.
    LOG.error("Failed to clear unhealthy container records", e);
  }
}

}
Loading