Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand Down Expand Up @@ -168,8 +169,8 @@ public void testMissingContainerDownNode() throws Exception {
List<UnhealthyContainers> allMissingContainers =
reconContainerManager.getContainerSchemaManager()
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, 1000);
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING, 0L,
Optional.empty(), 1000);
return (allMissingContainers.size() == 1);
});

Expand All @@ -180,7 +181,7 @@ public void testMissingContainerDownNode() throws Exception {
reconContainerManager.getContainerSchemaManager()
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, 1000);
0L, Optional.empty(), 1000);
return (allMissingContainers.isEmpty());
});
IOUtils.closeQuietly(client);
Expand Down Expand Up @@ -246,7 +247,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.
EMPTY_MISSING,
0, 1000);
0L, Optional.empty(), 1000);

// Check if EMPTY_MISSING containers are not added to the DB and their count is logged
Map<ContainerSchemaDefinition.UnHealthyContainerStates, Map<String, Long>>
Expand Down Expand Up @@ -274,7 +275,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
reconContainerManager.getContainerSchemaManager()
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, 1000);
0L, Optional.empty(), 1000);
return (allMissingContainers.size() == 1);
});

Expand All @@ -284,7 +285,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.
EMPTY_MISSING,
0, 1000);
0L, Optional.empty(), 1000);


Map<ContainerSchemaDefinition.UnHealthyContainerStates, Map<String, Long>>
Expand Down Expand Up @@ -314,7 +315,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.
EMPTY_MISSING,
0, 1000);
0L, Optional.empty(), 1000);

Map<ContainerSchemaDefinition.UnHealthyContainerStates, Map<String, Long>>
unhealthyContainerStateStatsMap = reconScm.getContainerHealthTask()
Expand All @@ -334,7 +335,7 @@ public void testEmptyMissingContainerDownNode() throws Exception {
reconContainerManager.getContainerSchemaManager()
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, 1000);
0L, Optional.empty(), 1000);
return (allMissingContainers.isEmpty());
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@
*/
@Singleton
public class ContainerSchemaDefinition implements ReconSchemaDefinition {
private static final Logger LOG = LoggerFactory.getLogger(ContainerSchemaDefinition.class);

public static final String UNHEALTHY_CONTAINERS_TABLE_NAME =
"UNHEALTHY_CONTAINERS";
private static final Logger LOG =
LoggerFactory.getLogger(ContainerSchemaDefinition.class);

/**
* ENUM describing the allowed container states which can be stored in the
Expand Down Expand Up @@ -92,9 +91,12 @@ private void createUnhealthyContainersTable() {
.constraint(DSL.constraint("pk_container_id")
.primaryKey(CONTAINER_ID, CONTAINER_STATE))
.constraint(DSL.constraint(UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1")
.check(field(name("container_state"))
.check(field(name(CONTAINER_STATE))
.in(UnHealthyContainerStates.values())))
.execute();
dslContext.createIndex("idx_container_state")
.on(DSL.table(UNHEALTHY_CONTAINERS_TABLE_NAME), DSL.field(name(CONTAINER_STATE)))
.execute();
}

public DSLContext getDSLContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ private ReconConstants() {
public static final String RECON_QUERY_BATCH_PARAM = "batchNum";
public static final String RECON_QUERY_PREVKEY = "prevKey";
public static final String RECON_QUERY_START_PREFIX = "startPrefix";
public static final String RECON_QUERY_PREV_START_KEY = "prevStartKey";
public static final String RECON_QUERY_PREV_LAST_KEY = "prevLastKey";
public static final String RECON_OPEN_KEY_INCLUDE_NON_FSO = "includeNonFso";
public static final String RECON_OPEN_KEY_INCLUDE_FSO = "includeFso";
public static final String RECON_OM_INSIGHTS_DEFAULT_START_PREFIX = "/";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;

import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -104,7 +105,7 @@ public Response getClusterState() {
List<UnhealthyContainers> missingContainers = containerHealthSchemaManager
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, MISSING_CONTAINER_COUNT_LIMIT);
0L, Optional.empty(), MISSING_CONTAINER_COUNT_LIMIT);

containerStateCounts.setMissingContainerCount(
missingContainers.size() == MISSING_CONTAINER_COUNT_LIMIT ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package org.apache.hadoop.ozone.recon.api;

import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_BATCH_NUMBER;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FILTER_FOR_MISSING_CONTAINERS;
import static org.apache.hadoop.ozone.recon.ReconConstants.PREV_CONTAINER_ID_DEFAULT_VALUE;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_BATCH_PARAM;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_FILTER;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREV_LAST_KEY;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREV_START_KEY;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -35,6 +35,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
Expand Down Expand Up @@ -340,7 +341,7 @@ public Response getMissingContainers(
) {
List<MissingContainerMetadata> missingContainers = new ArrayList<>();
containerHealthSchemaManager.getUnhealthyContainers(
UnHealthyContainerStates.MISSING, 0, limit)
UnHealthyContainerStates.MISSING, 0L, Optional.empty(), limit)
.forEach(container -> {
long containerID = container.getContainerId();
try {
Expand Down Expand Up @@ -374,9 +375,9 @@ public Response getMissingContainers(
* eg UNDER_REPLICATED, MIS_REPLICATED, OVER_REPLICATED or
* MISSING. Passing null returns all containers.
* @param limit The limit of unhealthy containers to return.
* @param batchNum The batch number (like "page number") of results to return.
* Passing 1, will return records 1 to limit. 2 will return
* limit + 1 to 2 * limit, etc.
* @param prevStartKey startKey of previous batch. If this value is given last N records before this container
* would be returned.
* @param prevLastKey lastKey of previous batch.
* @return {@link Response}
*/
@GET
Expand All @@ -385,10 +386,11 @@ public Response getUnhealthyContainers(
@PathParam("state") String state,
@DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT)
int limit,
@DefaultValue(DEFAULT_BATCH_NUMBER)
@QueryParam(RECON_QUERY_BATCH_PARAM) int batchNum) {
int offset = Math.max(((batchNum - 1) * limit), 0);

@DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_PREV_START_KEY) long prevStartKey,
@DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_PREV_LAST_KEY) long prevLastKey) {
Optional<Long> maxContainerId = prevStartKey > 0 ? Optional.of(prevStartKey) : Optional.empty();
List<UnhealthyContainerMetadata> unhealthyMeta = new ArrayList<>();
List<UnhealthyContainersSummary> summary;
try {
Expand All @@ -402,7 +404,7 @@ public Response getUnhealthyContainers(

summary = containerHealthSchemaManager.getUnhealthyContainersSummary();
List<UnhealthyContainers> containers = containerHealthSchemaManager
.getUnhealthyContainers(internalState, offset, limit);
.getUnhealthyContainers(internalState, prevLastKey, maxContainerId, limit);

// Filtering out EMPTY_MISSING and NEGATIVE_SIZE containers from the response.
// These container states are not being inserted into the database as they represent
Expand Down Expand Up @@ -435,6 +437,12 @@ public Response getUnhealthyContainers(

UnhealthyContainersResponse response =
new UnhealthyContainersResponse(unhealthyMeta);
if (!unhealthyMeta.isEmpty()) {
response.setFirstKey(unhealthyMeta.stream().map(UnhealthyContainerMetadata::getContainerID)
.min(Long::compareTo).orElse(0L));
response.setLastKey(unhealthyMeta.stream().map(UnhealthyContainerMetadata::getContainerID)
.max(Long::compareTo).orElse(0L));
}
for (UnhealthyContainersSummary s : summary) {
response.setSummaryCount(s.getContainerState(), s.getCount());
}
Expand All @@ -446,19 +454,21 @@ public Response getUnhealthyContainers(
* {@link org.apache.hadoop.ozone.recon.api.types.UnhealthyContainerMetadata}
* for all unhealthy containers.
* @param limit The limit of unhealthy containers to return.
* @param batchNum The batch number (like "page number") of results to return.
* Passing 1, will return records 1 to limit. 2 will return
* limit + 1 to 2 * limit, etc.
* @param prevStartKey startKey of previous batch. If this value is given last N records before this container
* would be returned.
* @param prevLastKey lastKey of previous batch.
* @return {@link Response}
*/
@GET
@Path("/unhealthy")
public Response getUnhealthyContainers(
@DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT)
int limit,
@DefaultValue(DEFAULT_BATCH_NUMBER)
@QueryParam(RECON_QUERY_BATCH_PARAM) int batchNum) {
return getUnhealthyContainers(null, limit, batchNum);
@DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_PREV_START_KEY) long prevStartKey,
@DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_PREV_LAST_KEY) long prevLastKey) {
return getUnhealthyContainers(null, limit, prevStartKey, prevLastKey);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ public class UnhealthyContainersResponse {
@JsonProperty("misReplicatedCount")
private long misReplicatedCount = 0;

/**
* .
*/
@JsonProperty("firstKey")
private long firstKey = 0;


/**
* Total count of mis-replicated containers.
*/
@JsonProperty("lastKey")
private long lastKey = 0;



/**
* A collection of unhealthy containers.
*/
Expand Down Expand Up @@ -98,4 +113,12 @@ public long getMisReplicatedCount() {
public Collection<UnhealthyContainerMetadata> getContainers() {
return containers;
}

public void setFirstKey(long firstKey) {
this.firstKey = firstKey;
}

public void setLastKey(long lastKey) {
this.lastKey = lastKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@
import static org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS;
import static org.jooq.impl.DSL.count;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.sql.Connection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersSummary;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
import org.apache.ozone.recon.schema.generated.tables.daos.UnhealthyContainersDao;
import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers;
import org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
import org.jooq.Condition;
import org.jooq.Cursor;
import org.jooq.DSLContext;
import org.jooq.OrderField;
import org.jooq.Record;
import org.jooq.SelectQuery;
import org.jooq.exception.DataAccessException;
Expand Down Expand Up @@ -67,32 +73,47 @@ public ContainerHealthSchemaManager(
* matching the given state will be returned.
* @param state Return only containers in this state, or all containers if
* null
* @param offset The starting record to return in the result set. The first
* record is at zero.
* @param minContainerId minimum containerId for filter
* @param maxContainerId maximum containerId for filter
* @param limit The total records to return
* @return List of unhealthy containers.
*/
public List<UnhealthyContainers> getUnhealthyContainers(
UnHealthyContainerStates state, int offset, int limit) {
UnHealthyContainerStates state, Long minContainerId, Optional<Long> maxContainerId, int limit) {
DSLContext dslContext = containerSchemaDefinition.getDSLContext();
SelectQuery<Record> query = dslContext.selectQuery();
query.addFrom(UNHEALTHY_CONTAINERS);
Condition containerCondition;
OrderField[] orderField;
if (maxContainerId.isPresent() && maxContainerId.get() > 0) {
containerCondition = UNHEALTHY_CONTAINERS.CONTAINER_ID.lessThan(maxContainerId.get());
orderField = new OrderField[]{UNHEALTHY_CONTAINERS.CONTAINER_ID.desc(),
UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc()};
} else {
containerCondition = UNHEALTHY_CONTAINERS.CONTAINER_ID.greaterThan(minContainerId);
orderField = new OrderField[]{UNHEALTHY_CONTAINERS.CONTAINER_ID.asc(),
UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc()};
}
if (state != null) {
if (state.equals(ALL_REPLICAS_BAD)) {
query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_STATE
.eq(UNDER_REPLICATED.toString()));
query.addConditions(containerCondition.and(UNHEALTHY_CONTAINERS.CONTAINER_STATE
.eq(UNDER_REPLICATED.toString())));
query.addConditions(UNHEALTHY_CONTAINERS.ACTUAL_REPLICA_COUNT.eq(0));
} else {
query.addConditions(
UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()));
query.addConditions(containerCondition.and(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString())));
}
} else {
// CRITICAL FIX: Apply pagination condition even when state is null
// This ensures proper pagination for the "get all unhealthy containers" use case
query.addConditions(containerCondition);
}
query.addOrderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc(),
UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc());
query.addOffset(offset);

query.addOrderBy(orderField);
query.addLimit(limit);

return query.fetchInto(UnhealthyContainers.class);
return query.fetchInto(UnhealthyContainers.class).stream()
.sorted(Comparator.comparingLong(UnhealthyContainers::getContainerId))
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -151,4 +172,19 @@ public void insertUnhealthyContainerRecords(List<UnhealthyContainers> recs) {
}
}

/**
* Clear all unhealthy container records. This is primarily used for testing
* to ensure clean state between tests.
*/
@VisibleForTesting
public void clearAllUnhealthyContainerRecords() {
DSLContext dslContext = containerSchemaDefinition.getDSLContext();
try {
dslContext.deleteFrom(UNHEALTHY_CONTAINERS).execute();
LOG.info("Cleared all unhealthy container records");
} catch (Exception e) {
LOG.info("Failed to clear unhealthy container records", e);
}
}

}
Loading