Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
*/
package org.apache.hadoop.hdds.scm;

import java.io.Closeable;
import java.io.IOException;

import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

/**
* Interface to provide XceiverClient when needed.
*/
public interface XceiverClientFactory extends Closeable {
public interface XceiverClientFactory extends AutoCloseable {

XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hadoop.hdds.scm;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -62,7 +61,7 @@
* without reestablishing connection. But the connection will be closed if
* not being used for a period of time.
*/
public class XceiverClientManager implements Closeable, XceiverClientFactory {
public class XceiverClientManager implements XceiverClientFactory {
private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientManager.class);
//TODO : change this to SCM configuration class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.slf4j.Logger;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collection;

Expand All @@ -40,11 +39,11 @@ private IOUtils() {
* null.
* @param closeables the objects to close
*/
public static void cleanupWithLogger(Logger logger, Closeable... closeables) {
public static void cleanupWithLogger(Logger logger, AutoCloseable... closeables) {
if (closeables == null) {
return;
}
for (Closeable c : closeables) {
for (AutoCloseable c : closeables) {
if (c != null) {
try {
c.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
Expand Down Expand Up @@ -88,13 +89,8 @@ static void setup() throws Exception {
}

@AfterAll
static void tearDown() throws IOException {
if (clientFactory != null) {
clientFactory.close();
}
if (cluster != null) {
cluster.shutdown();
}
static void tearDown() {
IOUtils.closeQuietly(clientFactory, cluster);
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@
public class ChunkKeyHandler extends KeyHandler implements
SubcommandWithParent {

private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private OzoneManagerProtocol ozoneManagerClient;

@CommandLine.ParentCommand
private OzoneDebug parent;

Expand All @@ -81,11 +77,9 @@ private String getChunkLocationPath(String containerLocation) {
@Override
protected void execute(OzoneClient client, OzoneAddress address)
throws IOException, OzoneClientException {
try (ContainerOperationClient containerOperationClient = new
ContainerOperationClient(parent.getOzoneConf())) {
xceiverClientManager = containerOperationClient.getXceiverClientManager();
ozoneManagerClient =
client.getObjectStore().getClientProxy().getOzoneManagerClient();
try (ContainerOperationClient containerOperationClient = new ContainerOperationClient(parent.getOzoneConf());
XceiverClientManager xceiverClientManager = containerOperationClient.getXceiverClientManager()) {
OzoneManagerProtocol ozoneManagerClient = client.getObjectStore().getClientProxy().getOzoneManagerClient();
address.ensureKeyAddress();
JsonElement element;
JsonObject result = new JsonObject();
Expand Down Expand Up @@ -127,80 +121,82 @@ protected void execute(OzoneClient client, OzoneAddress address)
} else {
pipeline = keyPipeline;
}
xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);
// Datanode is queried to get chunk information.Thus querying the
// OM,SCM and datanode helps us get chunk location information
ContainerProtos.DatanodeBlockID datanodeBlockID =
keyLocation.getBlockID().getDatanodeBlockIDProtobuf();
// doing a getBlock on all nodes
Map<DatanodeDetails, ContainerProtos.GetBlockResponseProto>
responses = null;
Map<DatanodeDetails, ContainerProtos.ReadContainerResponseProto>
readContainerResponses = null;
XceiverClientSpi xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);
try {
responses = ContainerProtocolCalls.getBlockFromAllNodes(xceiverClient,
datanodeBlockID, keyLocation.getToken());
readContainerResponses =
containerOperationClient.readContainerFromAllNodes(
keyLocation.getContainerID(), pipeline);
} catch (InterruptedException e) {
LOG.error("Execution interrupted due to " + e);
Thread.currentThread().interrupt();
}
JsonArray responseFromAllNodes = new JsonArray();
for (Map.Entry<DatanodeDetails, ContainerProtos.GetBlockResponseProto>
entry : responses.entrySet()) {
chunkPaths.clear();
JsonObject jsonObj = new JsonObject();
if (entry.getValue() == null) {
LOG.error("Cant execute getBlock on this node");
continue;
}
tempchunks = entry.getValue().getBlockData().getChunksList();
ContainerProtos.ContainerDataProto containerData =
readContainerResponses.get(entry.getKey()).getContainerData();
for (ContainerProtos.ChunkInfo chunkInfo : tempchunks) {
String fileName = containerLayoutVersion.getChunkFile(new File(
getChunkLocationPath(containerData.getContainerPath())),
keyLocation.getBlockID(),
ChunkInfo.getFromProtoBuf(chunkInfo)).toString();
chunkPaths.add(fileName);
ChunkDetails chunkDetails = new ChunkDetails();
chunkDetails.setChunkName(fileName);
chunkDetails.setChunkOffset(chunkInfo.getOffset());
chunkDetailsList.add(chunkDetails);
}
containerChunkInfoVerbose.setContainerPath(containerData
.getContainerPath());
containerChunkInfoVerbose.setPipeline(keyPipeline);
containerChunkInfoVerbose.setChunkInfos(chunkDetailsList);
containerChunkInfo.setFiles(chunkPaths);
containerChunkInfo.setPipelineID(keyPipeline.getId().getId());
if (isECKey) {
ChunkType blockChunksType =
isECParityBlock(keyPipeline, entry.getKey()) ?
ChunkType.PARITY : ChunkType.DATA;
containerChunkInfoVerbose.setChunkType(blockChunksType);
containerChunkInfo.setChunkType(blockChunksType);
// Datanode is queried to get chunk information.Thus querying the
// OM,SCM and datanode helps us get chunk location information
ContainerProtos.DatanodeBlockID datanodeBlockID =
keyLocation.getBlockID().getDatanodeBlockIDProtobuf();
// doing a getBlock on all nodes
Map<DatanodeDetails, ContainerProtos.GetBlockResponseProto>
responses = null;
Map<DatanodeDetails, ContainerProtos.ReadContainerResponseProto>
readContainerResponses = null;
try {
responses = ContainerProtocolCalls.getBlockFromAllNodes(xceiverClient,
datanodeBlockID, keyLocation.getToken());
readContainerResponses =
containerOperationClient.readContainerFromAllNodes(
keyLocation.getContainerID(), pipeline);
} catch (InterruptedException e) {
LOG.error("Execution interrupted due to " + e);
Thread.currentThread().interrupt();
}
Gson gson = new GsonBuilder().create();
if (isVerbose()) {
element = gson.toJsonTree(containerChunkInfoVerbose);
} else {
element = gson.toJsonTree(containerChunkInfo);
JsonArray responseFromAllNodes = new JsonArray();
for (Map.Entry<DatanodeDetails, ContainerProtos.GetBlockResponseProto>
entry : responses.entrySet()) {
chunkPaths.clear();
JsonObject jsonObj = new JsonObject();
if (entry.getValue() == null) {
LOG.error("Cant execute getBlock on this node");
continue;
}
tempchunks = entry.getValue().getBlockData().getChunksList();
ContainerProtos.ContainerDataProto containerData =
readContainerResponses.get(entry.getKey()).getContainerData();
for (ContainerProtos.ChunkInfo chunkInfo : tempchunks) {
String fileName = containerLayoutVersion.getChunkFile(new File(
getChunkLocationPath(containerData.getContainerPath())),
keyLocation.getBlockID(),
ChunkInfo.getFromProtoBuf(chunkInfo)).toString();
chunkPaths.add(fileName);
ChunkDetails chunkDetails = new ChunkDetails();
chunkDetails.setChunkName(fileName);
chunkDetails.setChunkOffset(chunkInfo.getOffset());
chunkDetailsList.add(chunkDetails);
}
containerChunkInfoVerbose.setContainerPath(containerData
.getContainerPath());
containerChunkInfoVerbose.setPipeline(keyPipeline);
containerChunkInfoVerbose.setChunkInfos(chunkDetailsList);
containerChunkInfo.setFiles(chunkPaths);
containerChunkInfo.setPipelineID(keyPipeline.getId().getId());
if (isECKey) {
ChunkType blockChunksType =
isECParityBlock(keyPipeline, entry.getKey()) ?
ChunkType.PARITY : ChunkType.DATA;
containerChunkInfoVerbose.setChunkType(blockChunksType);
containerChunkInfo.setChunkType(blockChunksType);
}
Gson gson = new GsonBuilder().create();
if (isVerbose()) {
element = gson.toJsonTree(containerChunkInfoVerbose);
} else {
element = gson.toJsonTree(containerChunkInfo);
}
jsonObj.addProperty("Datanode-HostName", entry.getKey()
.getHostName());
jsonObj.addProperty("Datanode-IP", entry.getKey()
.getIpAddress());
jsonObj.addProperty("Container-ID", containerId);
jsonObj.addProperty("Block-ID", keyLocation.getLocalID());
jsonObj.add("Locations", element);
responseFromAllNodes.add(jsonObj);
}
jsonObj.addProperty("Datanode-HostName", entry.getKey()
.getHostName());
jsonObj.addProperty("Datanode-IP", entry.getKey()
.getIpAddress());
jsonObj.addProperty("Container-ID", containerId);
jsonObj.addProperty("Block-ID", keyLocation.getLocalID());
jsonObj.add("Locations", element);
responseFromAllNodes.add(jsonObj);
responseArrayList.add(responseFromAllNodes);
} finally {
xceiverClientManager.releaseClientForReadData(xceiverClient, false);
}
responseArrayList.add(responseFromAllNodes);
xceiverClientManager.releaseClientForReadData(xceiverClient, false);
xceiverClient = null;
}
result.add("KeyLocations", responseArrayList);
Gson gson2 = new GsonBuilder().setPrettyPrinting().create();
Expand Down