From 6f411d10be2102d0f78780750a1adfa2c3601c34 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Wed, 20 Dec 2023 17:59:39 +0530 Subject: [PATCH 1/7] HDDS-9819.Recon - Potential memory overflow in Container Health Task. --- .../ozone/recon/fsck/ContainerHealthTask.java | 33 ++++++++++++++----- .../recon/fsck/TestContainerHealthTask.java | 10 ++++-- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java index bb93923cfd1d..f0f80033d964 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java @@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory; import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT; +import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT; import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_KEYS; import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_USED_BYTES; @@ -132,7 +133,21 @@ public void triggerContainerHealthCheck() { " process {} existing database records.", Time.monotonicNow() - start, existingCount); start = Time.monotonicNow(); - final List containers = containerManager.getContainers(); + checkAndProcessContainers(unhealthyContainerStateStatsMap, start, + currentTime); + processedContainers.clear(); + } finally { + lock.writeLock().unlock(); + } + } + + private void checkAndProcessContainers( + Map> + unhealthyContainerStateStatsMap, long start, long currentTime) { + ContainerID startID = ContainerID.valueOf(0); + List containers = containerManager.getContainers(startID, + Integer.parseInt(DEFAULT_FETCH_COUNT)); + while (!containers.isEmpty()) { containers.stream() .filter(c -> !processedContainers.contains(c)) .forEach(c -> processContainer(c, currentTime, @@ -142,9 +157,11 @@ public void triggerContainerHealthCheck() { " processing {} containers.", Time.monotonicNow() - start, containers.size()); logUnhealthyContainerStats(unhealthyContainerStateStatsMap); - processedContainers.clear(); - } finally { - lock.writeLock().unlock(); + startID = ContainerID.valueOf( + containers.get(containers.size() - 1).getContainerID()); + containers.clear(); + containers = containerManager.getContainers(startID, + Integer.parseInt(DEFAULT_FETCH_COUNT)); } } @@ -280,7 +297,7 @@ private long processExistingDBRecords(long currentTime, private void processContainer(ContainerInfo container, long currentTime, Map> - unhealthyContainerStateStatsMap) { + unhealthyContainerStateStatsMap) { try { Set containerReplicas = containerManager.getContainerReplicas(container.containerID()); @@ -437,7 +454,7 @@ public static List generateUnhealthyRecords( if (container.isUnderReplicated() && !recordForStateExists.contains( - UnHealthyContainerStates.UNDER_REPLICATED.toString())) { + UnHealthyContainerStates.UNDER_REPLICATED.toString())) { records.add(recordForState( container, UnHealthyContainerStates.UNDER_REPLICATED, time)); populateContainerStats(container, @@ -447,7 +464,7 @@ public static List generateUnhealthyRecords( if (container.isOverReplicated() && !recordForStateExists.contains( - UnHealthyContainerStates.OVER_REPLICATED.toString())) { + UnHealthyContainerStates.OVER_REPLICATED.toString())) { records.add(recordForState( container, UnHealthyContainerStates.OVER_REPLICATED, time)); populateContainerStats(container, @@ -457,7 +474,7 @@ public static List generateUnhealthyRecords( if (container.isMisReplicated() && !recordForStateExists.contains( - UnHealthyContainerStates.MIS_REPLICATED.toString())) { + UnHealthyContainerStates.MIS_REPLICATED.toString())) { records.add(recordForState( container, UnHealthyContainerStates.MIS_REPLICATED, time)); populateContainerStats(container, diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java index 847b5d98c7e9..e529de1f2341 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java @@ -21,6 +21,8 @@ import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -96,7 +98,8 @@ public void testRun() throws Exception { List mockContainers = getMockContainers(7); when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock); when(scmMock.getContainerManager()).thenReturn(containerManagerMock); - when(containerManagerMock.getContainers()).thenReturn(mockContainers); + when(containerManagerMock.getContainers(any(ContainerID.class), + anyInt())).thenReturn(mockContainers); for (ContainerInfo c : mockContainers) { when(containerManagerMock.getContainer(c.containerID())).thenReturn(c); when(scmClientMock.getContainerWithPipeline(c.getContainerID())) @@ -151,7 +154,7 @@ public void testRun() throws Exception { reconTaskStatusDao, containerHealthSchemaManager, placementMock, reconTaskConfig, reconContainerMetadataManager); containerHealthTask.start(); - LambdaTestUtils.await(6000, 1000, () -> + LambdaTestUtils.await(60000, 1000, () -> (unHealthyContainersTableHandle.count() == 6)); UnhealthyContainers rec = unHealthyContainersTableHandle.fetchByContainerId(1L).get(0); @@ -268,7 +271,8 @@ public void testDeletedContainer() throws Exception { List mockContainers = getMockContainers(3); when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock); when(scmMock.getContainerManager()).thenReturn(containerManagerMock); - when(containerManagerMock.getContainers()).thenReturn(mockContainers); + when(containerManagerMock.getContainers(any(ContainerID.class), + anyInt())).thenReturn(mockContainers); for (ContainerInfo c : mockContainers) { when(containerManagerMock.getContainer(c.containerID())).thenReturn(c); when(scmClientMock.getContainerWithPipeline(c.getContainerID())) From c77e3d57238a29ec115a03b0d325cbe0b7ce5dae Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Thu, 21 Dec 2023 08:52:30 +0530 Subject: [PATCH 2/7] Fixed review comments. --- .../ozone/recon/fsck/ContainerHealthTask.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java index f0f80033d964..5d451dd5ad6f 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java @@ -132,9 +132,8 @@ public void triggerContainerHealthCheck() { LOG.info("Container Health task thread took {} milliseconds to" + " process {} existing database records.", Time.monotonicNow() - start, existingCount); - start = Time.monotonicNow(); - checkAndProcessContainers(unhealthyContainerStateStatsMap, start, - currentTime); + + checkAndProcessContainers(unhealthyContainerStateStatsMap, currentTime); processedContainers.clear(); } finally { lock.writeLock().unlock(); @@ -143,11 +142,13 @@ public void triggerContainerHealthCheck() { private void checkAndProcessContainers( Map> - unhealthyContainerStateStatsMap, long start, long currentTime) { + unhealthyContainerStateStatsMap, long currentTime) { ContainerID startID = ContainerID.valueOf(0); List containers = containerManager.getContainers(startID, Integer.parseInt(DEFAULT_FETCH_COUNT)); + long start; while (!containers.isEmpty()) { + start = Time.monotonicNow(); containers.stream() .filter(c -> !processedContainers.contains(c)) .forEach(c -> processContainer(c, currentTime, @@ -297,7 +298,7 @@ private long processExistingDBRecords(long currentTime, private void processContainer(ContainerInfo container, long currentTime, Map> - unhealthyContainerStateStatsMap) { + unhealthyContainerStateStatsMap) { try { Set containerReplicas = containerManager.getContainerReplicas(container.containerID()); @@ -454,7 +455,7 @@ public static List generateUnhealthyRecords( if (container.isUnderReplicated() && !recordForStateExists.contains( - UnHealthyContainerStates.UNDER_REPLICATED.toString())) { + UnHealthyContainerStates.UNDER_REPLICATED.toString())) { records.add(recordForState( container, UnHealthyContainerStates.UNDER_REPLICATED, time)); populateContainerStats(container, From 7a7e68f41a8cb85efc26ce36d7e430eaf7746641 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Tue, 2 Jan 2024 14:48:27 +0530 Subject: [PATCH 3/7] HDDS-9819.Fixed Test case failures in TestReconTasks. --- .../hadoop/ozone/recon/TestReconTasks.java | 7 ++----- .../ozone/recon/fsck/ContainerHealthTask.java | 17 ++++++++++------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java index 965dd2f5255b..44385698c5c3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java @@ -72,8 +72,8 @@ public void init() throws Exception { taskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(15)); conf.setFromObject(taskConfig); - conf.set("ozone.scm.stale.node.interval", "10s"); - conf.set("ozone.scm.dead.node.interval", "20s"); + conf.set("ozone.scm.stale.node.interval", "6s"); + conf.set("ozone.scm.dead.node.interval", "10s"); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1) .includeRecon(true).build(); cluster.waitForClusterToBeReady(); @@ -102,9 +102,6 @@ public void testSyncSCMContainerInfo() throws Exception { final ContainerInfo container2 = scmContainerManager.allocateContainer( RatisReplicationConfig.getInstance( HddsProtos.ReplicationFactor.ONE), "admin"); - reconContainerManager.allocateContainer( - RatisReplicationConfig.getInstance( - HddsProtos.ReplicationFactor.ONE), "admin"); scmContainerManager.updateContainerState(container1.containerID(), HddsProtos.LifeCycleEvent.FINALIZE); scmContainerManager.updateContainerState(container2.containerID(), diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java index 5d451dd5ad6f..8b892bcff567 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java @@ -143,7 +143,7 @@ public void triggerContainerHealthCheck() { private void checkAndProcessContainers( Map> unhealthyContainerStateStatsMap, long currentTime) { - ContainerID startID = ContainerID.valueOf(0); + ContainerID startID = ContainerID.valueOf(1); List containers = containerManager.getContainers(startID, Integer.parseInt(DEFAULT_FETCH_COUNT)); long start; @@ -158,11 +158,14 @@ private void checkAndProcessContainers( " processing {} containers.", Time.monotonicNow() - start, containers.size()); logUnhealthyContainerStats(unhealthyContainerStateStatsMap); - startID = ContainerID.valueOf( - containers.get(containers.size() - 1).getContainerID()); + if (containers.size() > Integer.parseInt(DEFAULT_FETCH_COUNT)) { + startID = ContainerID.valueOf( + containers.get(containers.size() - 1).getContainerID()); + containers.clear(); + containers = containerManager.getContainers(startID, + Integer.parseInt(DEFAULT_FETCH_COUNT)); + } containers.clear(); - containers = containerManager.getContainers(startID, - Integer.parseInt(DEFAULT_FETCH_COUNT)); } } @@ -465,7 +468,7 @@ public static List generateUnhealthyRecords( if (container.isOverReplicated() && !recordForStateExists.contains( - UnHealthyContainerStates.OVER_REPLICATED.toString())) { + UnHealthyContainerStates.OVER_REPLICATED.toString())) { records.add(recordForState( container, UnHealthyContainerStates.OVER_REPLICATED, time)); populateContainerStats(container, @@ -475,7 +478,7 @@ public static List generateUnhealthyRecords( if (container.isMisReplicated() && !recordForStateExists.contains( - UnHealthyContainerStates.MIS_REPLICATED.toString())) { + UnHealthyContainerStates.MIS_REPLICATED.toString())) { records.add(recordForState( container, UnHealthyContainerStates.MIS_REPLICATED, time)); populateContainerStats(container, From 88b671ff8c4d1a1eabef5538bc72e4b55a7c4b6c Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Tue, 2 Jan 2024 14:50:53 +0530 Subject: [PATCH 4/7] HDDS-9819.Fixed Test case failures in TestReconTasks. --- .../apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java index 8b892bcff567..2a3e2cf720a7 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java @@ -468,7 +468,7 @@ public static List generateUnhealthyRecords( if (container.isOverReplicated() && !recordForStateExists.contains( - UnHealthyContainerStates.OVER_REPLICATED.toString())) { + UnHealthyContainerStates.OVER_REPLICATED.toString())) { records.add(recordForState( container, UnHealthyContainerStates.OVER_REPLICATED, time)); populateContainerStats(container, @@ -478,7 +478,7 @@ public static List generateUnhealthyRecords( if (container.isMisReplicated() && !recordForStateExists.contains( - UnHealthyContainerStates.MIS_REPLICATED.toString())) { + UnHealthyContainerStates.MIS_REPLICATED.toString())) { records.add(recordForState( container, UnHealthyContainerStates.MIS_REPLICATED, time)); populateContainerStats(container, From 81f271c556e3086a73257e4f2ab8ef9e03188046 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Wed, 10 Jan 2024 12:08:03 +0530 Subject: [PATCH 5/7] HDDS-9819.Fixed review comments. --- .../hadoop/ozone/recon/fsck/ContainerHealthTask.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java index 2a3e2cf720a7..127e14b62fd0 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java @@ -66,6 +66,7 @@ public class ContainerHealthTask extends ReconScmTask { private static final Logger LOG = LoggerFactory.getLogger(ContainerHealthTask.class); + public static final int FETCH_COUNT = Integer.parseInt(DEFAULT_FETCH_COUNT); private ReadWriteLock lock = new ReentrantReadWriteLock(true); @@ -145,7 +146,7 @@ private void checkAndProcessContainers( unhealthyContainerStateStatsMap, long currentTime) { ContainerID startID = ContainerID.valueOf(1); List containers = containerManager.getContainers(startID, - Integer.parseInt(DEFAULT_FETCH_COUNT)); + FETCH_COUNT); long start; while (!containers.isEmpty()) { start = Time.monotonicNow(); @@ -158,12 +159,11 @@ private void checkAndProcessContainers( " processing {} containers.", Time.monotonicNow() - start, containers.size()); logUnhealthyContainerStats(unhealthyContainerStateStatsMap); - if (containers.size() > Integer.parseInt(DEFAULT_FETCH_COUNT)) { + if (containers.size() > FETCH_COUNT) { startID = ContainerID.valueOf( containers.get(containers.size() - 1).getContainerID()); containers.clear(); - containers = containerManager.getContainers(startID, - Integer.parseInt(DEFAULT_FETCH_COUNT)); + containers = containerManager.getContainers(startID, FETCH_COUNT); } containers.clear(); } From 8612090dd2c92073c5ea1b46b623dc7aa1a8f409 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Thu, 11 Jan 2024 19:16:47 +0530 Subject: [PATCH 6/7] HDDS-9819.Fixed review comments. --- .../ozone/recon/fsck/ContainerHealthTask.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java index 127e14b62fd0..4296dca366a2 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java @@ -148,6 +148,7 @@ private void checkAndProcessContainers( List containers = containerManager.getContainers(startID, FETCH_COUNT); long start; + long iterationCount = 0; while (!containers.isEmpty()) { start = Time.monotonicNow(); containers.stream() @@ -159,14 +160,19 @@ private void checkAndProcessContainers( " processing {} containers.", Time.monotonicNow() - start, containers.size()); logUnhealthyContainerStats(unhealthyContainerStateStatsMap); - if (containers.size() > FETCH_COUNT) { + if (containers.size() >= FETCH_COUNT) { startID = ContainerID.valueOf( - containers.get(containers.size() - 1).getContainerID()); - containers.clear(); + containers.get(containers.size() - 1).getContainerID() + 1); containers = containerManager.getContainers(startID, FETCH_COUNT); + } else { + containers.clear(); } - containers.clear(); + iterationCount++; } + LOG.info( + "Container Health task thread took {} iterations to fetch all " + + "containers using batched approach with batch size of {}", + iterationCount, FETCH_COUNT); } private void logUnhealthyContainerStats( From 1fb54db550c612b46eafdbb0655330961363f342 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Sun, 14 Jan 2024 15:46:07 +0100 Subject: [PATCH 7/7] restore changes from HDDS-10034 (lost in merge) --- .../ozone/recon/fsck/TestContainerHealthTask.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java index 0e5e89bb1cb0..358799cc0330 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java @@ -18,10 +18,10 @@ package org.apache.hadoop.ozone.recon.fsck; +import static org.assertj.core.api.Assertions.assertThat; import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; @@ -140,7 +140,7 @@ public void testRun() throws Exception { .thenReturn(Collections.emptySet()); List all = unHealthyContainersTableHandle.findAll(); - assertTrue(all.isEmpty()); + assertThat(all).isEmpty(); long currentTime = System.currentTimeMillis(); ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class); @@ -195,8 +195,8 @@ public void testRun() throws Exception { ReconTaskStatus taskStatus = reconTaskStatusDao.findById(containerHealthTask.getTaskName()); - assertTrue(taskStatus.getLastUpdatedTimestamp() > - currentTime); + assertThat(taskStatus.getLastUpdatedTimestamp()) + .isGreaterThan(currentTime); // Now run the job again, to check that relevant records are updated or // removed as appropriate. Need to adjust the return value for all the mocks @@ -304,7 +304,7 @@ public void testDeletedContainer() throws Exception { .thenReturn(new ContainerWithPipeline(mockContainers.get(0), null)); List all = unHealthyContainersTableHandle.findAll(); - assertTrue(all.isEmpty()); + assertThat(all).isEmpty(); long currentTime = System.currentTimeMillis(); ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class); @@ -332,8 +332,8 @@ public void testDeletedContainer() throws Exception { ReconTaskStatus taskStatus = reconTaskStatusDao.findById(containerHealthTask.getTaskName()); - assertTrue(taskStatus.getLastUpdatedTimestamp() > - currentTime); + assertThat(taskStatus.getLastUpdatedTimestamp()) + .isGreaterThan(currentTime); } private Set getMockReplicas(