Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaInfo;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -319,6 +320,14 @@ Map<String, Pair<Boolean, String>> getSafeModeRuleStatuses()
*/
boolean getReplicationManagerStatus() throws IOException;

/**
* Returns the latest container summary report generated by Replication
* Manager.
* @return The latest ReplicationManagerReport.
* @throws IOException
*/
ReplicationManagerReport getReplicationManagerReport() throws IOException;

/**
* Start ContainerBalancer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

/**
* This class is used by ReplicationManager. Each time ReplicationManager runs,
Expand Down Expand Up @@ -96,6 +97,22 @@ public String getDescription() {
private final Map<String, List<ContainerID>> containerSample
= new ConcurrentHashMap<>();

public static ReplicationManagerReport fromProtobuf(
HddsProtos.ReplicationManagerReportProto proto) {
ReplicationManagerReport report = new ReplicationManagerReport();
report.setTimestamp(proto.getTimestamp());
for (HddsProtos.KeyIntValue stat : proto.getStatList()) {
report.setStat(stat.getKey(), stat.getValue());
}
for (HddsProtos.KeyContainerIDList sample : proto.getStatSampleList()) {
report.setSample(sample.getKey(), sample.getContainerList()
.stream()
.map(c -> ContainerID.getFromProtobuf(c))
.collect(Collectors.toList()));
}
return report;
}

public ReplicationManagerReport() {
stats = createStatsMap();
}
Expand Down Expand Up @@ -129,16 +146,62 @@ public long getReportTimeStamp() {
return reportTimeStamp;
}

/**
* Get the stat for the given LifeCycleState. If there is no stat available
* for that stat -1 is returned.
* @param stat The requested stat.
* @return The stat value or -1 if it is not present
*/
public long getStat(HddsProtos.LifeCycleState stat) {
return getStat(stat.toString());
}

/**
* Get the stat for the given HealthState. If there is no stat available
* for that stat -1 is returned.
* @param stat The requested stat.
* @return The stat value or -1 if it is not present
*/
public long getStat(HealthState stat) {
return getStat(stat.toString());
}

/**
* Returns the stat requested, or -1 if it does not exist.
* @param stat The request stat
* @return The value of the stat or -1 if it does not exist.
*/
private long getStat(String stat) {
return stats.get(stat).longValue();
LongAdder val = stats.get(stat);
if (val == null) {
return -1;
}
return val.longValue();
}

protected void setTimestamp(long timestamp) {
this.reportTimeStamp = timestamp;
}

protected void setStat(String stat, long value) {
LongAdder adder = getStatAndEnsurePresent(stat);
if (adder.longValue() != 0) {
throw new IllegalStateException(stat + " is expected to be zero");
}
adder.add(value);
}

protected void setSample(String stat, List<ContainerID> sample) {
// First get the stat, as we should not receive a sample for a stat which
// does not exist.
getStatAndEnsurePresent(stat);
// Now check there is not already a sample for this stat
List<ContainerID> existingSample = containerSample.get(stat);
if (existingSample != null) {
throw new IllegalStateException(stat
+ " is not expected to have existing samples");
}
containerSample.put(stat, sample);
}

public List<ContainerID> getSample(HddsProtos.LifeCycleState stat) {
Expand All @@ -160,11 +223,15 @@ private List<ContainerID> getSample(String stat) {
}

private void increment(String stat) {
getStatAndEnsurePresent(stat).increment();
}

private LongAdder getStatAndEnsurePresent(String stat) {
LongAdder adder = stats.get(stat);
if (adder == null) {
throw new IllegalArgumentException("Unexpected stat " + stat);
}
adder.increment();
return adder;
}

private void incrementAndSample(String stat, ContainerID container) {
Expand All @@ -189,4 +256,28 @@ private Map<String, LongAdder> createStatsMap() {
return map;
}

public HddsProtos.ReplicationManagerReportProto toProtobuf() {
HddsProtos.ReplicationManagerReportProto.Builder proto =
HddsProtos.ReplicationManagerReportProto.newBuilder();
proto.setTimestamp(getReportTimeStamp());

for (Map.Entry<String, LongAdder> e : stats.entrySet()) {
proto.addStat(HddsProtos.KeyIntValue.newBuilder()
.setKey(e.getKey())
.setValue(e.getValue().longValue())
.build());
}

for (Map.Entry<String, List<ContainerID>> e : containerSample.entrySet()) {
HddsProtos.KeyContainerIDList.Builder sample
= HddsProtos.KeyContainerIDList.newBuilder();
sample.setKey(e.getKey());
for (ContainerID container : e.getValue()) {
sample.addContainer(container.getProtobuf());
}
proto.addStatSample(sample.build());
}
return proto.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
Expand Down Expand Up @@ -317,6 +318,14 @@ Map<String, Pair<Boolean, String>> getSafeModeRuleStatuses()
*/
boolean getReplicationManagerStatus() throws IOException;

/**
* Returns the latest container summary report generated by Replication
* Manager.
* @return The latest ReplicationManagerReport.
* @throws IOException
*/
ReplicationManagerReport getReplicationManagerReport() throws IOException;

/**
* Start ContainerBalancer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public enum SCMAction implements AuditAction {
STOP_CONTAINER_BALANCER,
GET_CONTAINER_BALANCER_STATUS,
GET_CONTAINER_WITH_PIPELINE_BATCH,
ADD_SCM;
ADD_SCM,
GET_REPLICATION_MANAGER_REPORT;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
* Tests for the ReplicationManagerReport class.
Expand Down Expand Up @@ -110,4 +113,50 @@ public void testSamplesAreLimited() {
Assert.assertEquals(new ContainerID(i), sample.get(i));
}
}

@Test
public void testSerializeToProtoAndBack() {
report.setTimestamp(12345);
Random rand = ThreadLocalRandom.current();
for (HddsProtos.LifeCycleState s : HddsProtos.LifeCycleState.values()) {
report.setStat(s.toString(), rand.nextInt(Integer.MAX_VALUE));
}
for (ReplicationManagerReport.HealthState s :
ReplicationManagerReport.HealthState.values()) {
report.setStat(s.toString(), rand.nextInt(Integer.MAX_VALUE));
List<ContainerID> containers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
containers.add(ContainerID.valueOf(rand.nextInt(Integer.MAX_VALUE)));
}
report.setSample(s.toString(), containers);
}
HddsProtos.ReplicationManagerReportProto proto = report.toProtobuf();
ReplicationManagerReport newReport
= ReplicationManagerReport.fromProtobuf(proto);
Assert.assertEquals(report.getReportTimeStamp(),
newReport.getReportTimeStamp());

for (HddsProtos.LifeCycleState s : HddsProtos.LifeCycleState.values()) {
Assert.assertEquals(report.getStat(s), newReport.getStat(s));
}

for (ReplicationManagerReport.HealthState s :
ReplicationManagerReport.HealthState.values()) {
Assert.assertTrue(report.getSample(s).equals(newReport.getSample(s)));
}
}

@Test(expected = IllegalStateException.class)
public void testStatCannotBeSetTwice() {
report.setStat(HddsProtos.LifeCycleState.CLOSED.toString(), 10);
report.setStat(HddsProtos.LifeCycleState.CLOSED.toString(), 10);
}

@Test(expected = IllegalStateException.class)
public void testSampleCannotBeSetTwice() {
List<ContainerID> containers = new ArrayList<>();
containers.add(ContainerID.valueOf(1));
report.setSample(HddsProtos.LifeCycleState.CLOSED.toString(), containers);
report.setSample(HddsProtos.LifeCycleState.CLOSED.toString(), containers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.RecommissionNodesRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.RecommissionNodesResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerReportRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerReportResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusRequestProto;
Expand All @@ -87,6 +89,7 @@
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
Expand Down Expand Up @@ -758,6 +761,20 @@ public boolean getReplicationManagerStatus() throws IOException {

}

@Override
public ReplicationManagerReport getReplicationManagerReport()
throws IOException {
ReplicationManagerReportRequestProto request =
ReplicationManagerReportRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
ReplicationManagerReportResponseProto response =
submitRequest(Type.GetReplicationManagerReport,
builder -> builder.setReplicationManagerReportRequest(request))
.getGetReplicationManagerReportResponse();
return ReplicationManagerReport.fromProtobuf(response.getReport());
}

@Override
public boolean startContainerBalancer(
Optional<Double> threshold, Optional<Integer> iterations,
Expand Down
11 changes: 11 additions & 0 deletions hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ message ScmContainerLocationRequest {
optional QueryUpgradeFinalizationProgressRequestProto queryUpgradeFinalizationProgressRequest = 37;
optional GetContainerCountRequestProto getContainerCountRequest = 38;
optional GetContainerReplicasRequestProto getContainerReplicasRequest = 39;
optional ReplicationManagerReportRequestProto replicationManagerReportRequest = 40;
}

message ScmContainerLocationResponse {
Expand Down Expand Up @@ -123,6 +124,7 @@ message ScmContainerLocationResponse {
optional QueryUpgradeFinalizationProgressResponseProto queryUpgradeFinalizationProgressResponse = 37;
optional GetContainerCountResponseProto getContainerCountResponse = 38;
optional GetContainerReplicasResponseProto getContainerReplicasResponse = 39;
optional ReplicationManagerReportResponseProto getReplicationManagerReportResponse = 40;

enum Status {
OK = 1;
Expand Down Expand Up @@ -168,6 +170,7 @@ enum Type {
QueryUpgradeFinalizationProgress = 32;
GetContainerCount = 33;
GetContainerReplicas = 34;
GetReplicationManagerReport = 35;
}

/**
Expand Down Expand Up @@ -468,6 +471,14 @@ message ReplicationManagerStatusResponseProto {
required bool isRunning = 1;
}

message ReplicationManagerReportRequestProto {
optional string traceID = 1;
}

message ReplicationManagerReportResponseProto {
required ReplicationManagerReportProto report = 1;
}

message FinalizeScmUpgradeRequestProto {
required string upgradeClientId = 1;
}
Expand Down
16 changes: 16 additions & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,19 @@ message SCMContainerReplicaProto {
required int64 keyCount = 6;
required int64 bytesUsed = 7;
}

message KeyContainerIDList {
required string key = 1;
repeated ContainerID container = 2;
}

message KeyIntValue {
required string key = 1;
optional int64 value = 2;
}

message ReplicationManagerReportProto {
required int64 timestamp = 1;
repeated KeyIntValue stat = 2;
repeated KeyContainerIDList statSample = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ public synchronized void stop() {
* This in intended to be used in tests.
*/
public synchronized void processAll() {
if (!shouldRun()) {
LOG.info("Replication Manager is not ready to run until {}ms after " +
"safemode exit", waitTimeInMillis);
return;
}
final long start = clock.millis();
final List<ContainerInfo> containers =
containerManager.getContainers();
Expand Down
Loading