Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
Expand Down Expand Up @@ -354,7 +354,7 @@ public static String getHostName(ConfigurationSource conf)
* @return True if its readOnly , false otherwise.
*/
public static boolean isReadOnly(
ContainerProtos.ContainerCommandRequestProto proto) {
ContainerCommandRequestProtoOrBuilder proto) {
switch (proto.getCmdType()) {
case ReadContainer:
case ReadChunk:
Expand Down Expand Up @@ -394,12 +394,15 @@ public static boolean isReadOnly(
public static boolean requireBlockToken(
ContainerProtos.Type cmdType) {
switch (cmdType) {
case ReadChunk:
case DeleteBlock:
case DeleteChunk:
case GetBlock:
case WriteChunk:
case GetCommittedBlockLength:
case GetSmallFile:
case PutBlock:
case PutSmallFile:
case GetSmallFile:
case ReadChunk:
case WriteChunk:
return true;
default:
return false;
Expand All @@ -412,7 +415,6 @@ public static boolean requireContainerToken(
case CloseContainer:
case CreateContainer:
case DeleteContainer:
case ListContainer:
case ReadContainer:
case UpdateContainer:
return true;
Expand All @@ -426,44 +428,66 @@ public static boolean requireContainerToken(
* @param msg container command
* @return block ID.
*/
public static BlockID getBlockID(ContainerCommandRequestProto msg) {
public static BlockID getBlockID(ContainerCommandRequestProtoOrBuilder msg) {
ContainerProtos.DatanodeBlockID blockID = null;
switch (msg.getCmdType()) {
case ReadChunk:
if (msg.hasReadChunk()) {
return BlockID.getFromProtobuf(msg.getReadChunk().getBlockID());
case DeleteBlock:
if (msg.hasDeleteBlock()) {
blockID = msg.getDeleteBlock().getBlockID();
}
return null;
break;
case DeleteChunk:
if (msg.hasDeleteChunk()) {
blockID = msg.getDeleteChunk().getBlockID();
}
break;
case GetBlock:
if (msg.hasGetBlock()) {
return BlockID.getFromProtobuf(msg.getGetBlock().getBlockID());
blockID = msg.getGetBlock().getBlockID();
}
return null;
case WriteChunk:
if (msg.hasWriteChunk()) {
return BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
break;
case GetCommittedBlockLength:
if (msg.hasGetCommittedBlockLength()) {
blockID = msg.getGetCommittedBlockLength().getBlockID();
}
return null;
break;
case GetSmallFile:
if (msg.hasGetSmallFile()) {
blockID = msg.getGetSmallFile().getBlock().getBlockID();
}
break;
case ListChunk:
if (msg.hasListChunk()) {
blockID = msg.getListChunk().getBlockID();
}
break;
case PutBlock:
if (msg.hasPutBlock()) {
return BlockID.getFromProtobuf(msg.getPutBlock().getBlockData()
.getBlockID());
blockID = msg.getPutBlock().getBlockData().getBlockID();
}
return null;
break;
case PutSmallFile:
if (msg.hasPutSmallFile()) {
return BlockID.getFromProtobuf(msg.getPutSmallFile().getBlock()
.getBlockData().getBlockID());
blockID = msg.getPutSmallFile().getBlock().getBlockData().getBlockID();
}
return null;
case GetSmallFile:
if (msg.hasGetSmallFile()) {
return BlockID.getFromProtobuf(msg.getGetSmallFile().getBlock()
.getBlockID());
break;
case ReadChunk:
if (msg.hasReadChunk()) {
blockID = msg.getReadChunk().getBlockID();
}
return null;
break;
case WriteChunk:
if (msg.hasWriteChunk()) {
blockID = msg.getWriteChunk().getBlockID();
}
break;
default:
return null;
break;
}

return blockID != null
? BlockID.getFromProtobuf(blockID)
: null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto.Builder;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -135,6 +136,17 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
public static ContainerCommandRequestProto getWriteChunkRequest(
Pipeline pipeline, BlockID blockID, int datalen, int seq, String token)
throws IOException {
Builder builder = newWriteChunkRequestBuilder(pipeline, blockID, datalen,
seq);
if (!Strings.isNullOrEmpty(token)) {
builder.setEncodedToken(token);
}
return builder.build();
}

public static Builder newWriteChunkRequestBuilder(
Pipeline pipeline, BlockID blockID, int datalen, int seq)
throws IOException {
LOG.trace("writeChunk {} (blockID={}) to pipeline={}",
datalen, blockID, pipeline);
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
Expand All @@ -156,11 +168,8 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
request.setContainerID(blockID.getContainerID());
request.setWriteChunk(writeRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
if (!Strings.isNullOrEmpty(token)) {
request.setEncodedToken(token);
}

return request.build();
return request;
}

/**
Expand Down Expand Up @@ -225,19 +234,25 @@ public static ContainerCommandRequestProto getReadSmallFileRequest(
* Returns a read Request.
*
* @param pipeline pipeline.
* @param request writeChunkRequest.
* @param writeChunk writeChunkRequest.
* @return Request.
*/
public static ContainerCommandRequestProto getReadChunkRequest(
Pipeline pipeline, ContainerProtos.WriteChunkRequestProto request)
Pipeline pipeline, ContainerProtos.WriteChunkRequestProto writeChunk)
throws IOException {
return newReadChunkRequestBuilder(pipeline, writeChunk).build();
}

public static Builder newReadChunkRequestBuilder(Pipeline pipeline,
ContainerProtos.WriteChunkRequestProtoOrBuilder writeChunk)
throws IOException {
LOG.trace("readChunk blockID={} from pipeline={}",
request.getBlockID(), pipeline);
writeChunk.getBlockID(), pipeline);

ContainerProtos.ReadChunkRequestProto.Builder readRequest =
ContainerProtos.ReadChunkRequestProto.newBuilder();
readRequest.setBlockID(request.getBlockID());
readRequest.setChunkData(request.getChunkData());
readRequest.setBlockID(writeChunk.getBlockID());
readRequest.setChunkData(writeChunk.getChunkData());
readRequest.setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);

Builder newRequest =
Expand All @@ -246,7 +261,7 @@ public static ContainerCommandRequestProto getReadChunkRequest(
newRequest.setContainerID(readRequest.getBlockID().getContainerID());
newRequest.setReadChunk(readRequest);
newRequest.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return newRequest.build();
return newRequest;
}

/**
Expand All @@ -259,6 +274,12 @@ public static ContainerCommandRequestProto getReadChunkRequest(
public static ContainerCommandRequestProto getDeleteChunkRequest(
Pipeline pipeline, ContainerProtos.WriteChunkRequestProto writeRequest)
throws IOException {
return newDeleteChunkRequestBuilder(pipeline, writeRequest).build();
}

public static Builder newDeleteChunkRequestBuilder(Pipeline pipeline,
ContainerProtos.WriteChunkRequestProtoOrBuilder writeRequest)
throws IOException {
LOG.trace("deleteChunk blockID={} from pipeline={}",
writeRequest.getBlockID(), pipeline);

Expand All @@ -275,7 +296,7 @@ public static ContainerCommandRequestProto getDeleteChunkRequest(
request.setContainerID(writeRequest.getBlockID().getContainerID());
request.setDeleteChunk(deleteRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request.build();
return request;
}

/**
Expand Down Expand Up @@ -404,8 +425,18 @@ public static ContainerCommandRequestProto getPutBlockRequest(
Pipeline pipeline, String token,
ContainerProtos.WriteChunkRequestProto writeRequest)
throws IOException {
LOG.trace("putBlock: {} to pipeline={} with token {}",
writeRequest.getBlockID(), pipeline, token);
Builder builder = newPutBlockRequestBuilder(pipeline, writeRequest);
if (!Strings.isNullOrEmpty(token)) {
builder.setEncodedToken(token);
}
return builder.build();
}

public static Builder newPutBlockRequestBuilder(Pipeline pipeline,
ContainerProtos.WriteChunkRequestProtoOrBuilder writeRequest)
throws IOException {
LOG.trace("putBlock: {} to pipeline={}",
writeRequest.getBlockID(), pipeline);

ContainerProtos.PutBlockRequestProto.Builder putRequest =
ContainerProtos.PutBlockRequestProto.newBuilder();
Expand All @@ -424,10 +455,7 @@ public static ContainerCommandRequestProto getPutBlockRequest(
request.setContainerID(blockData.getContainerID());
request.setPutBlock(putRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
if (!Strings.isNullOrEmpty(token)) {
request.setEncodedToken(token);
}
return request.build();
return request;
}

/**
Expand All @@ -440,9 +468,13 @@ public static ContainerCommandRequestProto getPutBlockRequest(
public static ContainerCommandRequestProto getBlockRequest(
Pipeline pipeline, ContainerProtos.PutBlockRequestProto putBlockRequest)
throws IOException {
ContainerProtos.DatanodeBlockID blockID =
putBlockRequest.getBlockData().getBlockID();
LOG.trace("getKey: blockID={}", blockID);
return newGetBlockRequestBuilder(pipeline, putBlockRequest).build();
}

public static Builder newGetBlockRequestBuilder(
Pipeline pipeline, ContainerProtos.PutBlockRequestProtoOrBuilder putBlock)
throws IOException {
DatanodeBlockID blockID = putBlock.getBlockData().getBlockID();

ContainerProtos.GetBlockRequestProto.Builder getRequest =
ContainerProtos.GetBlockRequestProto.newBuilder();
Expand All @@ -454,7 +486,7 @@ public static ContainerCommandRequestProto getBlockRequest(
request.setContainerID(blockID.getContainerID());
request.setGetBlock(getRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request.build();
return request;
}

/**
Expand All @@ -478,8 +510,13 @@ public static void verifyGetBlock(ContainerCommandRequestProto request,
public static ContainerCommandRequestProto getDeleteBlockRequest(
Pipeline pipeline, ContainerProtos.PutBlockRequestProto putBlockRequest)
throws IOException {
ContainerProtos.DatanodeBlockID blockID = putBlockRequest.getBlockData()
.getBlockID();
return newDeleteBlockRequestBuilder(pipeline, putBlockRequest).build();
}

public static Builder newDeleteBlockRequestBuilder(Pipeline pipeline,
ContainerProtos.PutBlockRequestProtoOrBuilder putBlockRequest)
throws IOException {
DatanodeBlockID blockID = putBlockRequest.getBlockData().getBlockID();
LOG.trace("deleteBlock: name={}", blockID);
ContainerProtos.DeleteBlockRequestProto.Builder delRequest =
ContainerProtos.DeleteBlockRequestProto.newBuilder();
Expand All @@ -490,7 +527,23 @@ public static ContainerCommandRequestProto getDeleteBlockRequest(
request.setContainerID(blockID.getContainerID());
request.setDeleteBlock(delRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request.build();
return request;
}

public static Builder newGetCommittedBlockLengthBuilder(Pipeline pipeline,
ContainerProtos.PutBlockRequestProtoOrBuilder putBlock)
throws IOException {
DatanodeBlockID blockID = putBlock.getBlockData().getBlockID();

ContainerProtos.GetCommittedBlockLengthRequestProto.Builder req =
ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder()
.setBlockID(blockID);

return ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.GetCommittedBlockLength)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(pipeline.getFirstNode().getUuidString())
.setGetCommittedBlockLength(req);
}

/**
Expand Down
Loading