Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions hadoop-ozone/tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

package org.apache.hadoop.ozone.debug.replicas.chunk;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
Expand All @@ -27,14 +28,12 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.server.JsonUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
Expand All @@ -60,21 +59,17 @@ private String getChunkLocationPath(String containerLocation) {
}

@Override
@SuppressWarnings("checkstyle:methodlength")
protected void execute(OzoneClient client, OzoneAddress address)
throws IOException {
try (ContainerOperationClient containerOperationClient = new ContainerOperationClient(getOzoneConf());
XceiverClientManager xceiverClientManager = containerOperationClient.getXceiverClientManager()) {
OzoneManagerProtocol ozoneManagerClient = client.getObjectStore().getClientProxy().getOzoneManagerClient();
address.ensureKeyAddress();
ObjectNode result = JsonUtils.createObjectNode(null);
String volumeName = address.getVolumeName();
String bucketName = address.getBucketName();
String keyName = address.getKeyName();

result.put("volumeName", volumeName);
result.put("bucketName", bucketName);
result.put("name", keyName);

OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName).setKeyName(keyName).build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
Expand All @@ -95,113 +90,156 @@ protected void execute(OzoneClient client, OzoneAddress address)
}
ContainerLayoutVersion containerLayoutVersion = ContainerLayoutVersion
.getConfiguredVersion(getConf());
ArrayNode responseArrayList = result.putArray("keyLocations");
for (OmKeyLocationInfo keyLocation : locationInfos) {
Pipeline keyPipeline = keyLocation.getPipeline();
boolean isECKey =
keyPipeline.getReplicationConfig().getReplicationType() ==
HddsProtos.ReplicationType.EC;
Pipeline pipeline;
if (!isECKey && keyPipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
pipeline = keyPipeline.copyForRead();
} else {
pipeline = keyPipeline;
}
XceiverClientSpi xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);
try {
Map<DatanodeDetails, ContainerProtos.GetBlockResponseProto>
responses =
ContainerProtocolCalls.getBlockFromAllNodes(xceiverClient,
keyLocation.getBlockID().getDatanodeBlockIDProtobuf(),
keyLocation.getToken());
Map<DatanodeDetails, ContainerProtos.ReadContainerResponseProto> readContainerResponses =
containerOperationClient.readContainerFromAllNodes(
keyLocation.getContainerID(), pipeline);
ArrayNode responseFromAllNodes = responseArrayList.addArray();
for (Map.Entry<DatanodeDetails, ContainerProtos.GetBlockResponseProto> entry : responses.entrySet()) {
DatanodeDetails datanodeDetails = entry.getKey();
GetBlockResponseProto blockResponse = entry.getValue();

if (blockResponse == null || !blockResponse.hasBlockData()) {
System.err.printf("GetBlock call failed on %s datanode and %s block.%n",
datanodeDetails.getHostName(), keyLocation.getBlockID());
continue;
}

ContainerProtos.BlockData blockData = blockResponse.getBlockData();
ContainerProtos.ChunkInfo chunkInfo = blockData.getChunksCount() > 0 ?
blockData.getChunks(0) : null;
// Use Jackson streaming for all JSON generation
ObjectMapper mapper = new ObjectMapper();
JsonFactory jsonFactory = mapper.getFactory();

try (JsonGenerator jsonGen = jsonFactory.createGenerator(System.out)) {
jsonGen.useDefaultPrettyPrinter();

jsonGen.writeStartObject();
jsonGen.writeStringField("volumeName", volumeName);
jsonGen.writeStringField("bucketName", bucketName);
jsonGen.writeStringField("name", keyName);

// Start keyLocations array
jsonGen.writeArrayFieldStart("keyLocations");
for (OmKeyLocationInfo keyLocation : locationInfos) {
jsonGen.writeStartArray();

Pipeline keyPipeline = keyLocation.getPipeline();
boolean isECKey =
keyPipeline.getReplicationConfig().getReplicationType() ==
HddsProtos.ReplicationType.EC;
Pipeline pipeline;
if (!isECKey && keyPipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
pipeline = keyPipeline.copyForRead();
} else {
pipeline = keyPipeline;
}
XceiverClientSpi xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);
try {
Map<DatanodeDetails, ContainerProtos.ReadContainerResponseProto> readContainerResponses =
containerOperationClient.readContainerFromAllNodes(keyLocation.getContainerID(), pipeline);

// Process each datanode individually
for (DatanodeDetails datanodeDetails : pipeline.getNodes()) {
try {
// Get block from THIS ONE datanode only
ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient,
keyLocation.getBlockID(),
keyLocation.getToken(),
pipeline.getReplicaIndexes());

if (blockResponse == null || !blockResponse.hasBlockData()) {
System.err.printf("GetBlock call failed on %s datanode and %s block.%n",
datanodeDetails.getHostName(), keyLocation.getBlockID());
continue;
}

ContainerProtos.BlockData blockData = blockResponse.getBlockData();
ContainerProtos.ChunkInfo chunkInfo = blockData.getChunksCount() > 0 ?
blockData.getChunks(0) : null;

String fileName = "";
if (chunkInfo != null) {
ContainerProtos.ContainerDataProto containerData =
readContainerResponses.get(datanodeDetails).getContainerData();
fileName = containerLayoutVersion.getChunkFile(new File(
String fileName = "";
if (chunkInfo != null) {
ContainerProtos.ContainerDataProto containerData =
readContainerResponses.get(datanodeDetails).getContainerData();
fileName = containerLayoutVersion.getChunkFile(new File(
getChunkLocationPath(containerData.getContainerPath())),
keyLocation.getBlockID(),
chunkInfo.getChunkName()).toString();
}

ObjectNode jsonObj = responseFromAllNodes.addObject();
ObjectNode dnObj = jsonObj.putObject("datanode");
dnObj.put("hostname", datanodeDetails.getHostName());
dnObj.put("ip", datanodeDetails.getIpAddress());
dnObj.put("uuid", datanodeDetails.getUuidString());

jsonObj.put("file", fileName);

ObjectNode blockDataNode = jsonObj.putObject("blockData");
ObjectNode blockIdNode = blockDataNode.putObject("blockID");
blockIdNode.put("containerID", blockData.getBlockID().getContainerID());
blockIdNode.put("localID", blockData.getBlockID().getLocalID());
blockIdNode.put("blockCommitSequenceId", blockData.getBlockID().getBlockCommitSequenceId());
blockDataNode.put("size", blockData.getSize());

ArrayNode chunkArray = blockDataNode.putArray("chunks");
for (ContainerProtos.ChunkInfo chunk : blockData.getChunksList()) {
ObjectNode chunkNode = chunkArray.addObject();
chunkNode.put("offset", chunk.getOffset());
chunkNode.put("len", chunk.getLen());

if (chunk.hasChecksumData()) {
ArrayNode checksums = chunkNode.putArray("checksums");
for (ByteString bs : chunk.getChecksumData().getChecksumsList()) {
checksums.add(StringUtils.byteToHexString(bs.toByteArray()));
keyLocation.getBlockID(),
chunkInfo.getChunkName()).toString();
}
chunkNode.put("checksumType", chunk.getChecksumData().getType().name());
chunkNode.put("bytesPerChecksum", chunk.getChecksumData().getBytesPerChecksum());
}

if (chunk.hasStripeChecksum()) {
byte[] stripeBytes = chunk.getStripeChecksum().toByteArray();
int checksumLen = chunk.getChecksumData().getChecksumsList().get(0).size();
// Start writing this datanode's response object
jsonGen.writeStartObject();

jsonGen.writeObjectFieldStart("datanode");
jsonGen.writeStringField("hostname", datanodeDetails.getHostName());
jsonGen.writeStringField("ip", datanodeDetails.getIpAddress());
jsonGen.writeStringField("uuid", datanodeDetails.getUuidString());
jsonGen.writeEndObject();

jsonGen.writeStringField("file", fileName);

// Write block data
jsonGen.writeObjectFieldStart("blockData");
jsonGen.writeObjectFieldStart("blockID");
jsonGen.writeNumberField("containerID", blockData.getBlockID().getContainerID());
jsonGen.writeNumberField("localID", blockData.getBlockID().getLocalID());
jsonGen.writeNumberField("blockCommitSequenceId", blockData.getBlockID().getBlockCommitSequenceId());
jsonGen.writeEndObject();

jsonGen.writeNumberField("size", blockData.getSize());

// Write chunks array
jsonGen.writeArrayFieldStart("chunks");
for (ContainerProtos.ChunkInfo chunk : blockData.getChunksList()) {
jsonGen.writeStartObject();
jsonGen.writeNumberField("offset", chunk.getOffset());
jsonGen.writeNumberField("len", chunk.getLen());

if (chunk.hasChecksumData()) {
jsonGen.writeArrayFieldStart("checksums");
for (ByteString bs : chunk.getChecksumData().getChecksumsList()) {
jsonGen.writeString(StringUtils.byteToHexString(bs.toByteArray()));
}

jsonGen.writeEndArray();
jsonGen.writeStringField("checksumType", chunk.getChecksumData().getType().name());
jsonGen.writeNumberField("bytesPerChecksum", chunk.getChecksumData().getBytesPerChecksum());
}

if (chunk.hasStripeChecksum()) {
byte[] stripeBytes = chunk.getStripeChecksum().toByteArray();
int checksumLen = chunk.getChecksumData().getChecksumsList().get(0).size();

jsonGen.writeArrayFieldStart("stripeChecksum");
for (int i = 0; i <= stripeBytes.length - checksumLen; i += checksumLen) {
byte[] slice = Arrays.copyOfRange(stripeBytes, i, i + checksumLen);
jsonGen.writeString(StringUtils.byteToHexString(slice));
}
jsonGen.writeEndArray();
}
jsonGen.writeEndObject();
}

ArrayNode stripeChecksums = chunkNode.putArray("stripeChecksum");
for (int i = 0; i <= stripeBytes.length - checksumLen; i += checksumLen) {
byte[] slice = Arrays.copyOfRange(stripeBytes, i, i + checksumLen);
stripeChecksums.add(StringUtils.byteToHexString(slice));
jsonGen.writeEndArray(); // End chunks array
jsonGen.writeEndObject(); // End blockData object

if (isECKey) {
int replicaIndex = keyPipeline.getReplicaIndex(datanodeDetails);
int dataCount = ((ECReplicationConfig) keyPipeline.getReplicationConfig()).getData();
// Index is 1-based,
// e.g. for RS-3-2 we will have data indexes 1,2,3 and parity indexes 4,5
ChunkType chunkType = (replicaIndex > dataCount) ? ChunkType.PARITY : ChunkType.DATA;
jsonGen.writeStringField("chunkType", chunkType.name());
jsonGen.writeNumberField("replicaIndex", replicaIndex);
}
}
}
jsonGen.writeEndObject(); // End this datanode's response object

if (isECKey) {
int replicaIndex = keyPipeline.getReplicaIndex(entry.getKey());
int dataCount = ((ECReplicationConfig) keyPipeline.getReplicationConfig()).getData();
// Index is 1-based,
// e.g. for RS-3-2 we will have data indexes 1,2,3 and parity indexes 4,5
ChunkType chunkType = (replicaIndex > dataCount) ? ChunkType.PARITY : ChunkType.DATA;
jsonObj.put("chunkType", chunkType.name());
jsonObj.put("replicaIndex", replicaIndex);
jsonGen.flush();
} catch (Exception e) {
System.err.printf("Error getting block from datanode %s: %s%n",
datanodeDetails.getHostName(), e.getMessage());
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
xceiverClientManager.releaseClientForReadData(xceiverClient, false);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
xceiverClientManager.releaseClientForReadData(xceiverClient, false);

jsonGen.writeEndArray();
}

jsonGen.writeEndArray(); // End keyLocations array
jsonGen.writeEndObject(); // End root object
jsonGen.flush();
System.out.println();
}
String prettyJson = JsonUtils.toJsonStringWithDefaultPrettyPrinter(result);
System.out.println(prettyJson);
}
}
}