Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils.pipelineLeaderFormat;

/**
* SCM Pipeline Manager implementation.
* All the write operations for pipelines must come via PipelineManager.
Expand Down Expand Up @@ -697,6 +699,12 @@ public Map<String, Integer> getPipelineInfo() throws NotLeaderException {
return pipelineInfo;
}

@Override
public Map<String, Map<String, Map<String, String>>> getPipelineLeaders() {
  // Delegate to the shared utility, which renders each pipeline's datanodes
  // with their Leader/Follower roles.
  return pipelineLeaderFormat(getPipelines());
}

/**
* Get SafeMode status.
* @return boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ public interface PipelineManagerMXBean {
*/
Map<String, Integer> getPipelineInfo() throws NotLeaderException;

Map<String, Map<String, Map<String, String>>> getPipelineLeaders();

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.client.RatisReplicationConfig;
Expand Down Expand Up @@ -117,4 +120,50 @@ static List<Pipeline> checkPipelineContainSameDatanodes(
p.sameDatanodes(pipeline)))
.collect(Collectors.toList());
}

/**
* Returns a map containing pipeline information which includes
* pipeline-id & the corresponding roles of all datanodes along with
* their respective datanode-id
* part of the pipeline.
*
* @param pipelines input pipeline
* @return map containing pipeline details
*/
/**
 * Builds a nested map describing every pipeline: pipeline-id mapped to its
 * datanodes (keyed "DataNode-&lt;index&gt;"), each with its "Role"
 * (Leader/Follower), "UUID" and "HostName" attributes.
 *
 * @param pipelines pipelines to describe
 * @return map keyed by pipeline id containing per-datanode details
 */
public static Map<String, Map<String, Map<String, String>>>
    pipelineLeaderFormat(List<Pipeline> pipelines) {
  // The maps are constructed and returned within this single call; plain
  // (non-concurrent) maps suffice. LinkedHashMap keeps insertion order so
  // datanodes display in pipeline order.
  final Map<String, Map<String, Map<String, String>>> pipelineInfo =
      new LinkedHashMap<>();

  for (Pipeline pipeline : pipelines) {
    UUID pipelineId = pipeline.getId().getId();
    String leaderHostName = "";

    try {
      leaderHostName = pipeline.getLeaderNode().getHostName();
    } catch (IOException ioEx) {
      // Best effort: with no known leader, every node is reported Follower.
      LOG.warn("Cannot get leader node for pipeline {}",
          pipelineId, ioEx);
    }

    List<DatanodeDetails> dataNodes = pipeline.getNodes();
    Map<String, Map<String, String>> nodeInfo = new LinkedHashMap<>();
    for (int idx = 0; idx < dataNodes.size(); idx++) {
      DatanodeDetails node = dataNodes.get(idx);
      Map<String, String> info = new LinkedHashMap<>();
      // NOTE(review): leader is identified by hostname; this is ambiguous if
      // two datanodes in a pipeline share a hostname — comparing datanode
      // UUIDs would be unambiguous. Confirm before changing.
      info.put("Role",
          node.getHostName().equals(leaderHostName) ? "Leader" : "Follower");
      info.put("UUID", node.getUuidString());
      info.put("HostName", node.getHostName());
      nodeInfo.put("DataNode-" + idx, info);
    }
    pipelineInfo.put(pipelineId.toString(), nodeInfo);
  }
  return pipelineInfo;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ public Map<String, Integer> getPipelineInfo() {
return null;
}

@Override
public Map<String, Map<String, Map<String, String>>> getPipelineLeaders() {
  // Stub: pipeline-leader info is not tracked by this implementation.
  // Returns null to match the sibling stub getPipelineInfo() above;
  // callers must null-check.
  return null;
}

@Override
public void acquireReadLock() {

Expand Down