diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index cb4862cb6f39..18c398808c99 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -424,6 +424,7 @@ message ContainerReplicaHistoryProto { required int64 lastSeenTime = 3; required int64 bcsId = 4; optional string state = 5; + optional int64 dataChecksum = 6; } message SCMContainerReplicaProto { diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java index 0c778aead5d3..de05a85bca57 100644 --- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java +++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java @@ -52,7 +52,8 @@ public enum UnHealthyContainerStates { OVER_REPLICATED, MIS_REPLICATED, ALL_REPLICAS_BAD, - NEGATIVE_SIZE // Added new state to track containers with negative sizes + NEGATIVE_SIZE, // Added new state to track containers with negative sizes + REPLICA_MISMATCH } private static final String CONTAINER_ID = "container_id"; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java index eaf08d9ca83e..2332664fde8b 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java @@ -50,6 +50,12 @@ public class UnhealthyContainersResponse { @JsonProperty("misReplicatedCount") private long misReplicatedCount = 0; + /** + * Total count of containers that have replicas 
with mismatched data checksums. + */ + @JsonProperty("replicaMismatchCount") + private long replicaMismatchCount = 0; + /** * A collection of unhealthy containers. */ @@ -77,6 +83,9 @@ public void setSummaryCount(String state, long count) { } else if (state.equals( UnHealthyContainerStates.MIS_REPLICATED.toString())) { this.misReplicatedCount = count; + } else if (state.equals( + UnHealthyContainerStates.REPLICA_MISMATCH.toString())) { + this.replicaMismatchCount = count; } } @@ -96,6 +105,10 @@ public long getMisReplicatedCount() { return misReplicatedCount; } + public long getReplicaMismatchCount() { + return replicaMismatchCount; + } + public Collection getContainers() { return containers; } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java index 5e6d55ce706a..7313d218afc7 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java @@ -47,6 +47,7 @@ public class ContainerHealthStatus { private final ContainerInfo container; private final int replicaDelta; + private final Set replicas; private final Set healthyReplicas; private final Set healthyAvailReplicas; private final ContainerPlacementStatus placementStatus; @@ -64,6 +65,7 @@ public class ContainerHealthStatus { this.reconContainerMetadataManager = reconContainerMetadataManager; this.container = container; int repFactor = container.getReplicationConfig().getRequiredNodes(); + this.replicas = replicas; this.healthyReplicas = replicas .stream() .filter(r -> !r.getState() @@ -160,6 +162,13 @@ public boolean isEmpty() { return numKeys == 0; } + public boolean isDataChecksumMismatched() { + return !replicas.isEmpty() && replicas.stream() + .map(ContainerReplica::getDataChecksum) + .distinct() + .count() != 1; + } 
+ private ContainerPlacementStatus getPlacementStatus( PlacementPolicy policy, int repFactor) { List dns = healthyReplicas.stream() @@ -182,19 +191,19 @@ public long getNumKeys() { } private ContainerReplicaCount getContainerReplicaCountInstance( - OzoneConfiguration conf, Set replicas) { + OzoneConfiguration conf, Set containerReplicas) { ReplicationManager.ReplicationManagerConfiguration rmConf = conf.getObject( ReplicationManager.ReplicationManagerConfiguration.class); boolean isEC = container.getReplicationConfig() .getReplicationType() == HddsProtos.ReplicationType.EC; return isEC ? new ECContainerReplicaCount(container, - replicas, new ArrayList<>(), + containerReplicas, new ArrayList<>(), rmConf.getMaintenanceRemainingRedundancy()) : // This class ignores unhealthy replicas, // therefore set 'considerUnhealthy' to false. new RatisContainerReplicaCount(container, - replicas, new ArrayList<>(), + containerReplicas, new ArrayList<>(), rmConf.getMaintenanceReplicaMinimum(), false); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java index 3893123f3a82..c842ffa4a7a2 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java @@ -117,7 +117,7 @@ public void run() { Thread.sleep(interval); } } catch (Throwable t) { - LOG.error("Exception in Missing Container task Thread.", t); + LOG.error("Exception in Container Health task thread.", t); if (t instanceof InterruptedException) { Thread.currentThread().interrupt(); } @@ -234,6 +234,8 @@ private void initializeUnhealthyContainerStateStatsMap( UnHealthyContainerStates.MIS_REPLICATED, new HashMap<>()); unhealthyContainerStateStatsMap.put( UnHealthyContainerStates.NEGATIVE_SIZE, new HashMap<>()); + 
unhealthyContainerStateStatsMap.put( + UnHealthyContainerStates.REPLICA_MISMATCH, new HashMap<>()); } private ContainerHealthStatus setCurrentContainer(long recordId) @@ -352,7 +354,7 @@ private void processContainer(ContainerInfo container, long currentTime, containerReplicas, placementPolicy, reconContainerMetadataManager, conf); - if (h.isHealthilyReplicated() || h.isDeleted()) { + if ((h.isHealthilyReplicated() && !h.isDataChecksumMismatched()) || h.isDeleted()) { return; } // For containers deleted in SCM, we sync the container state here. @@ -486,7 +488,7 @@ public static class ContainerHealthRecords { */ public static boolean retainOrUpdateRecord( ContainerHealthStatus container, UnhealthyContainersRecord rec) { - boolean returnValue = false; + boolean returnValue; switch (UnHealthyContainerStates.valueOf(rec.getContainerState())) { case MISSING: returnValue = container.isMissing() && !container.isEmpty(); @@ -500,6 +502,9 @@ public static boolean retainOrUpdateRecord( case OVER_REPLICATED: returnValue = keepOverReplicatedRecord(container, rec); break; + case REPLICA_MISMATCH: + returnValue = keepReplicaMismatchRecord(container, rec); + break; default: returnValue = false; } @@ -528,7 +533,7 @@ public static List generateUnhealthyRecords( Map> unhealthyContainerStateStatsMap) { List records = new ArrayList<>(); - if (container.isHealthilyReplicated() || container.isDeleted()) { + if ((container.isHealthilyReplicated() && !container.isDataChecksumMismatched()) || container.isDeleted()) { return records; } @@ -593,6 +598,16 @@ public static List generateUnhealthyRecords( unhealthyContainerStateStatsMap); } + if (container.isDataChecksumMismatched() + && !recordForStateExists.contains( + UnHealthyContainerStates.REPLICA_MISMATCH.toString())) { + records.add(recordForState( + container, UnHealthyContainerStates.REPLICA_MISMATCH, time)); + populateContainerStats(container, + UnHealthyContainerStates.REPLICA_MISMATCH, + unhealthyContainerStateStatsMap); + } + 
return records; } @@ -650,6 +665,17 @@ private static boolean keepMisReplicatedRecord( return false; } + private static boolean keepReplicaMismatchRecord( + ContainerHealthStatus container, UnhealthyContainersRecord rec) { + if (container.isDataChecksumMismatched()) { + updateExpectedReplicaCount(rec, container.getReplicationFactor()); + updateActualReplicaCount(rec, container.getReplicaCount()); + updateReplicaDelta(rec, container.replicaDelta()); + return true; + } + return false; + } + /** * With a Jooq record, if you update any field in the record, the record * is marked as changed, even if you updated it to the same value as it is diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHistory.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHistory.java index 32b479a19ea5..690c5facb04b 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHistory.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHistory.java @@ -32,10 +32,12 @@ public class ContainerHistory implements Serializable { private long lastSeenTime; private long lastBcsId; private String state; + private long dataChecksum; + @SuppressWarnings("parameternumber") public ContainerHistory(long containerId, String datanodeUuid, String datanodeHost, long firstSeenTime, - long lastSeenTime, long lastBcsId, String state) { + long lastSeenTime, long lastBcsId, String state, long dataChecksum) { this.containerId = containerId; this.datanodeUuid = datanodeUuid; this.datanodeHost = datanodeHost; @@ -43,6 +45,7 @@ public ContainerHistory(long containerId, String datanodeUuid, this.lastSeenTime = lastSeenTime; this.lastBcsId = lastBcsId; this.state = state; + this.dataChecksum = dataChecksum; } // Default constructor, used by jackson lib for object deserialization. 
@@ -100,4 +103,12 @@ public String getState() { public void setState(String state) { this.state = state; } + + public long getDataChecksum() { + return dataChecksum; + } + + public void setDataChecksum(long dataChecksum) { + this.dataChecksum = dataChecksum; + } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ContainerReplicaHistory.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ContainerReplicaHistory.java index 6ba50fe50d5f..805b46638d0d 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ContainerReplicaHistory.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ContainerReplicaHistory.java @@ -41,14 +41,16 @@ public class ContainerReplicaHistory { private long bcsId; private String state; + private long dataChecksum; public ContainerReplicaHistory(UUID id, Long firstSeenTime, - Long lastSeenTime, long bcsId, String state) { + Long lastSeenTime, long bcsId, String state, long dataChecksum) { this.uuid = id; this.firstSeenTime = firstSeenTime; this.lastSeenTime = lastSeenTime; this.bcsId = bcsId; this.state = state; + this.dataChecksum = dataChecksum; } public long getBcsId() { @@ -83,16 +85,24 @@ public void setState(String state) { this.state = state; } + public long getDataChecksum() { + return dataChecksum; + } + + public void setDataChecksum(long dataChecksum) { + this.dataChecksum = dataChecksum; + } + public static ContainerReplicaHistory fromProto( ContainerReplicaHistoryProto proto) { return new ContainerReplicaHistory(UUID.fromString(proto.getUuid()), proto.getFirstSeenTime(), proto.getLastSeenTime(), proto.getBcsId(), - proto.getState()); + proto.getState(), proto.getDataChecksum()); } public ContainerReplicaHistoryProto toProto() { return ContainerReplicaHistoryProto.newBuilder().setUuid(uuid.toString()) .setFirstSeenTime(firstSeenTime).setLastSeenTime(lastSeenTime) - .setBcsId(bcsId).setState(state).build(); + 
.setBcsId(bcsId).setState(state).setDataChecksum(dataChecksum).build(); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java index 7bf51fcc4df8..6a58ced544fb 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java @@ -280,6 +280,7 @@ public void updateContainerReplica(ContainerID containerID, boolean flushToDB = false; long bcsId = replica.getSequenceId() != null ? replica.getSequenceId() : -1; String state = replica.getState().toString(); + long dataChecksum = replica.getDataChecksum(); // If replica doesn't exist in in-memory map, add to DB and add to map if (replicaLastSeenMap == null) { @@ -287,7 +288,7 @@ public void updateContainerReplica(ContainerID containerID, replicaHistoryMap.putIfAbsent(id, new ConcurrentHashMap() {{ put(uuid, new ContainerReplicaHistory(uuid, currTime, currTime, - bcsId, state)); + bcsId, state, dataChecksum)); }}); flushToDB = true; } else { @@ -297,7 +298,7 @@ public void updateContainerReplica(ContainerID containerID, // New Datanode replicaLastSeenMap.put(uuid, new ContainerReplicaHistory(uuid, currTime, currTime, bcsId, - state)); + state, dataChecksum)); flushToDB = true; } else { // if the object exists, only update the last seen time & bcsId fields @@ -308,7 +309,7 @@ public void updateContainerReplica(ContainerID containerID, } if (flushToDB) { - upsertContainerHistory(id, uuid, currTime, bcsId, state); + upsertContainerHistory(id, uuid, currTime, bcsId, state, dataChecksum); } } @@ -325,6 +326,7 @@ public void removeContainerReplica(ContainerID containerID, final DatanodeDetails dnInfo = replica.getDatanodeDetails(); final UUID uuid = dnInfo.getUuid(); String state = replica.getState().toString(); + long dataChecksum = 
replica.getDataChecksum(); final Map replicaLastSeenMap = replicaHistoryMap.get(id); @@ -333,7 +335,7 @@ public void removeContainerReplica(ContainerID containerID, if (ts != null) { // Flush to DB, then remove from in-memory map upsertContainerHistory(id, uuid, ts.getLastSeenTime(), ts.getBcsId(), - state); + state, dataChecksum); replicaLastSeenMap.remove(uuid); } } @@ -392,9 +394,10 @@ public List getAllContainerHistory(long containerID) { final long lastSeenTime = entry.getValue().getLastSeenTime(); long bcsId = entry.getValue().getBcsId(); String state = entry.getValue().getState(); + long dataChecksum = entry.getValue().getDataChecksum(); resList.add(new ContainerHistory(containerID, uuid.toString(), hostname, - firstSeenTime, lastSeenTime, bcsId, state)); + firstSeenTime, lastSeenTime, bcsId, state, dataChecksum)); } return resList; } @@ -429,7 +432,7 @@ public void flushReplicaHistoryMapToDB(boolean clearMap) { } public void upsertContainerHistory(long containerID, UUID uuid, long time, - long bcsId, String state) { + long bcsId, String state, long dataChecksum) { Map tsMap; try { tsMap = cdbServiceProvider.getContainerReplicaHistory(containerID); @@ -437,11 +440,12 @@ public void upsertContainerHistory(long containerID, UUID uuid, long time, if (ts == null) { // New entry tsMap.put(uuid, new ContainerReplicaHistory(uuid, time, time, bcsId, - state)); + state, dataChecksum)); } else { // Entry exists, update last seen time and put it back to DB. 
ts.setLastSeenTime(time); ts.setState(state); + ts.setDataChecksum(dataChecksum); } cdbServiceProvider.storeContainerReplicaHistory(containerID, tsMap); } catch (IOException e) { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLayoutFeature.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLayoutFeature.java index 05393237913c..16aab643cbf6 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLayoutFeature.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLayoutFeature.java @@ -33,7 +33,8 @@ public enum ReconLayoutFeature { // Represents the starting point for Recon's layout versioning system. INITIAL_VERSION(0, "Recon Layout Versioning Introduction"), - TASK_STATUS_STATISTICS(1, "Recon Task Status Statistics Tracking Introduced"); + TASK_STATUS_STATISTICS(1, "Recon Task Status Statistics Tracking Introduced"), + UNHEALTHY_CONTAINER_REPLICA_MISMATCH(2, "Adding replica mismatch state to the unhealthy container table"); private final int version; private final String description; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/UnhealthyContainerReplicaMismatchAction.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/UnhealthyContainerReplicaMismatchAction.java new file mode 100644 index 000000000000..3ff8a51ccbf5 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/UnhealthyContainerReplicaMismatchAction.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.upgrade; + +import static org.apache.hadoop.ozone.recon.upgrade.ReconLayoutFeature.UNHEALTHY_CONTAINER_REPLICA_MISMATCH; +import static org.apache.hadoop.ozone.recon.upgrade.ReconUpgradeAction.UpgradeActionType.FINALIZE; +import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK; +import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.name; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import javax.sql.DataSource; +import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; +import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition; +import org.jooq.DSLContext; +import org.jooq.impl.DSL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Upgrade action for handling the addition of a new unhealthy container state in Recon, which applies to containers + * that have replicas with different data checksums. 
+ */ +@UpgradeActionRecon(feature = UNHEALTHY_CONTAINER_REPLICA_MISMATCH, type = FINALIZE) +public class UnhealthyContainerReplicaMismatchAction implements ReconUpgradeAction { + private static final Logger LOG = LoggerFactory.getLogger(UnhealthyContainerReplicaMismatchAction.class); + private DataSource dataSource; + private DSLContext dslContext; + + @Override + public void execute(ReconStorageContainerManagerFacade scmFacade) throws Exception { + this.dataSource = scmFacade.getDataSource(); + try (Connection conn = dataSource.getConnection()) { + if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) { + return; + } + dslContext = DSL.using(conn); + // Drop the existing constraint + dropConstraint(); + // Add the updated constraint with all enum states + addUpdatedConstraint(); + } catch (SQLException e) { + throw new SQLException("Failed to execute UnhealthyContainerReplicaMismatchAction", e); + } + } + + /** + * Drops the existing constraint (table name + "ck1") from the UNHEALTHY_CONTAINERS table so it can be re-created. + */ + private void dropConstraint() { + String constraintName = UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1"; + dslContext.alterTable(UNHEALTHY_CONTAINERS_TABLE_NAME) + .dropConstraint(constraintName) + .execute(); + LOG.debug("Dropped the existing constraint: {}", constraintName); + } + + /** + * Adds the updated constraint directly within this class. 
+ */ + private void addUpdatedConstraint() { + String[] enumStates = Arrays + .stream(ContainerSchemaDefinition.UnHealthyContainerStates.values()) + .map(Enum::name) + .toArray(String[]::new); + + dslContext.alterTable(ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME) + .add(DSL.constraint(ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1") + .check(field(name("container_state")) + .in(enumStates))) + .execute(); + + LOG.info("Added the updated constraint to the UNHEALTHY_CONTAINERS table for enum state values: {}", + Arrays.toString(enumStates)); + } + + @Override + public UpgradeActionType getType() { + return FINALIZE; + } +} diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java index da5484c9b898..b3759bbab6e5 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java @@ -733,7 +733,7 @@ public void testGetMissingContainers() throws IOException, TimeoutException { uuid2 = newDatanode("host2", "127.0.0.2"); uuid3 = newDatanode("host3", "127.0.0.3"); uuid4 = newDatanode("host4", "127.0.0.4"); - createUnhealthyRecords(5, 0, 0, 0); + createUnhealthyRecords(5, 0, 0, 0, 0); Response responseWithLimit = containerEndpoint.getMissingContainers(3); MissingContainersResponse responseWithLimitObject @@ -746,6 +746,9 @@ public void testGetMissingContainers() throws IOException, TimeoutException { assertTrue(containerWithLimit.getReplicas().stream() .map(ContainerHistory::getState) .allMatch(s -> s.equals("UNHEALTHY"))); + assertTrue(containerWithLimit.getReplicas().stream() + .map(ContainerHistory::getDataChecksum) + .allMatch(s -> s.equals(1234L))); Collection recordsWithLimit = responseWithLimitObject.getContainers(); @@ -810,12 +813,12 @@ public void 
testUnhealthyContainers() throws IOException, TimeoutException { assertEquals(Collections.EMPTY_LIST, responseObject.getContainers()); - putContainerInfos(14); + putContainerInfos(15); uuid1 = newDatanode("host1", "127.0.0.1"); uuid2 = newDatanode("host2", "127.0.0.2"); uuid3 = newDatanode("host3", "127.0.0.3"); uuid4 = newDatanode("host4", "127.0.0.4"); - createUnhealthyRecords(5, 4, 3, 2); + createUnhealthyRecords(5, 4, 3, 2, 1); response = containerEndpoint.getUnhealthyContainers(1000, 1); @@ -824,6 +827,7 @@ public void testUnhealthyContainers() throws IOException, TimeoutException { assertEquals(4, responseObject.getOverReplicatedCount()); assertEquals(3, responseObject.getUnderReplicatedCount()); assertEquals(2, responseObject.getMisReplicatedCount()); + assertEquals(1, responseObject.getReplicaMismatchCount()); Collection records = responseObject.getContainers(); @@ -892,6 +896,21 @@ public void testUnhealthyContainers() throws IOException, TimeoutException { assertEquals(12345L, misRep.get(0).getUnhealthySince()); assertEquals(13L, misRep.get(0).getContainerID()); assertEquals("some reason", misRep.get(0).getReason()); + + List replicaMismatch = records + .stream() + .filter(r -> r.getContainerState() + .equals(UnHealthyContainerStates.REPLICA_MISMATCH.toString())) + .collect(Collectors.toList()); + assertEquals(1, replicaMismatch.size()); + assertEquals(3, replicaMismatch.get(0).getExpectedReplicaCount()); + assertEquals(3, replicaMismatch.get(0).getActualReplicaCount()); + assertEquals(0, replicaMismatch.get(0).getReplicaDeltaCount()); + assertEquals(12345L, replicaMismatch.get(0).getUnhealthySince()); + assertEquals(15L, replicaMismatch.get(0).getContainerID()); + List replicas = replicaMismatch.get(0).getReplicas(); + assertTrue(replicas.stream().anyMatch(checksum -> checksum.getDataChecksum() == 1234L)); + assertTrue(replicas.stream().anyMatch(checksum -> checksum.getDataChecksum() == 2345L)); } @Test @@ -920,7 +939,7 @@ public void 
testUnhealthyContainersFilteredResponse() uuid2 = newDatanode("host2", "127.0.0.2"); uuid3 = newDatanode("host3", "127.0.0.3"); uuid4 = newDatanode("host4", "127.0.0.4"); - createUnhealthyRecords(5, 4, 3, 2); + createUnhealthyRecords(5, 4, 3, 2, 1); createEmptyMissingUnhealthyRecords(2); // For EMPTY_MISSING state createNegativeSizeUnhealthyRecords(2); // For NEGATIVE_SIZE state @@ -934,6 +953,7 @@ public void testUnhealthyContainersFilteredResponse() assertEquals(4, responseObject.getOverReplicatedCount()); assertEquals(3, responseObject.getUnderReplicatedCount()); assertEquals(2, responseObject.getMisReplicatedCount()); + assertEquals(1, responseObject.getReplicaMismatchCount()); Collection records = responseObject.getContainers(); assertTrue(records.stream() @@ -978,7 +998,7 @@ public void testUnhealthyContainersPaging() uuid2 = newDatanode("host2", "127.0.0.2"); uuid3 = newDatanode("host3", "127.0.0.3"); uuid4 = newDatanode("host4", "127.0.0.4"); - createUnhealthyRecords(5, 4, 3, 2); + createUnhealthyRecords(5, 4, 3, 2, 0); UnhealthyContainersResponse firstBatch = (UnhealthyContainersResponse) containerEndpoint.getUnhealthyContainers( 3, 1).getEntity(); @@ -1013,12 +1033,12 @@ public void testGetReplicaHistoryForContainer() throws IOException { final UUID u2 = newDatanode("host2", "127.0.0.2"); final UUID u3 = newDatanode("host3", "127.0.0.3"); final UUID u4 = newDatanode("host4", "127.0.0.4"); - reconContainerManager.upsertContainerHistory(1L, u1, 1L, 1L, "OPEN"); - reconContainerManager.upsertContainerHistory(1L, u2, 2L, 1L, "OPEN"); - reconContainerManager.upsertContainerHistory(1L, u3, 3L, 1L, "OPEN"); - reconContainerManager.upsertContainerHistory(1L, u4, 4L, 1L, "OPEN"); + reconContainerManager.upsertContainerHistory(1L, u1, 1L, 1L, "OPEN", 1234L); + reconContainerManager.upsertContainerHistory(1L, u2, 2L, 1L, "OPEN", 1234L); + reconContainerManager.upsertContainerHistory(1L, u3, 3L, 1L, "OPEN", 1234L); + reconContainerManager.upsertContainerHistory(1L, 
u4, 4L, 1L, "OPEN", 1234L); - reconContainerManager.upsertContainerHistory(1L, u1, 5L, 1L, "OPEN"); + reconContainerManager.upsertContainerHistory(1L, u1, 5L, 1L, "OPEN", 1234L); Response response = containerEndpoint.getReplicaHistoryForContainer(1L); List histories = @@ -1026,6 +1046,9 @@ public void testGetReplicaHistoryForContainer() throws IOException { assertTrue(histories.stream() .map(ContainerHistory::getState) .allMatch(s -> s.equals("OPEN"))); + assertTrue(histories.stream() + .map(ContainerHistory::getDataChecksum) + .allMatch(s -> s.equals(1234L))); Set datanodes = Collections.unmodifiableSet( new HashSet<>(Arrays.asList( u1.toString(), u2.toString(), u3.toString(), u4.toString()))); @@ -1065,7 +1088,7 @@ private void createEmptyMissingUnhealthyRecords(int emptyMissing) { int cid = 0; for (int i = 0; i < emptyMissing; i++) { createUnhealthyRecord(++cid, UnHealthyContainerStates.EMPTY_MISSING.toString(), - 3, 3, 0, null); + 3, 3, 0, null, false); } } @@ -1073,37 +1096,42 @@ private void createNegativeSizeUnhealthyRecords(int negativeSize) { int cid = 0; for (int i = 0; i < negativeSize; i++) { createUnhealthyRecord(++cid, UnHealthyContainerStates.NEGATIVE_SIZE.toString(), - 3, 3, 0, null); // Added for NEGATIVE_SIZE state + 3, 3, 0, null, false); // Added for NEGATIVE_SIZE state } } private void createUnhealthyRecords(int missing, int overRep, int underRep, - int misRep) { + int misRep, int dataChecksum) { int cid = 0; for (int i = 0; i < missing; i++) { createUnhealthyRecord(++cid, UnHealthyContainerStates.MISSING.toString(), - 3, 0, 3, null); + 3, 0, 3, null, false); } for (int i = 0; i < overRep; i++) { createUnhealthyRecord(++cid, UnHealthyContainerStates.OVER_REPLICATED.toString(), - 3, 5, -2, null); + 3, 5, -2, null, false); } for (int i = 0; i < underRep; i++) { createUnhealthyRecord(++cid, UnHealthyContainerStates.UNDER_REPLICATED.toString(), - 3, 1, 2, null); + 3, 1, 2, null, false); } for (int i = 0; i < misRep; i++) { 
createUnhealthyRecord(++cid, UnHealthyContainerStates.MIS_REPLICATED.toString(), - 2, 1, 1, "some reason"); + 2, 1, 1, "some reason", false); + } + for (int i = 0; i < dataChecksum; i++) { + createUnhealthyRecord(++cid, + UnHealthyContainerStates.REPLICA_MISMATCH.toString(), + 3, 3, 0, null, true); } } private void createUnhealthyRecord(int id, String state, int expected, - int actual, int delta, String reason) { + int actual, int delta, String reason, boolean dataChecksumMismatch) { long cID = Integer.toUnsignedLong(id); UnhealthyContainers missing = new UnhealthyContainers(); missing.setContainerId(cID); @@ -1118,14 +1146,16 @@ private void createUnhealthyRecord(int id, String state, int expected, missingList.add(missing); containerHealthSchemaManager.insertUnhealthyContainerRecords(missingList); + long differentChecksum = dataChecksumMismatch ? 2345L : 1234L; + reconContainerManager.upsertContainerHistory(cID, uuid1, 1L, 1L, - "UNHEALTHY"); + "UNHEALTHY", differentChecksum); reconContainerManager.upsertContainerHistory(cID, uuid2, 2L, 1L, - "UNHEALTHY"); + "UNHEALTHY", differentChecksum); reconContainerManager.upsertContainerHistory(cID, uuid3, 3L, 1L, - "UNHEALTHY"); + "UNHEALTHY", 1234L); reconContainerManager.upsertContainerHistory(cID, uuid4, 4L, 1L, - "UNHEALTHY"); + "UNHEALTHY", 1234L); } protected ContainerWithPipeline getTestContainer( diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java index d404a168c74f..790e10850581 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java @@ -175,6 +175,40 @@ public void testOverReplicatedContainer() { assertEquals(0, status.misReplicatedDelta()); } + @Test + public void testSameDataChecksumContainer() 
{ + Set replicas = generateReplicas(container, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy, + reconContainerMetadataManager, CONF); + assertTrue(status.isHealthilyReplicated()); + assertFalse(status.isMissing()); + assertFalse(status.isUnderReplicated()); + assertFalse(status.isOverReplicated()); + assertFalse(status.isMisReplicated()); + assertFalse(status.isDataChecksumMismatched()); + } + + @Test + public void testDataChecksumMismatchContainer() { + Set replicas = generateMismatchedReplicas(container, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED, + ContainerReplicaProto.State.CLOSED); + ContainerHealthStatus status = + new ContainerHealthStatus(container, replicas, placementPolicy, + reconContainerMetadataManager, CONF); + assertTrue(status.isHealthilyReplicated()); + assertFalse(status.isMissing()); + assertFalse(status.isUnderReplicated()); + assertFalse(status.isOverReplicated()); + assertFalse(status.isMisReplicated()); + assertTrue(status.isDataChecksumMismatched()); + } + /** * Starting with a ContainerHealthStatus of 1 over-replicated container * replica and then updating a datanode to one of the out-of-service states. 
@@ -378,12 +412,29 @@ public void testMisReplicated() { private Set generateReplicas(ContainerInfo cont, ContainerReplicaProto.State...states) { Set replicas = new HashSet<>(); + for (ContainerReplicaProto.State s : states) { + replicas.add(new ContainerReplica.ContainerReplicaBuilder() + .setContainerID(cont.containerID()) + .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails()) + .setDataChecksum(1234L) + .setContainerState(s) + .build()); + } + return replicas; + } + + private Set generateMismatchedReplicas(ContainerInfo cont, + ContainerReplicaProto.State...states) { + Set replicas = new HashSet<>(); + long checksum = 1234L; for (ContainerReplicaProto.State s : states) { replicas.add(new ContainerReplica.ContainerReplicaBuilder() .setContainerID(cont.containerID()) .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails()) .setContainerState(s) + .setDataChecksum(checksum) .build()); + checksum++; } return replicas; } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java index 8c8b72ea4512..ebf6ddb4d87f 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java @@ -112,7 +112,7 @@ public void testRun() throws Exception { // Create 7 containers. The first 5 will have various unhealthy states // defined below. 
The container with ID=6 will be healthy and // container with ID=7 will be EMPTY_MISSING (but not inserted into DB) - List mockContainers = getMockContainers(7); + List mockContainers = getMockContainers(8); when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock); when(scmMock.getContainerManager()).thenReturn(containerManagerMock); when(containerManagerMock.getContainers(any(ContainerID.class), @@ -181,6 +181,15 @@ public void testRun() throws Exception { when(reconContainerMetadataManager.getKeyCountForContainer( 7L)).thenReturn(5L); // Indicates non-empty container 7 for now + // container ID 8 - REPLICA_MISMATCH + ContainerInfo containerInfo8 = + TestContainerInfo.newBuilderForTest().setContainerID(8).setReplicationConfig(replicationConfig).build(); + when(containerManagerMock.getContainer(ContainerID.valueOf(8L))).thenReturn(containerInfo8); + Set mismatchReplicas = getMockReplicasChecksumMismatch(8L, + State.CLOSED, State.CLOSED, State.CLOSED); + when(containerManagerMock.getContainerReplicas(containerInfo8.containerID())) + .thenReturn(mismatchReplicas); + List all = unHealthyContainersTableHandle.findAll(); assertThat(all).isEmpty(); @@ -199,7 +208,7 @@ public void testRun() throws Exception { // Ensure unhealthy container count in DB matches expected LambdaTestUtils.await(60000, 1000, () -> - (unHealthyContainersTableHandle.count() == 5)); + (unHealthyContainersTableHandle.count() == 6)); // Check for UNDER_REPLICATED container states UnhealthyContainers rec = @@ -240,6 +249,12 @@ public void testRun() throws Exception { assertEquals(1, rec.getActualReplicaCount().intValue()); assertNotNull(rec.getReason()); + rec = unHealthyContainersTableHandle.fetchByContainerId(8L).get(0); + assertEquals("REPLICA_MISMATCH", rec.getContainerState()); + assertEquals(0, rec.getReplicaDelta().intValue()); + assertEquals(3, rec.getExpectedReplicaCount().intValue()); + assertEquals(3, rec.getActualReplicaCount().intValue()); + ReconTaskStatus taskStatus = 
reconTaskStatusDao.findById(containerHealthTask.getTaskName()); assertThat(taskStatus.getLastUpdatedTimestamp()) @@ -271,7 +286,7 @@ public void testRun() throws Exception { // Ensure count is reduced after EMPTY_MISSING containers are not inserted LambdaTestUtils.await(60000, 1000, () -> - (unHealthyContainersTableHandle.count() == 2)); + (unHealthyContainersTableHandle.count() == 3)); rec = unHealthyContainersTableHandle.fetchByContainerId(1L).get(0); assertEquals("UNDER_REPLICATED", rec.getContainerState()); @@ -295,7 +310,7 @@ public void testRun() throws Exception { // Just check once again that count remains consistent LambdaTestUtils.await(60000, 1000, () -> - (unHealthyContainersTableHandle.count() == 2)); + (unHealthyContainersTableHandle.count() == 3)); } @Test @@ -422,6 +437,7 @@ public void testAllContainerStateInsertions() { case MIS_REPLICATED: case NEGATIVE_SIZE: + case REPLICA_MISMATCH: unhealthyContainer.setExpectedReplicaCount(3); unhealthyContainer.setActualReplicaCount(3); unhealthyContainer.setReplicaDelta(0); @@ -583,7 +599,25 @@ private Set getMockReplicas( .setContainerState(s) .setContainerID(ContainerID.valueOf(containerId)) .setSequenceId(1) + .setDataChecksum(1234L) + .build()); + } + return replicas; + } + + private Set getMockReplicasChecksumMismatch( + long containerId, State...states) { + Set replicas = new HashSet<>(); + long checksum = 1234L; + for (State s : states) { + replicas.add(ContainerReplica.newBuilder() + .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails()) + .setContainerState(s) + .setContainerID(ContainerID.valueOf(containerId)) + .setSequenceId(1) + .setDataChecksum(checksum) .build()); + checksum++; } return replicas; } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java index 4e9965638a17..263e5fe04c57 100644 --- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java @@ -362,6 +362,38 @@ public void testCorrectRecordsGenerated() { logUnhealthyContainerStats(unhealthyContainerStateStatsMap); initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap); + // Replica mismatch + replicas = generateMismatchedReplicas(container, CLOSED, CLOSED, CLOSED); + status = + new ContainerHealthStatus(container, replicas, placementPolicy, + reconContainerMetadataManager, CONF); + records = ContainerHealthTask.ContainerHealthRecords + .generateUnhealthyRecords(status, (long) 1234567, + unhealthyContainerStateStatsMap); + assertEquals(1, records.size()); + assertEquals(1, unhealthyContainerStateStatsMap.get( + UnHealthyContainerStates.REPLICA_MISMATCH) + .getOrDefault(CONTAINER_COUNT, 0L)); + + logUnhealthyContainerStats(unhealthyContainerStateStatsMap); + initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap); + + // Same data checksum replicas + replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED); + status = + new ContainerHealthStatus(container, replicas, placementPolicy, + reconContainerMetadataManager, CONF); + records = ContainerHealthTask.ContainerHealthRecords + .generateUnhealthyRecords(status, (long) 1234567, + unhealthyContainerStateStatsMap); + assertEquals(0, records.size()); + assertEquals(0, unhealthyContainerStateStatsMap.get( + UnHealthyContainerStates.REPLICA_MISMATCH) + .getOrDefault(CONTAINER_COUNT, 0L)); + + logUnhealthyContainerStats(unhealthyContainerStateStatsMap); + initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap); + // Under and Mis Replicated - expect 2 records - mis and under replicated replicas = generateReplicas(container, CLOSED, CLOSED); @@ -612,11 +644,28 @@ private Set generateReplicas(ContainerInfo cont, 
.setContainerID(cont.containerID()) .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails()) .setContainerState(s) + .setDataChecksum(1234L) .build()); } return replicas; } + private Set generateMismatchedReplicas(ContainerInfo cont, + ContainerReplicaProto.State...states) { + Set replicas = new HashSet<>(); + long checksum = 1234L; + for (ContainerReplicaProto.State s : states) { + replicas.add(new ContainerReplica.ContainerReplicaBuilder() + .setContainerID(cont.containerID()) + .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails()) + .setContainerState(s) + .setDataChecksum(checksum) + .build()); + checksum++; + } + return replicas; + } + private void initializeUnhealthyContainerStateStatsMap( Map> unhealthyContainerStateStatsMap) { @@ -632,6 +681,8 @@ private void initializeUnhealthyContainerStateStatsMap( UnHealthyContainerStates.MIS_REPLICATED, new HashMap<>()); unhealthyContainerStateStatsMap.put( UnHealthyContainerStates.NEGATIVE_SIZE, new HashMap<>()); + unhealthyContainerStateStatsMap.put( + UnHealthyContainerStates.REPLICA_MISMATCH, new HashMap<>()); } private void logUnhealthyContainerStats( diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java index 0ed9b1318a9b..544fce21a3bf 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java @@ -211,7 +211,7 @@ public void testUpdateAndRemoveContainerReplica() .setUuid(uuid1).setHostName("host1").setIpAddress("127.0.0.1").build(); ContainerReplica containerReplica1 = ContainerReplica.newBuilder() .setContainerID(containerID1).setContainerState(State.OPEN) - .setDatanodeDetails(datanodeDetails1).setSequenceId(1001L).build(); + 
.setDatanodeDetails(datanodeDetails1).setSequenceId(1001L).setDataChecksum(1234L).build(); final ReconContainerManager containerManager = getContainerManager(); final Map> repHistMap = @@ -239,6 +239,7 @@ public void testUpdateAndRemoveContainerReplica() assertEquals(repHist1.getLastSeenTime(), repHist1.getFirstSeenTime()); assertEquals(containerReplica1.getSequenceId().longValue(), repHist1.getBcsId()); + assertEquals(containerReplica1.getDataChecksum(), repHist1.getDataChecksum()); // Let's update the entry again containerReplica1 = ContainerReplica.newBuilder() @@ -257,7 +258,7 @@ public void testUpdateAndRemoveContainerReplica() .setUuid(uuid2).setHostName("host2").setIpAddress("127.0.0.2").build(); final ContainerReplica containerReplica2 = ContainerReplica.newBuilder() .setContainerID(containerID1).setContainerState(State.OPEN) - .setDatanodeDetails(datanodeDetails2).setSequenceId(1051L).build(); + .setDatanodeDetails(datanodeDetails2).setSequenceId(1051L).setDataChecksum(1234L).build(); // Add replica to DN02 containerManager.updateContainerReplica(containerID1, containerReplica2); @@ -271,6 +272,7 @@ public void testUpdateAndRemoveContainerReplica() // Because this is a new entry, first seen time equals last seen time assertEquals(repHist2.getLastSeenTime(), repHist2.getFirstSeenTime()); assertEquals(1051L, repHist2.getBcsId()); + assertEquals(containerReplica2.getDataChecksum(), repHist2.getDataChecksum()); // Remove replica from DN01 containerManager.removeContainerReplica(containerID1, containerReplica1);