Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ message ContainerReplicaHistoryProto {
required int64 lastSeenTime = 3;
required int64 bcsId = 4;
optional string state = 5;
optional int64 dataChecksum = 6;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will adding this new field to the proto cause any decoding issues for existing data in the replica_history table when it is loaded after an upgrade? Has this been tested?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this as an optional field, which saves us from some compatibility problems (similar to how it was done in #4443). But I'm not sure whether we should handle the case where there is no dataChecksum and we call this:

public static ContainerReplicaHistory fromProto(
ContainerReplicaHistoryProto proto) {
return new ContainerReplicaHistory(UUID.fromString(proto.getUuid()),
proto.getFirstSeenTime(), proto.getLastSeenTime(), proto.getBcsId(),
proto.getState(), proto.getDataChecksum());
}

If we should, then the state field is also missing that handling, and we should do it for that field as well.

}

message SCMContainerReplicaProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public enum UnHealthyContainerStates {
OVER_REPLICATED,
MIS_REPLICATED,
ALL_REPLICAS_BAD,
NEGATIVE_SIZE // Added new state to track containers with negative sizes
NEGATIVE_SIZE, // Added new state to track containers with negative sizes
REPLICA_MISMATCH
}

private static final String CONTAINER_ID = "container_id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public class UnhealthyContainersResponse {
@JsonProperty("misReplicatedCount")
private long misReplicatedCount = 0;

/**
* Total count of containers that have replicas with mismatched data checksums.
*/
@JsonProperty("replicaMismatchCount")
private long replicaMismatchCount = 0;

/**
* A collection of unhealthy containers.
*/
Expand Down Expand Up @@ -77,6 +83,9 @@ public void setSummaryCount(String state, long count) {
} else if (state.equals(
UnHealthyContainerStates.MIS_REPLICATED.toString())) {
this.misReplicatedCount = count;
} else if (state.equals(
UnHealthyContainerStates.REPLICA_MISMATCH.toString())) {
this.replicaMismatchCount = count;
}
}

Expand All @@ -96,6 +105,10 @@ public long getMisReplicatedCount() {
return misReplicatedCount;
}

/**
 * Returns the summary count stored for the REPLICA_MISMATCH state.
 *
 * @return the value last assigned to {@code replicaMismatchCount}
 *         (initially 0)
 */
public long getReplicaMismatchCount() {
  return this.replicaMismatchCount;
}

public Collection<UnhealthyContainerMetadata> getContainers() {
return containers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ContainerHealthStatus {

private final ContainerInfo container;
private final int replicaDelta;
private final Set<ContainerReplica> replicas;
private final Set<ContainerReplica> healthyReplicas;
private final Set<ContainerReplica> healthyAvailReplicas;
private final ContainerPlacementStatus placementStatus;
Expand All @@ -64,6 +65,7 @@ public class ContainerHealthStatus {
this.reconContainerMetadataManager = reconContainerMetadataManager;
this.container = container;
int repFactor = container.getReplicationConfig().getRequiredNodes();
this.replicas = replicas;
this.healthyReplicas = replicas
.stream()
.filter(r -> !r.getState()
Expand Down Expand Up @@ -160,6 +162,13 @@ public boolean isEmpty() {
return numKeys == 0;
}

/**
 * Reports whether the replicas of this container disagree on their data
 * checksum.
 *
 * @return {@code false} when there are no replicas or when every replica
 *         reports the same data checksum; {@code true} when the number of
 *         distinct data checksum values across the replicas is not exactly
 *         one
 */
public boolean isDataChecksumMismatched() {
  // An empty replica set has nothing to compare, so it is never a mismatch.
  if (replicas.isEmpty()) {
    return false;
  }
  long distinctChecksums = replicas.stream()
      .map(ContainerReplica::getDataChecksum)
      .distinct()
      .count();
  return distinctChecksums != 1;
}

private ContainerPlacementStatus getPlacementStatus(
PlacementPolicy policy, int repFactor) {
List<DatanodeDetails> dns = healthyReplicas.stream()
Expand All @@ -182,19 +191,19 @@ public long getNumKeys() {
}

private ContainerReplicaCount getContainerReplicaCountInstance(
OzoneConfiguration conf, Set<ContainerReplica> replicas) {
OzoneConfiguration conf, Set<ContainerReplica> containerReplicas) {
ReplicationManager.ReplicationManagerConfiguration rmConf = conf.getObject(
ReplicationManager.ReplicationManagerConfiguration.class);
boolean isEC = container.getReplicationConfig()
.getReplicationType() == HddsProtos.ReplicationType.EC;
return isEC ?
new ECContainerReplicaCount(container,
replicas, new ArrayList<>(),
containerReplicas, new ArrayList<>(),
rmConf.getMaintenanceRemainingRedundancy()) :
// This class ignores unhealthy replicas,
// therefore set 'considerUnhealthy' to false.
new RatisContainerReplicaCount(container,
replicas, new ArrayList<>(),
containerReplicas, new ArrayList<>(),
rmConf.getMaintenanceReplicaMinimum(), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void run() {
Thread.sleep(interval);
}
} catch (Throwable t) {
LOG.error("Exception in Missing Container task Thread.", t);
LOG.error("Exception in Container Health task thread.", t);
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -234,6 +234,8 @@ private void initializeUnhealthyContainerStateStatsMap(
UnHealthyContainerStates.MIS_REPLICATED, new HashMap<>());
unhealthyContainerStateStatsMap.put(
UnHealthyContainerStates.NEGATIVE_SIZE, new HashMap<>());
unhealthyContainerStateStatsMap.put(
UnHealthyContainerStates.REPLICA_MISMATCH, new HashMap<>());
}

private ContainerHealthStatus setCurrentContainer(long recordId)
Expand Down Expand Up @@ -352,7 +354,7 @@ private void processContainer(ContainerInfo container, long currentTime,
containerReplicas, placementPolicy,
reconContainerMetadataManager, conf);

if (h.isHealthilyReplicated() || h.isDeleted()) {
if ((h.isHealthilyReplicated() && !h.isDataChecksumMismatched()) || h.isDeleted()) {
return;
}
// For containers deleted in SCM, we sync the container state here.
Expand Down Expand Up @@ -486,7 +488,7 @@ public static class ContainerHealthRecords {
*/
public static boolean retainOrUpdateRecord(
ContainerHealthStatus container, UnhealthyContainersRecord rec) {
boolean returnValue = false;
boolean returnValue;
switch (UnHealthyContainerStates.valueOf(rec.getContainerState())) {
case MISSING:
returnValue = container.isMissing() && !container.isEmpty();
Expand All @@ -500,6 +502,9 @@ public static boolean retainOrUpdateRecord(
case OVER_REPLICATED:
returnValue = keepOverReplicatedRecord(container, rec);
break;
case REPLICA_MISMATCH:
returnValue = keepReplicaMismatchRecord(container, rec);
break;
default:
returnValue = false;
}
Expand Down Expand Up @@ -528,7 +533,7 @@ public static List<UnhealthyContainers> generateUnhealthyRecords(
Map<UnHealthyContainerStates, Map<String, Long>>
unhealthyContainerStateStatsMap) {
List<UnhealthyContainers> records = new ArrayList<>();
if (container.isHealthilyReplicated() || container.isDeleted()) {
if ((container.isHealthilyReplicated() && !container.isDataChecksumMismatched()) || container.isDeleted()) {
return records;
}

Expand Down Expand Up @@ -593,6 +598,16 @@ public static List<UnhealthyContainers> generateUnhealthyRecords(
unhealthyContainerStateStatsMap);
}

if (container.isDataChecksumMismatched()
&& !recordForStateExists.contains(
UnHealthyContainerStates.REPLICA_MISMATCH.toString())) {
records.add(recordForState(
container, UnHealthyContainerStates.REPLICA_MISMATCH, time));
populateContainerStats(container,
UnHealthyContainerStates.REPLICA_MISMATCH,
unhealthyContainerStateStatsMap);
}

return records;
}

Expand Down Expand Up @@ -650,6 +665,17 @@ private static boolean keepMisReplicatedRecord(
return false;
}

/**
 * Decides whether an existing record should be retained, based on whether
 * the container's replicas still have mismatched data checksums.
 *
 * When the mismatch persists, the record's expected replica count, actual
 * replica count and replica delta are refreshed from the container before
 * the record is kept.
 *
 * @param container current health status of the container
 * @param rec the existing unhealthy-container record to refresh
 * @return {@code true} if the checksums are still mismatched and the record
 *         was refreshed; {@code false} otherwise (the record is left
 *         untouched)
 */
private static boolean keepReplicaMismatchRecord(
    ContainerHealthStatus container, UnhealthyContainersRecord rec) {
  if (!container.isDataChecksumMismatched()) {
    return false;
  }
  updateExpectedReplicaCount(rec, container.getReplicationFactor());
  updateActualReplicaCount(rec, container.getReplicaCount());
  updateReplicaDelta(rec, container.replicaDelta());
  return true;
}

/**
* With a Jooq record, if you update any field in the record, the record
* is marked as changed, even if you updated it to the same value as it is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,20 @@ public class ContainerHistory implements Serializable {
private long lastSeenTime;
private long lastBcsId;
private String state;
private long dataChecksum;

@SuppressWarnings("parameternumber")
public ContainerHistory(long containerId, String datanodeUuid,
String datanodeHost, long firstSeenTime,
long lastSeenTime, long lastBcsId, String state) {
long lastSeenTime, long lastBcsId, String state, long dataChecksum) {
this.containerId = containerId;
this.datanodeUuid = datanodeUuid;
this.datanodeHost = datanodeHost;
this.firstSeenTime = firstSeenTime;
this.lastSeenTime = lastSeenTime;
this.lastBcsId = lastBcsId;
this.state = state;
this.dataChecksum = dataChecksum;
}

// Default constructor, used by jackson lib for object deserialization.
Expand Down Expand Up @@ -100,4 +103,12 @@ public String getState() {
public void setState(String state) {
this.state = state;
}

/**
 * Returns the data checksum held by this container history entry.
 *
 * @return the current value of the {@code dataChecksum} field
 */
public long getDataChecksum() {
  return this.dataChecksum;
}

/**
 * Replaces the data checksum held by this container history entry.
 *
 * @param dataChecksum the new value for the {@code dataChecksum} field
 */
public void setDataChecksum(long dataChecksum) {
this.dataChecksum = dataChecksum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ public class ContainerReplicaHistory {

private long bcsId;
private String state;
private long dataChecksum;

public ContainerReplicaHistory(UUID id, Long firstSeenTime,
Long lastSeenTime, long bcsId, String state) {
Long lastSeenTime, long bcsId, String state, long dataChecksum) {
this.uuid = id;
this.firstSeenTime = firstSeenTime;
this.lastSeenTime = lastSeenTime;
this.bcsId = bcsId;
this.state = state;
this.dataChecksum = dataChecksum;
}

public long getBcsId() {
Expand Down Expand Up @@ -83,16 +85,24 @@ public void setState(String state) {
this.state = state;
}

/**
 * Returns the data checksum recorded for this replica history entry.
 *
 * @return the current value of the {@code dataChecksum} field
 */
public long getDataChecksum() {
  return this.dataChecksum;
}

/**
 * Replaces the data checksum recorded for this replica history entry.
 *
 * @param dataChecksum the new value for the {@code dataChecksum} field
 */
public void setDataChecksum(long dataChecksum) {
this.dataChecksum = dataChecksum;
}

public static ContainerReplicaHistory fromProto(
ContainerReplicaHistoryProto proto) {
return new ContainerReplicaHistory(UUID.fromString(proto.getUuid()),
proto.getFirstSeenTime(), proto.getLastSeenTime(), proto.getBcsId(),
proto.getState());
proto.getState(), proto.getDataChecksum());
}

public ContainerReplicaHistoryProto toProto() {
return ContainerReplicaHistoryProto.newBuilder().setUuid(uuid.toString())
.setFirstSeenTime(firstSeenTime).setLastSeenTime(lastSeenTime)
.setBcsId(bcsId).setState(state).build();
.setBcsId(bcsId).setState(state).setDataChecksum(dataChecksum).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,15 @@ public void updateContainerReplica(ContainerID containerID,
boolean flushToDB = false;
long bcsId = replica.getSequenceId() != null ? replica.getSequenceId() : -1;
String state = replica.getState().toString();
long dataChecksum = replica.getDataChecksum();

// If replica doesn't exist in in-memory map, add to DB and add to map
if (replicaLastSeenMap == null) {
// putIfAbsent to avoid TOCTOU
replicaHistoryMap.putIfAbsent(id,
new ConcurrentHashMap<UUID, ContainerReplicaHistory>() {{
put(uuid, new ContainerReplicaHistory(uuid, currTime, currTime,
bcsId, state));
bcsId, state, dataChecksum));
}});
flushToDB = true;
} else {
Expand All @@ -297,7 +298,7 @@ public void updateContainerReplica(ContainerID containerID,
// New Datanode
replicaLastSeenMap.put(uuid,
new ContainerReplicaHistory(uuid, currTime, currTime, bcsId,
state));
state, dataChecksum));
flushToDB = true;
} else {
// if the object exists, only update the last seen time & bcsId fields
Expand All @@ -308,7 +309,7 @@ public void updateContainerReplica(ContainerID containerID,
}

if (flushToDB) {
upsertContainerHistory(id, uuid, currTime, bcsId, state);
upsertContainerHistory(id, uuid, currTime, bcsId, state, dataChecksum);
}
}

Expand All @@ -325,6 +326,7 @@ public void removeContainerReplica(ContainerID containerID,
final DatanodeDetails dnInfo = replica.getDatanodeDetails();
final UUID uuid = dnInfo.getUuid();
String state = replica.getState().toString();
long dataChecksum = replica.getDataChecksum();

final Map<UUID, ContainerReplicaHistory> replicaLastSeenMap =
replicaHistoryMap.get(id);
Expand All @@ -333,7 +335,7 @@ public void removeContainerReplica(ContainerID containerID,
if (ts != null) {
// Flush to DB, then remove from in-memory map
upsertContainerHistory(id, uuid, ts.getLastSeenTime(), ts.getBcsId(),
state);
state, dataChecksum);
replicaLastSeenMap.remove(uuid);
}
}
Expand Down Expand Up @@ -392,9 +394,10 @@ public List<ContainerHistory> getAllContainerHistory(long containerID) {
final long lastSeenTime = entry.getValue().getLastSeenTime();
long bcsId = entry.getValue().getBcsId();
String state = entry.getValue().getState();
long dataChecksum = entry.getValue().getDataChecksum();

resList.add(new ContainerHistory(containerID, uuid.toString(), hostname,
firstSeenTime, lastSeenTime, bcsId, state));
firstSeenTime, lastSeenTime, bcsId, state, dataChecksum));
}
return resList;
}
Expand Down Expand Up @@ -429,19 +432,20 @@ public void flushReplicaHistoryMapToDB(boolean clearMap) {
}

public void upsertContainerHistory(long containerID, UUID uuid, long time,
long bcsId, String state) {
long bcsId, String state, long dataChecksum) {
Map<UUID, ContainerReplicaHistory> tsMap;
try {
tsMap = cdbServiceProvider.getContainerReplicaHistory(containerID);
ContainerReplicaHistory ts = tsMap.get(uuid);
if (ts == null) {
// New entry
tsMap.put(uuid, new ContainerReplicaHistory(uuid, time, time, bcsId,
state));
state, dataChecksum));
} else {
// Entry exists, update last seen time and put it back to DB.
ts.setLastSeenTime(time);
ts.setState(state);
ts.setDataChecksum(dataChecksum);
}
cdbServiceProvider.storeContainerReplicaHistory(containerID, tsMap);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
public enum ReconLayoutFeature {
// Represents the starting point for Recon's layout versioning system.
INITIAL_VERSION(0, "Recon Layout Versioning Introduction"),
TASK_STATUS_STATISTICS(1, "Recon Task Status Statistics Tracking Introduced");
TASK_STATUS_STATISTICS(1, "Recon Task Status Statistics Tracking Introduced"),
UNHEALTHY_CONTAINER_REPLICA_MISMATCH(2, "Adding replica mismatch state to the unhealthy container table");

private final int version;
private final String description;
Expand Down
Loading