Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,18 @@ public interface ContainerPlacementStatus {
*/
int misReplicationCount();

/**
* The number of locations (eg racks, node groups) the container should be
* placed on.
* @return The expected Placement Count
*/
int expectedPlacementCount();

/**
* The actual placement count, eg how many racks the container is currently
* on.
* @return The actual placement count.
*/
int actualPlacementCount();

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,14 @@ public int misReplicationCount() {
}
return requiredRacks - currentRacks;
}

@Override
public int expectedPlacementCount() {
return Math.min(requiredRacks, totalRacks);
}

@Override
public int actualPlacementCount() {
return currentRacks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.apache.hadoop.test.LambdaTestUtils;
import org.hadoop.ozone.recon.schema.tables.pojos.MissingContainers;
import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -114,7 +114,7 @@ public void testMissingContainerDownNode() throws Exception {
cluster.shutdownHddsDatanode(pipeline.getFirstNode());

LambdaTestUtils.await(120000, 10000, () -> {
List<MissingContainers> allMissingContainers =
List<UnhealthyContainers> allMissingContainers =
reconContainerManager.getContainerSchemaManager()
.getAllMissingContainers();
return (allMissingContainers.size() == 1);
Expand All @@ -123,7 +123,7 @@ public void testMissingContainerDownNode() throws Exception {
// Restart the Datanode to make sure we remove the missing container.
cluster.restartHddsDatanode(pipeline.getFirstNode(), true);
LambdaTestUtils.await(120000, 10000, () -> {
List<MissingContainers> allMissingContainers =
List<UnhealthyContainers> allMissingContainers =
reconContainerManager.getContainerSchemaManager()
.getAllMissingContainers();
return (allMissingContainers.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.hadoop.ozone.recon.schema;

import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.name;

import com.google.inject.Inject;
import com.google.inject.Singleton;
Expand All @@ -38,9 +40,22 @@ public class ContainerSchemaDefinition implements ReconSchemaDefinition {

public static final String CONTAINER_HISTORY_TABLE_NAME =
"CONTAINER_HISTORY";
public static final String MISSING_CONTAINERS_TABLE_NAME =
"MISSING_CONTAINERS";
public static final String UNHEALTHY_CONTAINERS_TABLE_NAME =
"UNHEALTHY_CONTAINERS";

/**
* ENUM describing the allowed container states which can be stored in the
* unhealthy containers table.
*/
public enum UnHealthyContainerStates {
MISSING,
UNDER_REPLICATED,
OVER_REPLICATED,
MIS_REPLICATED
}

private static final String CONTAINER_ID = "container_id";
private static final String CONTAINER_STATE = "container_state";
private final DataSource dataSource;
private DSLContext dslContext;

Expand All @@ -56,8 +71,8 @@ public void initializeSchema() throws SQLException {
if (!TABLE_EXISTS_CHECK.test(conn, CONTAINER_HISTORY_TABLE_NAME)) {
createContainerHistoryTable();
}
if (!TABLE_EXISTS_CHECK.test(conn, MISSING_CONTAINERS_TABLE_NAME)) {
createMissingContainersTable();
if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
createUnhealthyContainersTable();
}
}

Expand All @@ -78,12 +93,20 @@ private void createContainerHistoryTable() {
/**
* Create the Missing Containers table.
*/
private void createMissingContainersTable() {
dslContext.createTableIfNotExists(MISSING_CONTAINERS_TABLE_NAME)
.column(CONTAINER_ID, SQLDataType.BIGINT)
.column("missing_since", SQLDataType.BIGINT)
private void createUnhealthyContainersTable() {
dslContext.createTableIfNotExists(UNHEALTHY_CONTAINERS_TABLE_NAME)
.column(CONTAINER_ID, SQLDataType.BIGINT.nullable(false))
.column(CONTAINER_STATE, SQLDataType.VARCHAR(16).nullable(false))
.column("in_state_since", SQLDataType.BIGINT.nullable(false))
.column("expected_replica_count", SQLDataType.INTEGER.nullable(false))
.column("actual_replica_count", SQLDataType.INTEGER.nullable(false))
.column("replica_delta", SQLDataType.INTEGER.nullable(false))
.column("reason", SQLDataType.VARCHAR(500).nullable(true))
.constraint(DSL.constraint("pk_container_id")
.primaryKey(CONTAINER_ID))
.primaryKey(CONTAINER_ID, CONTAINER_STATE))
.constraint(DSL.constraint(UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1")
.check(field(name("container_state"))
.in(UnHealthyContainerStates.values())))
.execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
import org.hadoop.ozone.recon.schema.tables.daos.ContainerHistoryDao;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
import org.hadoop.ozone.recon.schema.tables.daos.MissingContainersDao;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.daos.UnhealthyContainersDao;
import org.jooq.Configuration;
import org.jooq.DAO;
import org.slf4j.Logger;
Expand Down Expand Up @@ -126,7 +126,7 @@ public static class ReconDaoBindingModule extends AbstractModule {
ImmutableList.of(
FileCountBySizeDao.class,
ReconTaskStatusDao.class,
MissingContainersDao.class,
UnhealthyContainersDao.class,
GlobalStatsDao.class,
ClusterGrowthDailyDao.class,
ContainerHistoryDao.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public Response getMissingContainers() {
containerSchemaManager.getLatestContainerHistory(
containerID, containerInfo.getReplicationFactor().getNumber());
missingContainers.add(new MissingContainerMetadata(containerID,
container.getMissingSince(), keyCount, pipelineID, datanodes));
container.getInStateSince(), keyCount, pipelineID, datanodes));
} catch (IOException ioEx) {
throw new WebApplicationException(ioEx,
Response.Status.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.fsck;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Class which encapsulates all the information required to determine if a
* container and its replicas are correctly replicated and placed.
*/

public class ContainerHealthStatus {

private ContainerInfo container;
private int replicaDelta;
private Set<ContainerReplica> replicas;
private ContainerPlacementStatus placementStatus;

ContainerHealthStatus(ContainerInfo container,
Set<ContainerReplica> replicas, PlacementPolicy placementPolicy) {
this.container = container;
int repFactor = container.getReplicationFactor().getNumber();
this.replicas = replicas
.stream()
.filter(r -> !r.getState()
.equals((ContainerReplicaProto.State.UNHEALTHY)))
.collect(Collectors.toSet());
this.replicaDelta = repFactor - this.replicas.size();
this.placementStatus = getPlacementStatus(placementPolicy, repFactor);
}

public long getContainerID() {
return this.container.getContainerID();
}

public ContainerInfo getContainer() {
return this.container;
}

public int getReplicationFactor() {
return container.getReplicationFactor().getNumber();
}

public boolean isHealthy() {
return replicaDelta == 0 && !isMisReplicated();
}

public boolean isOverReplicated() {
return replicaDelta < 0;
}

public boolean isUnderReplicated() {
return !isMissing() && replicaDelta > 0;
}

public int replicaDelta() {
return replicaDelta;
}

public int getReplicaCount() {
return replicas.size();
}

public boolean isMisReplicated() {
return !isMissing() && !placementStatus.isPolicySatisfied();
}

public int misReplicatedDelta() {
return placementStatus.misReplicationCount();
}

public int expectedPlacementCount() {
return placementStatus.expectedPlacementCount();
}

public int actualPlacementCount() {
return placementStatus.actualPlacementCount();
}

public String misReplicatedReason() {
return placementStatus.misReplicatedReason();
}

public boolean isMissing() {
return replicas.size() == 0;
}

private ContainerPlacementStatus getPlacementStatus(
PlacementPolicy policy, int repFactor) {
List<DatanodeDetails> dns = replicas.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
return policy.validateContainerPlacement(dns, repFactor);
}
}
Loading