diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ContainerPlacementStatus.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ContainerPlacementStatus.java index 8f3334fa3725..70f256ce8e49 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ContainerPlacementStatus.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ContainerPlacementStatus.java @@ -54,4 +54,18 @@ public interface ContainerPlacementStatus { */ int misReplicationCount(); + /** + * The number of locations (eg racks, node groups) the container should be + * placed on. + * @return The expected Placement Count + */ + int expectedPlacementCount(); + + /** + * The actual placement count, eg how many racks the container is currently + * on. + * @return The actual placement count. + */ + int actualPlacementCount(); + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java index c9528ec70150..3fdcd2f40169 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java @@ -57,4 +57,14 @@ public int misReplicationCount() { } return requiredRacks - currentRacks; } + + @Override + public int expectedPlacementCount() { + return Math.min(requiredRacks, totalRacks); + } + + @Override + public int actualPlacementCount() { + return currentRacks; + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java index 263c31e003f9..0e5a72148ea9 100644 --- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java @@ -37,7 +37,7 @@ import org.apache.hadoop.ozone.recon.scm.ReconContainerManager; import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; import org.apache.hadoop.test.LambdaTestUtils; -import org.hadoop.ozone.recon.schema.tables.pojos.MissingContainers; +import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -114,7 +114,7 @@ public void testMissingContainerDownNode() throws Exception { cluster.shutdownHddsDatanode(pipeline.getFirstNode()); LambdaTestUtils.await(120000, 10000, () -> { - List allMissingContainers = + List allMissingContainers = reconContainerManager.getContainerSchemaManager() .getAllMissingContainers(); return (allMissingContainers.size() == 1); @@ -123,7 +123,7 @@ public void testMissingContainerDownNode() throws Exception { // Restart the Datanode to make sure we remove the missing container. 
cluster.restartHddsDatanode(pipeline.getFirstNode(), true); LambdaTestUtils.await(120000, 10000, () -> { - List allMissingContainers = + List allMissingContainers = reconContainerManager.getContainerSchemaManager() .getAllMissingContainers(); return (allMissingContainers.isEmpty()); diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java index ed60094b58ad..5696ab3e01bc 100644 --- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java +++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java @@ -19,6 +19,8 @@ package org.hadoop.ozone.recon.schema; import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.name; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -38,9 +40,22 @@ public class ContainerSchemaDefinition implements ReconSchemaDefinition { public static final String CONTAINER_HISTORY_TABLE_NAME = "CONTAINER_HISTORY"; - public static final String MISSING_CONTAINERS_TABLE_NAME = - "MISSING_CONTAINERS"; + public static final String UNHEALTHY_CONTAINERS_TABLE_NAME = + "UNHEALTHY_CONTAINERS"; + + /** + * ENUM describing the allowed container states which can be stored in the + * unhealthy containers table. 
+ */ + public enum UnHealthyContainerStates { + MISSING, + UNDER_REPLICATED, + OVER_REPLICATED, + MIS_REPLICATED + } + private static final String CONTAINER_ID = "container_id"; + private static final String CONTAINER_STATE = "container_state"; private final DataSource dataSource; private DSLContext dslContext; @@ -56,8 +71,8 @@ public void initializeSchema() throws SQLException { if (!TABLE_EXISTS_CHECK.test(conn, CONTAINER_HISTORY_TABLE_NAME)) { createContainerHistoryTable(); } - if (!TABLE_EXISTS_CHECK.test(conn, MISSING_CONTAINERS_TABLE_NAME)) { - createMissingContainersTable(); + if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) { + createUnhealthyContainersTable(); } } @@ -78,12 +93,20 @@ private void createContainerHistoryTable() { /** * Create the Missing Containers table. */ - private void createMissingContainersTable() { - dslContext.createTableIfNotExists(MISSING_CONTAINERS_TABLE_NAME) - .column(CONTAINER_ID, SQLDataType.BIGINT) - .column("missing_since", SQLDataType.BIGINT) + private void createUnhealthyContainersTable() { + dslContext.createTableIfNotExists(UNHEALTHY_CONTAINERS_TABLE_NAME) + .column(CONTAINER_ID, SQLDataType.BIGINT.nullable(false)) + .column(CONTAINER_STATE, SQLDataType.VARCHAR(16).nullable(false)) + .column("in_state_since", SQLDataType.BIGINT.nullable(false)) + .column("expected_replica_count", SQLDataType.INTEGER.nullable(false)) + .column("actual_replica_count", SQLDataType.INTEGER.nullable(false)) + .column("replica_delta", SQLDataType.INTEGER.nullable(false)) + .column("reason", SQLDataType.VARCHAR(500).nullable(true)) .constraint(DSL.constraint("pk_container_id") - .primaryKey(CONTAINER_ID)) + .primaryKey(CONTAINER_ID, CONTAINER_STATE)) + .constraint(DSL.constraint(UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1") + .check(field(name("container_state")) + .in(UnHealthyContainerStates.values()))) .execute(); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java index 5ec0e3bb5e06..23bdb4978289 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java @@ -63,8 +63,8 @@ import org.hadoop.ozone.recon.schema.tables.daos.ContainerHistoryDao; import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao; import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao; -import org.hadoop.ozone.recon.schema.tables.daos.MissingContainersDao; import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.daos.UnhealthyContainersDao; import org.jooq.Configuration; import org.jooq.DAO; import org.slf4j.Logger; @@ -126,7 +126,7 @@ public static class ReconDaoBindingModule extends AbstractModule { ImmutableList.of( FileCountBySizeDao.class, ReconTaskStatusDao.class, - MissingContainersDao.class, + UnhealthyContainersDao.class, GlobalStatsDao.class, ClusterGrowthDailyDao.class, ContainerHistoryDao.class); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java index 325b0b9f2ac4..b4f9792c8625 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java @@ -245,7 +245,7 @@ public Response getMissingContainers() { containerSchemaManager.getLatestContainerHistory( containerID, containerInfo.getReplicationFactor().getNumber()); missingContainers.add(new MissingContainerMetadata(containerID, - container.getMissingSince(), keyCount, pipelineID, datanodes)); + container.getInStateSince(), keyCount, pipelineID, datanodes)); } catch (IOException ioEx) { throw new 
WebApplicationException(ioEx, Response.Status.INTERNAL_SERVER_ERROR); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java new file mode 100644 index 000000000000..472efb411b91 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.recon.fsck; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Class which encapsulates all the information required to determine if a + * container and its replicas are correctly replicated and placed. 
+ */ + +public class ContainerHealthStatus { + + private ContainerInfo container; + private int replicaDelta; + private Set replicas; + private ContainerPlacementStatus placementStatus; + + ContainerHealthStatus(ContainerInfo container, + Set replicas, PlacementPolicy placementPolicy) { + this.container = container; + int repFactor = container.getReplicationFactor().getNumber(); + this.replicas = replicas + .stream() + .filter(r -> !r.getState() + .equals((ContainerReplicaProto.State.UNHEALTHY))) + .collect(Collectors.toSet()); + this.replicaDelta = repFactor - this.replicas.size(); + this.placementStatus = getPlacementStatus(placementPolicy, repFactor); + } + + public long getContainerID() { + return this.container.getContainerID(); + } + + public ContainerInfo getContainer() { + return this.container; + } + + public int getReplicationFactor() { + return container.getReplicationFactor().getNumber(); + } + + public boolean isHealthy() { + return replicaDelta == 0 && !isMisReplicated(); + } + + public boolean isOverReplicated() { + return replicaDelta < 0; + } + + public boolean isUnderReplicated() { + return !isMissing() && replicaDelta > 0; + } + + public int replicaDelta() { + return replicaDelta; + } + + public int getReplicaCount() { + return replicas.size(); + } + + public boolean isMisReplicated() { + return !isMissing() && !placementStatus.isPolicySatisfied(); + } + + public int misReplicatedDelta() { + return placementStatus.misReplicationCount(); + } + + public int expectedPlacementCount() { + return placementStatus.expectedPlacementCount(); + } + + public int actualPlacementCount() { + return placementStatus.actualPlacementCount(); + } + + public String misReplicatedReason() { + return placementStatus.misReplicatedReason(); + } + + public boolean isMissing() { + return replicas.size() == 0; + } + + private ContainerPlacementStatus getPlacementStatus( + PlacementPolicy policy, int repFactor) { + List dns = replicas.stream() + 
.map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); + return policy.validateContainerPlacement(dns, repFactor); + } +} \ No newline at end of file diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java new file mode 100644 index 000000000000..8bd296f32c81 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager; +import org.apache.hadoop.ozone.recon.scm.ReconScmTask; +import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig; +import org.apache.hadoop.util.Time; +import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers; +import org.hadoop.ozone.recon.schema.tables.records.UnhealthyContainersRecord; +import org.jooq.Cursor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that scans the list of containers and keeps track of containers that + * are missing, under-, over- or mis-replicated in a SQL table. 
+ */ +public class ContainerHealthTask extends ReconScmTask { + + private static final Logger LOG = + LoggerFactory.getLogger(ContainerHealthTask.class); + + private ContainerManager containerManager; + private ContainerSchemaManager containerSchemaManager; + private PlacementPolicy placementPolicy; + private final long interval; + private Set processedContainers = new HashSet<>(); + + public ContainerHealthTask( + ContainerManager containerManager, + ReconTaskStatusDao reconTaskStatusDao, + ContainerSchemaManager containerSchemaManager, + PlacementPolicy placementPolicy, + ReconTaskConfig reconTaskConfig) { + super(reconTaskStatusDao); + this.containerSchemaManager = containerSchemaManager; + this.placementPolicy = placementPolicy; + this.containerManager = containerManager; + this.interval = TimeUnit.SECONDS.toMillis( + reconTaskConfig.getMissingContainerTaskInterval()); + } + + public synchronized void run() { + try { + while (canRun()) { + long start = Time.monotonicNow(); + long currentTime = System.currentTimeMillis(); + long existingCount = processExistingDBRecords(currentTime); + LOG.info("Container Health task thread took {} milliseconds to" + + " process {} existing database records.", + Time.monotonicNow() - start, existingCount); + start = Time.monotonicNow(); + final List containers = containerManager.getContainers(); + containers.stream() + .filter(c -> !processedContainers.contains(c)) + .forEach(c -> processContainer(c, currentTime)); + recordSingleRunCompletion(); + LOG.info("Container Health task thread took {} milliseconds for" + + " processing {} containers.", Time.monotonicNow() - start, + containers.size()); + processedContainers.clear(); + wait(interval); + } + } catch (Throwable t) { + LOG.error("Exception in Missing Container task Thread.", t); + } + } + + private ContainerHealthStatus setCurrentContainer(long recordId) + throws ContainerNotFoundException { + ContainerInfo container = + containerManager.getContainer(new 
ContainerID(recordId)); + Set replicas = + containerManager.getContainerReplicas(container.containerID()); + return new ContainerHealthStatus(container, replicas, placementPolicy); + } + + private void completeProcessingContainer(ContainerHealthStatus container, + Set existingRecords, long currentTime) { + containerSchemaManager.insertUnhealthyContainerRecords( + ContainerHealthRecords.generateUnhealthyRecords( + container, existingRecords, currentTime)); + processedContainers.add(container.getContainer()); + } + + /** + * This method reads all existing records in the UnhealthyContainers table. + * The container records are read sorted by Container ID, as there can be + * more than 1 record per container. + * Each record is checked to see if it should be retained or deleted, and if + * any of the replica counts have changed the record is updated. Each record + * for a container is collected into a Set and when the next container id + * changes, indicating the end of the records for the current container, + * completeProcessingContainer is called. This will check to see if any + * additional records need to be added to the database. 
+ * + * @param currentTime Timestamp to place on all records generated by this run + * @return Count of records processed + */ + private long processExistingDBRecords(long currentTime) { + long recordCount = 0; + try (Cursor cursor = + containerSchemaManager.getAllUnhealthyRecordsCursor()) { + ContainerHealthStatus currentContainer = null; + Set existingRecords = new HashSet<>(); + while(cursor.hasNext()) { + recordCount++; + UnhealthyContainersRecord rec = cursor.fetchNext(); + try { + if (currentContainer == null) { + currentContainer = setCurrentContainer(rec.getContainerId()); + } + if (currentContainer.getContainerID() != rec.getContainerId()) { + completeProcessingContainer( + currentContainer, existingRecords, currentTime); + existingRecords.clear(); + currentContainer = setCurrentContainer(rec.getContainerId()); + } + if (ContainerHealthRecords + .retainOrUpdateRecord(currentContainer, rec)) { + existingRecords.add(rec.getContainerState()); + if (rec.changed()) { + rec.update(); + } + } else { + rec.delete(); + } + } catch (ContainerNotFoundException cnf) { + rec.delete(); + currentContainer = null; + } + } + // Remember to finish processing the last container + if (currentContainer != null) { + completeProcessingContainer( + currentContainer, existingRecords, currentTime); + } + } + return recordCount; + } + + private void processContainer(ContainerInfo container, long currentTime) { + try { + Set containerReplicas = + containerManager.getContainerReplicas(container.containerID()); + ContainerHealthStatus h = new ContainerHealthStatus( + container, containerReplicas, placementPolicy); + if (h.isHealthy()) { + return; + } + containerSchemaManager.insertUnhealthyContainerRecords( + ContainerHealthRecords.generateUnhealthyRecords(h, currentTime)); + } catch (ContainerNotFoundException e) { + LOG.error("Container not found while processing container in Container " + + "Health task", e); + } + } + + /** + * Helper methods to generate and update the required 
database records for + * unhealthy containers. + */ + static public class ContainerHealthRecords { + + /** + * Given an existing database record and a ContainerHealthStatus object, + * this method will check if the database record should be retained or not. + * Eg, if a missing record exists, and the ContainerHealthStatus indicates + * the container is still missing, the method will return true, indicating + * the record should be retained. If the container is no longer missing, + * it will return false, indicating the record should be deleted. + * If the record is to be retained, the fields in the record for actual + * replica count, delta and reason will be updated if their counts have + * changed. + * @param container ContainerHealthStatus representing the health state of + * the container. + * @param rec Existing database record from the UnhealthyContainers table. + * @return true if the record should be retained, false otherwise + */ + static public boolean retainOrUpdateRecord( + ContainerHealthStatus container, UnhealthyContainersRecord rec) { + boolean returnValue = false; + switch(UnHealthyContainerStates.valueOf(rec.getContainerState())) { + case MISSING: + returnValue = container.isMissing(); + break; + case MIS_REPLICATED: + returnValue = keepMisReplicatedRecord(container, rec); + break; + case UNDER_REPLICATED: + returnValue = keepUnderReplicatedRecord(container, rec); + break; + case OVER_REPLICATED: + returnValue = keepOverReplicatedRecord(container, rec); + break; + default: + returnValue = false; + } + return returnValue; + } + + static public List generateUnhealthyRecords( + ContainerHealthStatus container, long time) { + return generateUnhealthyRecords(container, new HashSet<>(), time); + } + + /** + * Check the status of the container and generate any database records that + * need to be recorded. This method also considers the records seen by the + * method retainOrUpdateRecord. If a record has been seen by that method + * then it will not be emitted here. 
Therefore this method returns only the + * missing records which have not been seen already. + * @return List of UnhealthyContainer records to be stored in the DB + */ + static public List generateUnhealthyRecords( + ContainerHealthStatus container, Set recordForStateExists, + long time) { + List records = new ArrayList<>(); + if (container.isHealthy()) { + return records; + } + + if (container.isMissing() + && !recordForStateExists.contains( + UnHealthyContainerStates.MISSING.toString())) { + records.add( + recordForState(container, UnHealthyContainerStates.MISSING, time)); + // A container cannot have any other records if it is missing so return + return records; + } + + if (container.isUnderReplicated() + && !recordForStateExists.contains( + UnHealthyContainerStates.UNDER_REPLICATED.toString())) { + records.add(recordForState( + container, UnHealthyContainerStates.UNDER_REPLICATED, time)); + } + + if (container.isOverReplicated() + && !recordForStateExists.contains( + UnHealthyContainerStates.OVER_REPLICATED.toString())) { + records.add(recordForState( + container, UnHealthyContainerStates.OVER_REPLICATED, time)); + } + + if (container.isMisReplicated() + && !recordForStateExists.contains( + UnHealthyContainerStates.MIS_REPLICATED.toString())) { + records.add(recordForState( + container, UnHealthyContainerStates.MIS_REPLICATED, time)); + } + return records; + } + + static private UnhealthyContainers recordForState( + ContainerHealthStatus container, UnHealthyContainerStates state, + long time) { + UnhealthyContainers rec = new UnhealthyContainers(); + rec.setContainerId(container.getContainerID()); + if (state == UnHealthyContainerStates.MIS_REPLICATED) { + rec.setExpectedReplicaCount(container.expectedPlacementCount()); + rec.setActualReplicaCount(container.actualPlacementCount()); + rec.setReplicaDelta(container.misReplicatedDelta()); + rec.setReason(container.misReplicatedReason()); + } else { + rec.setExpectedReplicaCount(container.getReplicationFactor()); + 
rec.setActualReplicaCount(container.getReplicaCount()); + rec.setReplicaDelta(container.replicaDelta()); + } + rec.setContainerState(state.toString()); + rec.setInStateSince(time); + return rec; + } + + static private boolean keepOverReplicatedRecord( + ContainerHealthStatus container, UnhealthyContainersRecord rec) { + if (container.isOverReplicated()) { + updateExpectedReplicaCount(rec, container.getReplicationFactor()); + updateActualReplicaCount(rec, container.getReplicaCount()); + updateReplicaDelta(rec, container.replicaDelta()); + return true; + } + return false; + } + + static private boolean keepUnderReplicatedRecord( + ContainerHealthStatus container, UnhealthyContainersRecord rec) { + if (container.isUnderReplicated()) { + updateExpectedReplicaCount(rec, container.getReplicationFactor()); + updateActualReplicaCount(rec, container.getReplicaCount()); + updateReplicaDelta(rec, container.replicaDelta()); + return true; + } + return false; + } + + static private boolean keepMisReplicatedRecord( + ContainerHealthStatus container, UnhealthyContainersRecord rec) { + if (container.isMisReplicated()) { + updateExpectedReplicaCount(rec, container.expectedPlacementCount()); + updateActualReplicaCount(rec, container.actualPlacementCount()); + updateReplicaDelta(rec, container.misReplicatedDelta()); + updateReason(rec, container.misReplicatedReason()); + return true; + } + return false; + } + + /** + * With a Jooq record, if you update any field in the record, the record + * is marked as changed, even if you updated it to the same value as it is + * already set to. We only need to run a DB update statement if the record + * has really changed. 
The methods below ensure we do not update the Jooq + * record unless the values have changed and hence save a DB execution + */ + static private void updateExpectedReplicaCount( + UnhealthyContainersRecord rec, int expectedCount) { + if (rec.getExpectedReplicaCount() != expectedCount) { + rec.setExpectedReplicaCount(expectedCount); + } + } + + static private void updateActualReplicaCount( + UnhealthyContainersRecord rec, int actualCount) { + if (rec.getActualReplicaCount() != actualCount) { + rec.setActualReplicaCount(actualCount); + } + } + + static private void updateReplicaDelta( + UnhealthyContainersRecord rec, int delta) { + if (rec.getReplicaDelta() != delta) { + rec.setReplicaDelta(delta); + } + } + + static private void updateReason( + UnhealthyContainersRecord rec, String reason) { + if (!rec.getReason().equals(reason)) { + rec.setReason(reason); + } + } + } + +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/MissingContainerTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/MissingContainerTask.java deleted file mode 100644 index 9c1a2505b745..000000000000 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/MissingContainerTask.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.recon.fsck; - -import java.util.Set; -import java.util.concurrent.TimeUnit; - - -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; -import org.apache.hadoop.hdds.scm.container.ContainerReplica; -import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager; -import org.apache.hadoop.ozone.recon.scm.ReconScmTask; -import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig; -import org.apache.hadoop.util.Time; -import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.CollectionUtils; - -/** - * Class that scans the list of containers and keeps track of containers with - * no replicas in a SQL table. 
- */ -public class MissingContainerTask extends ReconScmTask { - - private static final Logger LOG = - LoggerFactory.getLogger(MissingContainerTask.class); - - private ContainerManager containerManager; - private ContainerSchemaManager containerSchemaManager; - private final long interval; - - public MissingContainerTask( - ContainerManager containerManager, - ReconTaskStatusDao reconTaskStatusDao, - ContainerSchemaManager containerSchemaManager, - ReconTaskConfig reconTaskConfig) { - super(reconTaskStatusDao); - this.containerSchemaManager = containerSchemaManager; - this.containerManager = containerManager; - this.interval = TimeUnit.SECONDS.toMillis( - reconTaskConfig.getMissingContainerTaskInterval()); - } - - public synchronized void run() { - try { - while (canRun()) { - long start = Time.monotonicNow(); - long currentTime = System.currentTimeMillis(); - final Set containerIds = - containerManager.getContainerIDs(); - containerIds.forEach(containerID -> - processContainer(containerID, currentTime)); - recordSingleRunCompletion(); - LOG.info("Missing Container task Thread took {} milliseconds for" + - " processing {} containers.", Time.monotonicNow() - start, - containerIds.size()); - wait(interval); - } - } catch (Throwable t) { - LOG.error("Exception in Missing Container task Thread.", t); - } - } - - private void processContainer(ContainerID containerID, long currentTime) { - try { - Set containerReplicas = - containerManager.getContainerReplicas(containerID); - // check if a container has 0 replicas or if all available replicas - // are marked UNHEALTHY. 
- boolean isAllUnhealthy = - containerReplicas.stream().allMatch(replica -> - replica.getState().equals(State.UNHEALTHY)); - boolean isMissingContainer = - containerSchemaManager.isMissingContainer(containerID.getId()); - if (CollectionUtils.isEmpty(containerReplicas) || isAllUnhealthy) { - if (!isMissingContainer) { - LOG.info("Found a missing container with ID {}.", - containerID.getId()); - containerSchemaManager.addMissingContainer(containerID.getId(), - currentTime); - } - } else { - if (isMissingContainer) { - LOG.info("Missing container with ID {} is no longer missing.", - containerID.getId()); - containerSchemaManager.deleteMissingContainer(containerID.getId()); - } - } - } catch (ContainerNotFoundException e) { - LOG.error("Container not found while finding missing containers", e); - } - } -} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerSchemaManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerSchemaManager.java index 6dc70a29354f..74183f8803c8 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerSchemaManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerSchemaManager.java @@ -18,14 +18,18 @@ package org.apache.hadoop.ozone.recon.persistence; import static org.hadoop.ozone.recon.schema.tables.ContainerHistoryTable.CONTAINER_HISTORY; +import static org.hadoop.ozone.recon.schema.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS; import com.google.inject.Inject; import com.google.inject.Singleton; import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition; +import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; import org.hadoop.ozone.recon.schema.tables.daos.ContainerHistoryDao; -import org.hadoop.ozone.recon.schema.tables.daos.MissingContainersDao; +import org.hadoop.ozone.recon.schema.tables.daos.UnhealthyContainersDao; import 
org.hadoop.ozone.recon.schema.tables.pojos.ContainerHistory; -import org.hadoop.ozone.recon.schema.tables.pojos.MissingContainers; +import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers; +import org.hadoop.ozone.recon.schema.tables.records.UnhealthyContainersRecord; +import org.jooq.Cursor; import org.jooq.DSLContext; import org.jooq.Record2; import java.util.List; @@ -36,33 +40,33 @@ @Singleton public class ContainerSchemaManager { private ContainerHistoryDao containerHistoryDao; - private MissingContainersDao missingContainersDao; + private UnhealthyContainersDao unhealthyContainersDao; private ContainerSchemaDefinition containerSchemaDefinition; @Inject public ContainerSchemaManager(ContainerHistoryDao containerHistoryDao, - ContainerSchemaDefinition containerSchemaDefinition, - MissingContainersDao missingContainersDao) { + ContainerSchemaDefinition containerSchemaDefinition, + UnhealthyContainersDao unhealthyContainersDao) { this.containerHistoryDao = containerHistoryDao; - this.missingContainersDao = missingContainersDao; + this.unhealthyContainersDao = unhealthyContainersDao; this.containerSchemaDefinition = containerSchemaDefinition; } - public void addMissingContainer(long containerID, long time) { - MissingContainers record = new MissingContainers(containerID, time); - missingContainersDao.insert(record); + public List getAllMissingContainers() { + return unhealthyContainersDao + .fetchByContainerState(UnHealthyContainerStates.MISSING.toString()); } - public List getAllMissingContainers() { - return missingContainersDao.findAll(); - } - - public boolean isMissingContainer(long containerID) { - return missingContainersDao.existsById(containerID); + public Cursor getAllUnhealthyRecordsCursor() { + DSLContext dslContext = containerSchemaDefinition.getDSLContext(); + return dslContext + .selectFrom(UNHEALTHY_CONTAINERS) + .orderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc()) + .fetchLazy(); } - public void deleteMissingContainer(long containerID) 
{ - missingContainersDao.deleteById(containerID); + public void insertUnhealthyContainerRecords(List recs) { + unhealthyContainersDao.insert(recs); } public void upsertContainerHistory(long containerID, String datanode, diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index d6af3a87066c..34a930a9a48e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; @@ -33,6 +34,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; import org.apache.hadoop.hdds.scm.container.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; @@ -50,7 +53,7 @@ import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.recon.fsck.MissingContainerTask; +import org.apache.hadoop.ozone.recon.fsck.ContainerHealthTask; import 
org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager; import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig; @@ -82,6 +85,8 @@ public class ReconStorageContainerManagerFacade private NetworkTopology clusterMap; private StorageContainerServiceProvider scmServiceProvider; private Set reconScmTasks = new HashSet<>(); + private SCMContainerPlacementMetrics placementMetrics; + private PlacementPolicy containerPlacementPolicy; @Inject public ReconStorageContainerManagerFacade(OzoneConfiguration conf, @@ -99,6 +104,10 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf, this.nodeManager = new ReconNodeManager(conf, scmStorageConfig, eventQueue, clusterMap); + placementMetrics = SCMContainerPlacementMetrics.create(); + this.containerPlacementPolicy = + ContainerPlacementPolicyFactory.getPolicy(conf, nodeManager, + clusterMap, true, placementMetrics); this.datanodeProtocolServer = new ReconDatanodeProtocolServer( conf, this, eventQueue); this.pipelineManager = @@ -160,10 +169,11 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf, scmServiceProvider, reconTaskStatusDao, reconTaskConfig)); - reconScmTasks.add(new MissingContainerTask( + reconScmTasks.add(new ContainerHealthTask( containerManager, reconTaskStatusDao, containerSchemaManager, + containerPlacementPolicy, reconTaskConfig)); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java index 03b66239f02d..26b971c2e157 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java @@ -26,7 +26,7 @@ import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; import 
org.apache.hadoop.ozone.recon.api.types.ContainerMetadata; import org.apache.hadoop.hdds.utils.db.TableIterator; -import org.hadoop.ozone.recon.schema.tables.pojos.MissingContainers; +import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers; /** * The Recon Container DB Service interface. @@ -169,5 +169,5 @@ void deleteContainerMapping(ContainerKeyPrefix containerKeyPrefix) * * @return List of MissingContainers. */ - List getMissingContainers(); + List getMissingContainers(); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java index 30c397b89376..c196745d9fec 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java @@ -50,7 +50,7 @@ import org.apache.hadoop.hdds.utils.db.TableIterator; import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao; import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats; -import org.hadoop.ozone.recon.schema.tables.pojos.MissingContainers; +import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers; import org.jooq.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -358,7 +358,7 @@ public Map getContainers(int limit, return containers; } - public List getMissingContainers() { + public List getMissingContainers() { return containerSchemaManager.getAllMissingContainers(); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java index c59c237b4b25..2650ce719ec9 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java +++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java @@ -68,7 +68,9 @@ import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask; import org.apache.hadoop.hdds.utils.db.Table; +import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition; import org.hadoop.ozone.recon.schema.tables.pojos.ContainerHistory; +import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -405,8 +407,18 @@ public void testGetMissingContainers() { // Add missing containers to the database long missingSince = System.currentTimeMillis(); - containerSchemaManager.addMissingContainer(1L, missingSince); - + UnhealthyContainers missing = new UnhealthyContainers(); + missing.setContainerId(1L); + missing.setInStateSince(missingSince); + missing.setActualReplicaCount(0); + missing.setExpectedReplicaCount(3); + missing.setReplicaDelta(3); + missing.setContainerState( + ContainerSchemaDefinition.UnHealthyContainerStates.MISSING.toString()); + ArrayList missingList = + new ArrayList(); + missingList.add(missing); + containerSchemaManager.insertUnhealthyContainerRecords(missingList); // Add container history for id 1 containerSchemaManager.upsertContainerHistory(1L, "host1", 1L); containerSchemaManager.upsertContainerHistory(1L, "host2", 2L); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java new file mode 100644 index 000000000000..0a3546a6878c --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.recon.fsck; + +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import java.util.HashSet; +import java.util.Set; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test to ensure the correct container state is returned by a + * ContainerHealthStatus instance. 
+ */ +public class TestContainerHealthStatus { + + private PlacementPolicy placementPolicy; + private ContainerInfo container; + + @Before + public void setup() { + placementPolicy = mock(PlacementPolicy.class); + container = mock(ContainerInfo.class); + when(container.getReplicationFactor()) + .thenReturn(HddsProtos.ReplicationFactor.THREE); + when(container.containerID()).thenReturn(new ContainerID(123456)); + when(container.getContainerID()).thenReturn((long)123456); + when(placementPolicy.validateContainerPlacement( + Mockito.anyList(), Mockito.anyInt())) + .thenReturn(new ContainerPlacementStatusDefault(1, 1, 1)); + } + + @Test + public void testHealthyContainer() { + Set replicas = generateReplicas(container, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + assertTrue(status.isHealthy()); + assertFalse(status.isOverReplicated()); + assertFalse(status.isUnderReplicated()); + assertEquals(0, status.replicaDelta()); + assertFalse(status.isMissing()); + assertEquals(false, status.isMisReplicated()); + assertEquals(0, status.misReplicatedDelta()); + + assertEquals(container, status.getContainer()); + assertEquals((long)123456, status.getContainerID()); + assertEquals(3, status.getReplicationFactor()); + assertEquals(3, status.getReplicaCount()); + } + + @Test + public void testHealthyContainerWithExtraUnhealthyReplica() { + Set replicas = generateReplicas(container, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.UNHEALTHY); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + assertTrue(status.isHealthy()); + assertFalse(status.isOverReplicated()); + assertFalse(status.isUnderReplicated()); + assertEquals(0, status.replicaDelta()); + 
assertFalse(status.isMissing()); + assertEquals(false, status.isMisReplicated()); + assertEquals(0, status.misReplicatedDelta()); + } + + @Test + public void testMissingContainer() { + Set replicas = new HashSet<>(); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + assertFalse(status.isHealthy()); + assertFalse(status.isOverReplicated()); + assertFalse(status.isUnderReplicated()); + assertEquals(3, status.replicaDelta()); + assertTrue(status.isMissing()); + assertEquals(false, status.isMisReplicated()); + assertEquals(0, status.misReplicatedDelta()); + } + + @Test + public void testUnderReplicatedContainer() { + Set replicas = generateReplicas(container, + ContainerReplicaProto.State.CLOSED); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + assertFalse(status.isHealthy()); + assertFalse(status.isMissing()); + assertFalse(status.isOverReplicated()); + assertTrue(status.isUnderReplicated()); + assertEquals(2, status.replicaDelta()); + assertEquals(false, status.isMisReplicated()); + assertEquals(0, status.misReplicatedDelta()); + } + + @Test + public void testOverReplicatedContainer() { + Set replicas = generateReplicas(container, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + assertFalse(status.isHealthy()); + assertFalse(status.isMissing()); + assertFalse(status.isUnderReplicated()); + assertTrue(status.isOverReplicated()); + assertEquals(-1, status.replicaDelta()); + assertEquals(false, status.isMisReplicated()); + assertEquals(0, status.misReplicatedDelta()); + } + + @Test + public void testMisReplicated() { + Set replicas = generateReplicas(container, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED, + 
ContainerReplicaProto.State.CLOSED); + when(placementPolicy.validateContainerPlacement( + Mockito.anyList(), Mockito.anyInt())) + .thenReturn(new ContainerPlacementStatusDefault(1, 2, 5)); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + assertFalse(status.isHealthy()); + assertFalse(status.isMissing()); + assertFalse(status.isUnderReplicated()); + assertFalse(status.isOverReplicated()); + assertEquals(0, status.replicaDelta()); + assertTrue(status.isMisReplicated()); + assertEquals(1, status.misReplicatedDelta()); + } + + private Set generateReplicas(ContainerInfo cont, + ContainerReplicaProto.State...states) { + Set replicas = new HashSet<>(); + for (ContainerReplicaProto.State s : states) { + replicas.add(new ContainerReplica.ContainerReplicaBuilder() + .setContainerID(cont.containerID()) + .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails()) + .setContainerState(s) + .build()); + } + return replicas; + } +} \ No newline at end of file diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java new file mode 100644 index 000000000000..4dd31e76b346 --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault; +import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager; +import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; +import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig; +import org.apache.hadoop.test.LambdaTestUtils; +import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition; +import org.hadoop.ozone.recon.schema.tables.daos.ContainerHistoryDao; +import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest; +import 
org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.daos.UnhealthyContainersDao; +import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; +import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers; +import org.junit.Assert; +import org.junit.Test; + +/** + * Class to test a single run of the Container Health Task. + */ +public class TestContainerHealthTask extends AbstractReconSqlDBTest { + @Test + public void testRun() throws Exception { + UnhealthyContainersDao unHealthyContainersTableHandle = + getDao(UnhealthyContainersDao.class); + + ContainerSchemaManager containerSchemaManager = + new ContainerSchemaManager( + mock(ContainerHistoryDao.class), + getSchemaDefinition(ContainerSchemaDefinition.class), + unHealthyContainersTableHandle); + ReconStorageContainerManagerFacade scmMock = + mock(ReconStorageContainerManagerFacade.class); + MockPlacementPolicy placementMock = new MockPlacementPolicy(); + ContainerManager containerManagerMock = mock(ContainerManager.class); + ContainerReplica unhealthyReplicaMock = mock(ContainerReplica.class); + when(unhealthyReplicaMock.getState()).thenReturn(State.UNHEALTHY); + ContainerReplica healthyReplicaMock = mock(ContainerReplica.class); + when(healthyReplicaMock.getState()).thenReturn(State.CLOSED); + + // Create 6 containers. The first 5 will have various unhealthy states + // defined below. The container with ID=6 will be healthy. 
+ List mockContainers = getMockContainers(6); + when(scmMock.getContainerManager()).thenReturn(containerManagerMock); + when(containerManagerMock.getContainers()).thenReturn(mockContainers); + for (ContainerInfo c : mockContainers) { + when(containerManagerMock.getContainer(c.containerID())).thenReturn(c); + } + // Under replicated + when(containerManagerMock.getContainerReplicas(new ContainerID(1L))) + .thenReturn(getMockReplicas(1L, State.CLOSED, State.UNHEALTHY)); + + // return one UNHEALTHY replica for container ID 2 -> Missing + when(containerManagerMock.getContainerReplicas(new ContainerID(2L))) + .thenReturn(getMockReplicas(2L, State.UNHEALTHY)); + + // return 0 replicas for container ID 3 -> Missing + when(containerManagerMock.getContainerReplicas(new ContainerID(3L))) + .thenReturn(Collections.emptySet()); + + // Return 5 Healthy -> Over replicated + when(containerManagerMock.getContainerReplicas(new ContainerID(4L))) + .thenReturn(getMockReplicas(4L, State.CLOSED, State.CLOSED, + State.CLOSED, State.CLOSED, State.CLOSED)); + + // Mis-replicated + Set misReplicas = getMockReplicas(5L, + State.CLOSED, State.CLOSED, State.CLOSED); + placementMock.setMisRepWhenDnPresent( + misReplicas.iterator().next().getDatanodeDetails().getUuid()); + when(containerManagerMock.getContainerReplicas(new ContainerID(5L))) + .thenReturn(misReplicas); + + // Return 3 Healthy -> Healthy container + when(containerManagerMock.getContainerReplicas(new ContainerID(6L))) + .thenReturn(getMockReplicas(6L, + State.CLOSED, State.CLOSED, State.CLOSED)); + + List all = unHealthyContainersTableHandle.findAll(); + Assert.assertTrue(all.isEmpty()); + + long currentTime = System.currentTimeMillis(); + ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class); + ReconTaskConfig reconTaskConfig = new ReconTaskConfig(); + reconTaskConfig.setMissingContainerTaskInterval(2); + ContainerHealthTask containerHealthTask = + new ContainerHealthTask(scmMock.getContainerManager(), + 
reconTaskStatusDao, containerSchemaManager, + placementMock, reconTaskConfig); + containerHealthTask.start(); + LambdaTestUtils.await(6000, 1000, () -> + (unHealthyContainersTableHandle.count() == 5)); + UnhealthyContainers rec = + unHealthyContainersTableHandle.fetchByContainerId(1L).get(0); + assertEquals("UNDER_REPLICATED", rec.getContainerState()); + assertEquals(2, rec.getReplicaDelta().intValue()); + + rec = unHealthyContainersTableHandle.fetchByContainerId(2L).get(0); + assertEquals("MISSING", rec.getContainerState()); + assertEquals(3, rec.getReplicaDelta().intValue()); + + rec = unHealthyContainersTableHandle.fetchByContainerId(3L).get(0); + assertEquals("MISSING", rec.getContainerState()); + assertEquals(3, rec.getReplicaDelta().intValue()); + + rec = unHealthyContainersTableHandle.fetchByContainerId(4L).get(0); + assertEquals("OVER_REPLICATED", rec.getContainerState()); + assertEquals(-2, rec.getReplicaDelta().intValue()); + + rec = unHealthyContainersTableHandle.fetchByContainerId(5L).get(0); + assertEquals("MIS_REPLICATED", rec.getContainerState()); + assertEquals(1, rec.getReplicaDelta().intValue()); + assertEquals(2, rec.getExpectedReplicaCount().intValue()); + assertEquals(1, rec.getActualReplicaCount().intValue()); + assertNotNull(rec.getReason()); + + ReconTaskStatus taskStatus = + reconTaskStatusDao.findById(containerHealthTask.getTaskName()); + Assert.assertTrue(taskStatus.getLastUpdatedTimestamp() > + currentTime); + + // Now run the job again, to check that relevant records are updated or + // removed as appropriate. 
Need to adjust the return value for all the mocks + // Under replicated -> Delta goes from 2 to 1 + when(containerManagerMock.getContainerReplicas(new ContainerID(1L))) + .thenReturn(getMockReplicas(1L, State.CLOSED, State.CLOSED)); + + // ID 2 was missing - make it healthy now + when(containerManagerMock.getContainerReplicas(new ContainerID(2L))) + .thenReturn(getMockReplicas(2L, + State.CLOSED, State.CLOSED, State.CLOSED)); + + // return 0 replicas for container ID 3 -> Still Missing + when(containerManagerMock.getContainerReplicas(new ContainerID(3L))) + .thenReturn(Collections.emptySet()); + + // Return 4 Healthy -> Delta changes from -2 to -1 + when(containerManagerMock.getContainerReplicas(new ContainerID(4L))) + .thenReturn(getMockReplicas(4L, State.CLOSED, State.CLOSED, + State.CLOSED, State.CLOSED)); + + // Was mis-replicated - make it healthy now + placementMock.setMisRepWhenDnPresent(null); + + LambdaTestUtils.await(6000, 1000, () -> + (unHealthyContainersTableHandle.count() == 3)); + rec = unHealthyContainersTableHandle.fetchByContainerId(1L).get(0); + assertEquals("UNDER_REPLICATED", rec.getContainerState()); + assertEquals(1, rec.getReplicaDelta().intValue()); + + // This container is now healthy, it should not be in the table any more + assertEquals(0, + unHealthyContainersTableHandle.fetchByContainerId(2L).size()); + + rec = unHealthyContainersTableHandle.fetchByContainerId(3L).get(0); + + assertEquals("MISSING", rec.getContainerState()); + assertEquals(3, rec.getReplicaDelta().intValue()); + + rec = unHealthyContainersTableHandle.fetchByContainerId(4L).get(0); + assertEquals("OVER_REPLICATED", rec.getContainerState()); + assertEquals(-1, rec.getReplicaDelta().intValue()); + + // This container is now healthy, it should not be in the table any more + assertEquals(0, + unHealthyContainersTableHandle.fetchByContainerId(5L).size()); + } + + private Set getMockReplicas( + long containerId, State...states) { + Set replicas = new HashSet<>(); + for (State 
s : states) { + replicas.add(ContainerReplica.newBuilder() + .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails()) + .setContainerState(s) + .setContainerID(new ContainerID(containerId)) + .setSequenceId(1) + .build()); + } + return replicas; + } + + private List getMockContainers(int num) { + List containers = new ArrayList<>(); + for (int i = 1; i <= num; i++) { + ContainerInfo c = mock(ContainerInfo.class); + when(c.getContainerID()).thenReturn((long)i); + when(c.getReplicationFactor()) + .thenReturn(HddsProtos.ReplicationFactor.THREE); + when(c.containerID()).thenReturn(new ContainerID(i)); + containers.add(c); + } + return containers; + } + + /** + * This is a simple implementation of PlacementPolicy, so that when + * validateContainerPlacement() is called, by default it will return a valid + * placement object. To get an invalid placement object, simply pass a UUID + * of a datanode via setMisRepWhenDnPresent. If a DN with that UUID is passed + * to validateContainerPlacement, then it will return an invalid placement. 
+ */ + private class MockPlacementPolicy implements PlacementPolicy { + + private UUID misRepWhenDnPresent = null; + + public void setMisRepWhenDnPresent(UUID dn) { + misRepWhenDnPresent = dn; + } + + @Override + public List chooseDatanodes( + List excludedNodes, List favoredNodes, + int nodesRequired, long sizeRequired) throws IOException { + return null; + } + + @Override + public ContainerPlacementStatus validateContainerPlacement( + List dns, int replicas) { + if (misRepWhenDnPresent != null && isDnPresent(dns)) { + return new ContainerPlacementStatusDefault(1, 2, 3); + } else { + return new ContainerPlacementStatusDefault(1, 1, 1); + } + } + + private boolean isDnPresent(List dns) { + for(DatanodeDetails dn : dns) { + if (misRepWhenDnPresent != null + && dn.getUuid().equals(misRepWhenDnPresent)) { + return true; + } + } + return false; + } + } + +} \ No newline at end of file diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java new file mode 100644 index 000000000000..62baf1298ff7 --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.recon.fsck; + +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault; +import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers; +import org.hadoop.ozone.recon.schema.tables.records.UnhealthyContainersRecord; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertNotNull; +import static junit.framework.TestCase.assertTrue; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test to validate the ContainerHealthTask Record Generator creates the correct + * records to store in the database. 
+ */ +public class TestContainerHealthTaskRecordGenerator { + + private PlacementPolicy placementPolicy; + private ContainerInfo container; + + @Before + public void setup() { + placementPolicy = mock(PlacementPolicy.class); + container = mock(ContainerInfo.class); + when(container.getReplicationFactor()) + .thenReturn(HddsProtos.ReplicationFactor.THREE); + when(container.containerID()).thenReturn(new ContainerID(123456)); + when(container.getContainerID()).thenReturn((long)123456); + when(placementPolicy.validateContainerPlacement( + Mockito.anyList(), Mockito.anyInt())) + .thenReturn(new ContainerPlacementStatusDefault(1, 1, 1)); + } + + @Test + public void testMissingRecordRetained() { + Set replicas = new HashSet<>(); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + // Missing record should be retained + assertTrue(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, missingRecord())); + // Under / Over / Mis replicated should not be retained as if a container is + // missing then it is not in any other category. 
+ assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, underReplicatedRecord())); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, overReplicatedRecord())); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, misReplicatedRecord())); + + replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED); + status = new ContainerHealthStatus(container, replicas, placementPolicy); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, missingRecord())); + } + + @Test + public void testUnderReplicatedRecordRetainedAndUpdated() { + // under replicated container + Set replicas = + generateReplicas(container, CLOSED, CLOSED); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + + UnhealthyContainersRecord rec = underReplicatedRecord(); + assertTrue(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, rec)); + // The record actual count should be updated from 1 -> 2 + assertEquals(2, rec.getActualReplicaCount().intValue()); + assertEquals(1, rec.getReplicaDelta().intValue()); + + // Missing / Over / Mis replicated should not be retained + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, missingRecord())); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, overReplicatedRecord())); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, misReplicatedRecord())); + + // Container is now replicated OK - should be removed. 
+ replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED); + status = new ContainerHealthStatus(container, replicas, placementPolicy); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, rec)); + } + + @Test + public void testOverReplicatedRecordRetainedAndUpdated() { + // over replicated container + Set replicas = + generateReplicas(container, CLOSED, CLOSED, CLOSED, CLOSED); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + + UnhealthyContainersRecord rec = overReplicatedRecord(); + assertTrue(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, rec)); + // The record actual count should be updated from 5 -> 4 + assertEquals(4, rec.getActualReplicaCount().intValue()); + assertEquals(-1, rec.getReplicaDelta().intValue()); + + // Missing / Under / Mis replicated should not be retained + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, missingRecord())); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, underReplicatedRecord())); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, misReplicatedRecord())); + + // Container is now replicated OK - should be removed. 
+ replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED); + status = new ContainerHealthStatus(container, replicas, placementPolicy); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, rec)); + } + + @Test + public void testMisReplicatedRecordRetainedAndUpdated() { + // mis replicated container + Set replicas = + generateReplicas(container, CLOSED, CLOSED, CLOSED); + when(placementPolicy.validateContainerPlacement( + Mockito.anyList(), Mockito.anyInt())) + .thenReturn(new ContainerPlacementStatusDefault(2, 3, 5)); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + + UnhealthyContainersRecord rec = misReplicatedRecord(); + assertTrue(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, rec)); + // The record actual count should be updated from 1 -> 2 + assertEquals(2, rec.getActualReplicaCount().intValue()); + assertEquals(1, rec.getReplicaDelta().intValue()); + assertNotNull(rec.getReason()); + + // Missing / Under / Over replicated should not be retained + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, missingRecord())); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, underReplicatedRecord())); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, overReplicatedRecord())); + + // Container is now placed OK - should be removed. + when(placementPolicy.validateContainerPlacement( + Mockito.anyList(), Mockito.anyInt())) + .thenReturn(new ContainerPlacementStatusDefault(3, 3, 5)); + status = new ContainerHealthStatus(container, replicas, placementPolicy); + assertFalse(ContainerHealthTask.ContainerHealthRecords + .retainOrUpdateRecord(status, rec)); + } + + @Test + public void testCorrectRecordsGenerated() { + Set replicas = + generateReplicas(container, CLOSED, CLOSED, CLOSED); + + // HEALTHY container - no records generated. 
+ ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + List records = + ContainerHealthTask.ContainerHealthRecords + .generateUnhealthyRecords(status, (long)1234567); + assertEquals(0, records.size()); + + // Over-replicated - expect 1 over replicated record + replicas = + generateReplicas(container, CLOSED, CLOSED, CLOSED, CLOSED, CLOSED); + status = + new ContainerHealthStatus(container, replicas, placementPolicy); + records = ContainerHealthTask.ContainerHealthRecords + .generateUnhealthyRecords(status, (long)1234567); + assertEquals(1, records.size()); + UnhealthyContainers rec = records.get(0); + assertEquals(UnHealthyContainerStates.OVER_REPLICATED.toString(), + rec.getContainerState()); + assertEquals(3, rec.getExpectedReplicaCount().intValue()); + assertEquals(5, rec.getActualReplicaCount().intValue()); + assertEquals(-2, rec.getReplicaDelta().intValue()); + + // Under and Mis Replicated - expect 2 records - mis and under replicated + replicas = + generateReplicas(container, CLOSED, CLOSED); + when(placementPolicy.validateContainerPlacement( + Mockito.anyList(), Mockito.anyInt())) + .thenReturn(new ContainerPlacementStatusDefault(1, 2, 5)); + status = + new ContainerHealthStatus(container, replicas, placementPolicy); + records = ContainerHealthTask.ContainerHealthRecords + .generateUnhealthyRecords(status, (long)1234567); + assertEquals(2, records.size()); + + rec = findRecordForState(records, UnHealthyContainerStates.MIS_REPLICATED); + assertEquals(UnHealthyContainerStates.MIS_REPLICATED.toString(), + rec.getContainerState()); + assertEquals(2, rec.getExpectedReplicaCount().intValue()); + assertEquals(1, rec.getActualReplicaCount().intValue()); + assertEquals(1, rec.getReplicaDelta().intValue()); + assertNotNull(rec.getReason()); + + rec = findRecordForState(records, + UnHealthyContainerStates.UNDER_REPLICATED); + assertEquals(UnHealthyContainerStates.UNDER_REPLICATED.toString(), + rec.getContainerState()); + 
assertEquals(3, rec.getExpectedReplicaCount().intValue()); + assertEquals(2, rec.getActualReplicaCount().intValue()); + assertEquals(1, rec.getReplicaDelta().intValue()); + + // Missing Record - expect just a single missing record even though + // it is mis-replicated too + replicas.clear(); + when(placementPolicy.validateContainerPlacement( + Mockito.anyList(), Mockito.anyInt())) + .thenReturn(new ContainerPlacementStatusDefault(1, 2, 5)); + status = + new ContainerHealthStatus(container, replicas, placementPolicy); + records = ContainerHealthTask.ContainerHealthRecords + .generateUnhealthyRecords(status, (long)1234567); + assertEquals(1, records.size()); + rec = records.get(0); + assertEquals(UnHealthyContainerStates.MISSING.toString(), + rec.getContainerState()); + assertEquals(3, rec.getExpectedReplicaCount().intValue()); + assertEquals(0, rec.getActualReplicaCount().intValue()); + assertEquals(3, rec.getReplicaDelta().intValue()); + } + + @Test + public void testRecordNotGeneratedIfAlreadyExists() { + Set existingRec = new HashSet<>(); + for (UnHealthyContainerStates s : UnHealthyContainerStates.values()) { + existingRec.add(s.toString()); + } + + // Over-replicated + Set replicas = generateReplicas( + container, CLOSED, CLOSED, CLOSED, CLOSED, CLOSED); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy); + List records = + ContainerHealthTask.ContainerHealthRecords + .generateUnhealthyRecords(status, existingRec, (long)1234567); + assertEquals(0, records.size()); + + // Missing + replicas.clear(); + status = new ContainerHealthStatus(container, replicas, placementPolicy); + records = ContainerHealthTask.ContainerHealthRecords + .generateUnhealthyRecords(status, existingRec, (long)1234567); + assertEquals(0, records.size()); + + // Under and Mis-Replicated + replicas = generateReplicas(container, CLOSED, CLOSED); + when(placementPolicy.validateContainerPlacement( + Mockito.anyList(), Mockito.anyInt())) + 
.thenReturn(new ContainerPlacementStatusDefault(1, 2, 5)); + status = new ContainerHealthStatus(container, replicas, placementPolicy); + records = ContainerHealthTask.ContainerHealthRecords + .generateUnhealthyRecords(status, existingRec, (long)1234567); + assertEquals(0, records.size()); + } + + private UnhealthyContainers findRecordForState( + List recs, UnHealthyContainerStates state) { + for (UnhealthyContainers r : recs) { + if (r.getContainerState().equals(state.toString())) { + return r; + } + } + return null; + } + + private UnhealthyContainersRecord missingRecord() { + return new UnhealthyContainersRecord(container.containerID().getId(), + UnHealthyContainerStates.MISSING.toString(), new Long(10), + 3, 0, 3, null); + } + + private UnhealthyContainersRecord underReplicatedRecord() { + return new UnhealthyContainersRecord(container.containerID().getId(), + UnHealthyContainerStates.UNDER_REPLICATED.toString(), + new Long(10), 3, 1, 2, null); + } + + private UnhealthyContainersRecord overReplicatedRecord() { + return new UnhealthyContainersRecord(container.containerID().getId(), + UnHealthyContainerStates.OVER_REPLICATED.toString(), new Long(10), + 3, 5, -2, null); + } + + private UnhealthyContainersRecord misReplicatedRecord() { + return new UnhealthyContainersRecord(container.containerID().getId(), + UnHealthyContainerStates.MIS_REPLICATED.toString(), new Long(10), + 3, 1, 2, "should be on 1 more rack"); + } + + private Set generateReplicas(ContainerInfo cont, + ContainerReplicaProto.State...states) { + Set replicas = new HashSet<>(); + for (ContainerReplicaProto.State s : states) { + replicas.add(new ContainerReplica.ContainerReplicaBuilder() + .setContainerID(cont.containerID()) + .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails()) + .setContainerState(s) + .build()); + } + return replicas; + } + +} diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestMissingContainerTask.java 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestMissingContainerTask.java deleted file mode 100644 index 153f05a7e5f1..000000000000 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestMissingContainerTask.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.recon.fsck; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.scm.container.ContainerReplica; -import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager; -import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; -import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig; -import org.apache.hadoop.test.LambdaTestUtils; -import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition; -import org.hadoop.ozone.recon.schema.tables.daos.ContainerHistoryDao; -import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest; -import org.hadoop.ozone.recon.schema.tables.daos.MissingContainersDao; -import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; -import org.hadoop.ozone.recon.schema.tables.pojos.MissingContainers; -import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; -import org.junit.Assert; -import org.junit.Test; - -/** - * Class to test single run of Missing Container Task. 
- */ -public class TestMissingContainerTask extends AbstractReconSqlDBTest { - - @Test - public void testRun() throws Exception { - MissingContainersDao missingContainersTableHandle = - getDao(MissingContainersDao.class); - - ContainerSchemaManager containerSchemaManager = - new ContainerSchemaManager( - mock(ContainerHistoryDao.class), - getSchemaDefinition(ContainerSchemaDefinition.class), - missingContainersTableHandle); - ReconStorageContainerManagerFacade scmMock = - mock(ReconStorageContainerManagerFacade.class); - ContainerManager containerManagerMock = mock(ContainerManager.class); - ContainerReplica unhealthyReplicaMock = mock(ContainerReplica.class); - when(unhealthyReplicaMock.getState()).thenReturn(State.UNHEALTHY); - ContainerReplica healthyReplicaMock = mock(ContainerReplica.class); - when(healthyReplicaMock.getState()).thenReturn(State.CLOSED); - when(scmMock.getContainerManager()).thenReturn(containerManagerMock); - when(containerManagerMock.getContainerIDs()) - .thenReturn(getMockContainerIDs(3)); - // return one HEALTHY and one UNHEALTHY replica for container ID 1 - when(containerManagerMock.getContainerReplicas(new ContainerID(1L))) - .thenReturn(Collections.unmodifiableSet( - new HashSet<>( - Arrays.asList(healthyReplicaMock, unhealthyReplicaMock) - ))); - // return one UNHEALTHY replica for container ID 2 - when(containerManagerMock.getContainerReplicas(new ContainerID(2L))) - .thenReturn(Collections.singleton(unhealthyReplicaMock)); - // return 0 replicas for container ID 3 - when(containerManagerMock.getContainerReplicas(new ContainerID(3L))) - .thenReturn(Collections.emptySet()); - - List all = missingContainersTableHandle.findAll(); - Assert.assertTrue(all.isEmpty()); - - long currentTime = System.currentTimeMillis(); - ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class); - ReconTaskConfig reconTaskConfig = new ReconTaskConfig(); - reconTaskConfig.setMissingContainerTaskInterval(60); - MissingContainerTask 
missingContainerTask = - new MissingContainerTask(scmMock.getContainerManager(), - reconTaskStatusDao, containerSchemaManager, reconTaskConfig); - missingContainerTask.start(); - - LambdaTestUtils.await(6000, 1000, () -> - (containerSchemaManager.getAllMissingContainers().size() == 2)); - all = containerSchemaManager.getAllMissingContainers(); - // Container IDs 2 and 3 should be present in the missing containers table - Set missingContainerIDs = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(2L, 3L)) - ); - Assert.assertTrue(all.stream().allMatch(r -> - missingContainerIDs.contains(r.getContainerId()))); - ReconTaskStatus taskStatus = - reconTaskStatusDao.findById(missingContainerTask.getTaskName()); - Assert.assertTrue(taskStatus.getLastUpdatedTimestamp() > - currentTime); - } - - private Set getMockContainerIDs(int num) { - Set containerIDs = new HashSet<>(); - for (int i = 1; i <= num; i++) { - containerIDs.add(new ContainerID(i)); - } - return containerIDs; - } -} \ No newline at end of file