diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
index 965dd2f5255b..44385698c5c3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
@@ -72,8 +72,8 @@ public void init() throws Exception {
     taskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(15));
     conf.setFromObject(taskConfig);
 
-    conf.set("ozone.scm.stale.node.interval", "10s");
-    conf.set("ozone.scm.dead.node.interval", "20s");
+    conf.set("ozone.scm.stale.node.interval", "6s");
+    conf.set("ozone.scm.dead.node.interval", "10s");
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
         .includeRecon(true).build();
     cluster.waitForClusterToBeReady();
@@ -102,9 +102,6 @@ public void testSyncSCMContainerInfo() throws Exception {
     final ContainerInfo container2 = scmContainerManager.allocateContainer(
         RatisReplicationConfig.getInstance(
             HddsProtos.ReplicationFactor.ONE), "admin");
-    reconContainerManager.allocateContainer(
-        RatisReplicationConfig.getInstance(
-            HddsProtos.ReplicationFactor.ONE), "admin");
     scmContainerManager.updateContainerState(container1.containerID(),
         HddsProtos.LifeCycleEvent.FINALIZE);
     scmContainerManager.updateContainerState(container2.containerID(),
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index bb93923cfd1d..4296dca366a2 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT;
+import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
 import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_KEYS;
 import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_USED_BYTES;
 
@@ -65,6 +66,7 @@ public class ContainerHealthTask extends ReconScmTask {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerHealthTask.class);
+  public static final int FETCH_COUNT = Integer.parseInt(DEFAULT_FETCH_COUNT);
 
   private ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
@@ -131,8 +133,24 @@ public void triggerContainerHealthCheck() {
       LOG.info("Container Health task thread took {} milliseconds to" +
           " process {} existing database records.",
           Time.monotonicNow() - start, existingCount);
+
+      checkAndProcessContainers(unhealthyContainerStateStatsMap, currentTime);
+      processedContainers.clear();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  private void checkAndProcessContainers(
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateStatsMap, long currentTime) {
+    ContainerID startID = ContainerID.valueOf(1);
+    List<ContainerInfo> containers = containerManager.getContainers(startID,
+        FETCH_COUNT);
+    long start;
+    long iterationCount = 0;
+    while (!containers.isEmpty()) {
       start = Time.monotonicNow();
-      final List<ContainerInfo> containers = containerManager.getContainers();
       containers.stream()
          .filter(c -> !processedContainers.contains(c))
          .forEach(c -> processContainer(c, currentTime,
@@ -142,10 +160,19 @@ public void triggerContainerHealthCheck() {
              " processing {} containers.",
          Time.monotonicNow() - start, containers.size());
       logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
-      processedContainers.clear();
-    } finally {
-      lock.writeLock().unlock();
+      if (containers.size() >= FETCH_COUNT) {
+        startID = ContainerID.valueOf(
+            containers.get(containers.size() - 1).getContainerID() + 1);
+        containers = containerManager.getContainers(startID, FETCH_COUNT);
+      } else {
+        containers.clear();
+      }
+      iterationCount++;
     }
+    LOG.info(
+        "Container Health task thread took {} iterations to fetch all " +
+            "containers using batched approach with batch size of {}",
+        iterationCount, FETCH_COUNT);
   }
 
   private void logUnhealthyContainerStats(
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index ea8f9f2a4b68..358799cc0330 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -18,10 +18,12 @@
 package org.apache.hadoop.ozone.recon.fsck;
 
-import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -96,7 +98,8 @@ public void testRun() throws Exception {
     List<ContainerInfo> mockContainers = getMockContainers(7);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
-    when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+    when(containerManagerMock.getContainers(any(ContainerID.class),
+        anyInt())).thenReturn(mockContainers);
     for (ContainerInfo c : mockContainers) {
       when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
       when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
@@ -151,7 +154,7 @@ public void testRun() throws Exception {
         reconTaskStatusDao, containerHealthSchemaManager, placementMock,
         reconTaskConfig, reconContainerMetadataManager);
     containerHealthTask.start();
-    LambdaTestUtils.await(6000, 1000, () ->
+    LambdaTestUtils.await(60000, 1000, () ->
         (unHealthyContainersTableHandle.count() == 6));
     UnhealthyContainers rec =
         unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);
@@ -192,7 +195,8 @@ public void testRun() throws Exception {
 
     ReconTaskStatus taskStatus =
         reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-    assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+    assertThat(taskStatus.getLastUpdatedTimestamp())
+        .isGreaterThan(currentTime);
 
     // Now run the job again, to check that relevant records are updated or
     // removed as appropriate. Need to adjust the return value for all the mocks
@@ -267,7 +271,8 @@ public void testDeletedContainer() throws Exception {
     List<ContainerInfo> mockContainers = getMockContainers(3);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
-    when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+    when(containerManagerMock.getContainers(any(ContainerID.class),
+        anyInt())).thenReturn(mockContainers);
     for (ContainerInfo c : mockContainers) {
       when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
       when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
@@ -327,7 +332,8 @@ public void testDeletedContainer() throws Exception {
 
     ReconTaskStatus taskStatus =
         reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-    assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+    assertThat(taskStatus.getLastUpdatedTimestamp())
+        .isGreaterThan(currentTime);
   }
 
   private Set<ContainerReplica> getMockReplicas(
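
The heart of this patch is the paging loop in the new checkAndProcessContainers() method: instead of one getContainers() call that loads every container at once, the task now walks the container set in FETCH_COUNT-sized batches keyed by ContainerID, advancing the start ID past the last container of each full batch. Below is a minimal standalone sketch of that pattern, not part of the patch: the class name, the fetchBatch callback, and the batch size of 1000 are hypothetical stand-ins; in the patch the batch size is parsed from ReconConstants.DEFAULT_FETCH_COUNT and the fetch is ContainerManager#getContainers(ContainerID, int).

import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

public final class BatchedContainerScanSketch {

  // Assumed batch size; the real value comes from
  // ReconConstants.DEFAULT_FETCH_COUNT in the patch.
  static final int FETCH_COUNT = 1000;

  private BatchedContainerScanSketch() {
  }

  // fetchBatch is a hypothetical stand-in for
  // ContainerManager#getContainers(ContainerID, int): given a start ID and a
  // limit, it returns up to `limit` container IDs in ascending order.
  static long scan(BiFunction<Long, Integer, List<Long>> fetchBatch) {
    long startId = 1;
    long iterationCount = 0;
    List<Long> batch = fetchBatch.apply(startId, FETCH_COUNT);
    while (!batch.isEmpty()) {
      // Process the current batch here (container health checks in the task).
      if (batch.size() >= FETCH_COUNT) {
        // Full page: resume from the ID just past the last container seen.
        startId = batch.get(batch.size() - 1) + 1;
        batch = fetchBatch.apply(startId, FETCH_COUNT);
      } else {
        // Short page: nothing left to fetch.
        batch = Collections.emptyList();
      }
      iterationCount++;
    }
    return iterationCount;
  }
}

Note the ">= FETCH_COUNT" check mirrors the patch: a final, exactly-full batch triggers one extra fetch that comes back empty, which keeps the loop simple at the cost of a single spare call.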