Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public enum UnHealthyContainerStates {
UNDER_REPLICATED,
OVER_REPLICATED,
MIS_REPLICATED,
ALL_REPLICAS_UNHEALTHY
ALL_REPLICAS_UNHEALTHY,
NEGATIVE_SIZE // Added new state to track containers with negative sizes
}

private static final String CONTAINER_ID = "container_id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public class UnhealthyContainersResponse {
@JsonProperty("misReplicatedCount")
private long misReplicatedCount = 0;

/**
* Total count of containers with negative size.
*/
@JsonProperty("negativeSizeCount")
private long negativeSizeCount = 0;

/**
* A collection of unhealthy containers.
*/
Expand Down Expand Up @@ -77,6 +83,9 @@ public void setSummaryCount(String state, long count) {
} else if (state.equals(
UnHealthyContainerStates.MIS_REPLICATED.toString())) {
this.misReplicatedCount = count;
} else if (state.equals(
UnHealthyContainerStates.NEGATIVE_SIZE.toString())) {
this.negativeSizeCount = count;
}
}

Expand All @@ -96,6 +105,10 @@ public long getMisReplicatedCount() {
return misReplicatedCount;
}

public long getNegativeSizeCount() {
return negativeSizeCount;
}

public Collection<UnhealthyContainerMetadata> getContainers() {
return containers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ private void initializeUnhealthyContainerStateStatsMap(
UnHealthyContainerStates.OVER_REPLICATED, new HashMap<>());
unhealthyContainerStateStatsMap.put(
UnHealthyContainerStates.MIS_REPLICATED, new HashMap<>());
unhealthyContainerStateStatsMap.put(
UnHealthyContainerStates.NEGATIVE_SIZE, new HashMap<>());
}

private ContainerHealthStatus setCurrentContainer(long recordId)
Expand Down Expand Up @@ -313,13 +315,21 @@ private long processExistingDBRecords(long currentTime,
private void processContainer(ContainerInfo container, long currentTime,
Map<UnHealthyContainerStates,
Map<String, Long>>
unhealthyContainerStateStatsMap) {
unhealthyContainerStateStatsMap) {
try {
Set<ContainerReplica> containerReplicas =
containerManager.getContainerReplicas(container.containerID());
ContainerHealthStatus h = new ContainerHealthStatus(container,
containerReplicas, placementPolicy,
reconContainerMetadataManager, conf);

// Handle negative sized containers separately
if (h.getContainer().getUsedBytes() < 0) {
handleNegativeSizedContainers(h, currentTime,
unhealthyContainerStateStatsMap);
return;
}

if (h.isHealthilyReplicated() || h.isDeleted()) {
return;
}
Expand Down Expand Up @@ -365,6 +375,32 @@ private boolean containerDeletedInSCM(ContainerInfo containerInfo) {
return false;
}

/**
* This method is used to handle containers with negative sizes. It logs an
* error message and inserts a record into the UNHEALTHY_CONTAINERS table.
* @param containerHealthStatus
* @param currentTime
* @param unhealthyContainerStateStatsMap
*/
private void handleNegativeSizedContainers(
ContainerHealthStatus containerHealthStatus, long currentTime,
Map<UnHealthyContainerStates, Map<String, Long>>
unhealthyContainerStateStatsMap) {
ContainerInfo container = containerHealthStatus.getContainer();
LOG.error(
"Container {} has negative size. Please visit Recon's unhealthy " +
"container endpoint for more details.",
container.getContainerID());
UnhealthyContainers record =
ContainerHealthRecords.recordForState(containerHealthStatus,
UnHealthyContainerStates.NEGATIVE_SIZE, currentTime);
List<UnhealthyContainers> records = Collections.singletonList(record);
populateContainerStats(containerHealthStatus,
UnHealthyContainerStates.NEGATIVE_SIZE,
unhealthyContainerStateStatsMap);
containerHealthSchemaManager.insertUnhealthyContainerRecords(records);
}

/**
* Helper methods to generate and update the required database records for
* unhealthy containers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.scm.ReconScmTask;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.ContainerCountBySizeDao;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
Expand All @@ -34,13 +35,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
import static org.hadoop.ozone.recon.schema.tables.ContainerCountBySizeTable.CONTAINER_COUNT_BY_SIZE;


Expand All @@ -60,6 +62,8 @@ public class ContainerSizeCountTask extends ReconScmTask {
private ContainerCountBySizeDao containerCountBySizeDao;
private DSLContext dslContext;
private HashMap<ContainerID, Long> processedContainers = new HashMap<>();
private Map<ContainerSchemaDefinition.UnHealthyContainerStates, Map<String, Long>>
unhealthyContainerStateStatsMap;
private ReadWriteLock lock = new ReentrantReadWriteLock(true);

public ContainerSizeCountTask(
Expand Down Expand Up @@ -121,7 +125,17 @@ protected synchronized void run() {
private void process(ContainerInfo container,
Map<ContainerSizeCountKey, Long> map) {
final ContainerID id = container.containerID();
final long currentSize = container.getUsedBytes();
final long usedBytes = container.getUsedBytes();
final long currentSize;

if (usedBytes < 0) {
LOG.warn("Negative usedBytes ({}) for container {}, treating it as 0",
usedBytes, id);
currentSize = 0;
} else {
currentSize = usedBytes;
}

final Long previousSize = processedContainers.put(id, currentSize);
if (previousSize != null) {
decrementContainerSizeCount(previousSize, map);
Expand All @@ -132,24 +146,27 @@ private void process(ContainerInfo container,
/**
* The process() function is responsible for updating the counts of
* containers being tracked in a containerSizeCountMap based on the
* ContainerInfo objects in the list containers.It then iterates through
* ContainerInfo objects in the list containers. It then iterates through
* the list of containers and does the following for each container:
*
* 1) If the container is not present in processedContainers,
* it is a new container, so it is added to the processedContainers map
* and the count for its size in the containerSizeCountMap is incremented
* by 1 using the handlePutKeyEvent() function.
* 2) If the container is present in processedContainers but its size has
* been updated to the new size then the count for the old size in the
* containerSizeCountMap is decremented by 1 using the
* handleDeleteKeyEvent() function. The count for the new size is then
* incremented by 1 using the handlePutKeyEvent() function.
* 3) If the container is not present in containers list, it means the
* container has been deleted.
* The remaining containers inside the deletedContainers map are the ones
* that are not in the cluster and need to be deleted. Finally, the counts in
* the containerSizeCountMap are written to the database using the
* writeCountsToDB() function.
* 1) If the container's state is not "deleted," it will be processed:
* - If the container is not present in processedContainers, it is a new
* container. Therefore, it is added to the processedContainers map, and
* the count for its size in the containerSizeCountMap is incremented by
* 1 using the handlePutKeyEvent() function.
* - If the container is present in processedContainers but its size has
* been updated to a new size, the count for the old size in the
* containerSizeCountMap is decremented by 1 using the
* handleDeleteKeyEvent() function. Subsequently, the count for the new
* size is incremented by 1 using the handlePutKeyEvent() function.
*
* 2) If the container's state is "deleted," it is skipped, as deleted
* containers are not processed.
*
* After processing, the remaining containers inside the deletedContainers map
* are those that are not in the cluster and need to be deleted from the total
* size counts. Finally, the counts in the containerSizeCountMap are written
* to the database using the writeCountsToDB() function.
*/
public void process(List<ContainerInfo> containers) {
lock.writeLock().lock();
Expand All @@ -161,7 +178,9 @@ public void process(List<ContainerInfo> containers) {

// Loop to handle container create and size-update operations
for (ContainerInfo container : containers) {
// The containers present in the cache hence it is not yet deleted
if (container.getState().equals(DELETED)) {
continue; // Skip deleted containers
}
deletedContainers.remove(container.containerID());
// For New Container being created
try {
Expand Down Expand Up @@ -246,10 +265,10 @@ public String getTaskName() {

/**
*
* The handleContainerDeleteOperations() function loops through the entries
* in the deletedContainers map and calls the handleDeleteKeyEvent() function
* for each one. This will decrement the size counts of those containers by
* one which are no longer present in the cluster
* Handles the deletion of containers by updating the tracking of processed containers
* and adjusting the count of containers based on their sizes. When a container is deleted,
* it is removed from the tracking of processed containers, and the count of containers
* corresponding to its size is decremented in the container size count map.
*
* Used by process()
*
Expand All @@ -261,6 +280,9 @@ private void handleContainerDeleteOperations(
Map<ContainerSizeCountKey, Long> containerSizeCountMap) {
for (Map.Entry<ContainerID, Long> containerId :
deletedContainers.entrySet()) {
// processedContainers will only keep a track of all containers that have
// been processed except DELETED containers.
processedContainers.remove(containerId.getKey());
long containerSize = deletedContainers.get(containerId.getKey());
decrementContainerSizeCount(containerSize, containerSizeCountMap);
}
Expand Down Expand Up @@ -316,19 +338,26 @@ private static void updateContainerSizeCount(long containerSize, int delta,
}

/**
*
* The purpose of this function is to categorize containers into different
* size ranges, or "bins," based on their size.
* The ContainerSizeCountKey object is used to store the upper bound value
* for each size range, and is later used to lookup the count of containers
* in that size range within a Map.
*
* Used by decrementContainerSizeCount() and incrementContainerSizeCount()
* If the container size is 0, the method sets the size of
* ContainerSizeCountKey as zero without calculating the upper bound. Used by
* decrementContainerSizeCount() and incrementContainerSizeCount()
*
* @param containerSize to calculate the upperSizeBound
*/
private static ContainerSizeCountKey getContainerSizeCountKey(
long containerSize) {
// If containerSize is 0, return a ContainerSizeCountKey with size 0
if (containerSize == 0) {
return new ContainerSizeCountKey(0L);
}

// Otherwise, calculate the upperSizeBound
return new ContainerSizeCountKey(
ReconUtils.getContainerSizeUpperBound(containerSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,10 +866,12 @@ public void testGetContainerCounts() throws Exception {
ContainerInfo omContainerInfo1 = mock(ContainerInfo.class);
given(omContainerInfo1.containerID()).willReturn(new ContainerID(1));
given(omContainerInfo1.getUsedBytes()).willReturn(1500000000L); // 1.5GB
given(omContainerInfo1.getState()).willReturn(LifeCycleState.OPEN);

ContainerInfo omContainerInfo2 = mock(ContainerInfo.class);
given(omContainerInfo2.containerID()).willReturn(new ContainerID(2));
given(omContainerInfo2.getUsedBytes()).willReturn(2500000000L); // 2.5GB
given(omContainerInfo2.getState()).willReturn(LifeCycleState.OPEN);

// Create a list of container info objects
List<ContainerInfo> containers = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,65 @@ public void testDeletedContainer() throws Exception {
.isGreaterThan(currentTime);
}

@Test
public void testNegativeSizeContainers() throws Exception {
// Setup mock objects and test environment
UnhealthyContainersDao unhealthyContainersDao =
getDao(UnhealthyContainersDao.class);
ContainerHealthSchemaManager containerHealthSchemaManager =
new ContainerHealthSchemaManager(
getSchemaDefinition(ContainerSchemaDefinition.class),
unhealthyContainersDao);
ReconStorageContainerManagerFacade scmMock =
mock(ReconStorageContainerManagerFacade.class);
ContainerManager containerManagerMock = mock(ContainerManager.class);
StorageContainerServiceProvider scmClientMock =
mock(StorageContainerServiceProvider.class);
ReconContainerMetadataManager reconContainerMetadataManager =
mock(ReconContainerMetadataManager.class);
MockPlacementPolicy placementMock = new MockPlacementPolicy();

// Mock container info setup
List<ContainerInfo> mockContainers = getMockContainers(3);
when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
when(containerManagerMock.getContainers(any(ContainerID.class),
anyInt())).thenReturn(mockContainers);
for (ContainerInfo c : mockContainers) {
when(containerManagerMock.getContainer(
c.containerID())).thenReturn(c);
when(scmClientMock.getContainerWithPipeline(
c.getContainerID())).thenReturn(new ContainerWithPipeline(c, null));
when(containerManagerMock.getContainer(c.containerID())
.getUsedBytes()).thenReturn(Long.valueOf(-10));
}

// Verify the table is initially empty
assertThat(unhealthyContainersDao.findAll()).isEmpty();

// Setup and start the container health task
ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class);
ReconTaskConfig reconTaskConfig = new ReconTaskConfig();
reconTaskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(2));
ContainerHealthTask containerHealthTask = new ContainerHealthTask(
scmMock.getContainerManager(), scmMock.getScmServiceProvider(),
reconTaskStatusDao,
containerHealthSchemaManager, placementMock, reconTaskConfig,
reconContainerMetadataManager,
new OzoneConfiguration());
containerHealthTask.start();

// Wait for the task to identify unhealthy containers
LambdaTestUtils.await(6000, 1000,
() -> unhealthyContainersDao.count() == 3);

// Assert that all unhealthy containers have been identified as NEGATIVE_SIZE states
List<UnhealthyContainers> negativeSizeContainers =
unhealthyContainersDao.fetchByContainerState("NEGATIVE_SIZE");
assertThat(negativeSizeContainers).hasSize(3);
}


private Set<ContainerReplica> getMockReplicas(
long containerId, State...states) {
Set<ContainerReplica> replicas = new HashSet<>();
Expand Down
Loading