Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.HddsUtils.processForDebug;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.getContainerCommandRequestProtoBuilder;

/**
* {@link XceiverClientSpi} implementation, the standalone client.
Expand Down Expand Up @@ -337,8 +338,7 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(

return TracingUtil.executeInNewSpan(spanName,
() -> {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
ContainerCommandRequestProto finalPayload = getContainerCommandRequestProtoBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan()).build();
return sendCommandWithRetry(finalPayload, validators);
});
Expand Down Expand Up @@ -490,8 +490,7 @@ public XceiverClientReply sendCommandAsync(

try (Scope ignored = GlobalTracer.get().activateSpan(span)) {

ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
ContainerCommandRequestProto finalPayload = getContainerCommandRequestProtoBuilder(request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also set version conditionally here.

.setTraceID(TracingUtil.exportCurrentSpan())
.build();
XceiverClientReply asyncReply =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.getContainerCommandRequestProtoBuilder;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;

/**
Expand Down Expand Up @@ -204,7 +205,7 @@ private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
// it or remove it completely if possible
String id = pipeline.getFirstNode().getUuidString();
ContainerProtos.ContainerCommandRequestProto.Builder builder =
ContainerProtos.ContainerCommandRequestProto.newBuilder()
getContainerCommandRequestProtoBuilder()
.setCmdType(ContainerProtos.Type.StreamInit)
.setContainerID(blockID.get().getContainerID())
.setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.getContainerCommandRequestProtoBuilder;

/**
* Implementing the {@link Message} interface
* for {@link ContainerCommandRequestProto}.
*/
public final class ContainerCommandRequestMessage implements Message {
public static ContainerCommandRequestMessage toMessage(
ContainerCommandRequestProto request, String traceId) {
final ContainerCommandRequestProto.Builder b
= ContainerCommandRequestProto.newBuilder(request);
final ContainerCommandRequestProto.Builder b = getContainerCommandRequestProtoBuilder(request);
if (traceId != null) {
b.setTraceID(traceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -97,6 +98,28 @@ public final class ContainerProtocolCalls {
private ContainerProtocolCalls() {
}

/**
* Creates a ContainerCommandRequestProto with version set.
*/
public static ContainerCommandRequestProto.Builder getContainerCommandRequestProtoBuilder(int version) {
return getContainerCommandRequestProtoBuilder(null, version);
}

public static ContainerCommandRequestProto.Builder getContainerCommandRequestProtoBuilder() {
return getContainerCommandRequestProtoBuilder(ClientVersion.CURRENT.toProtoValue());
}

public static ContainerCommandRequestProto.Builder getContainerCommandRequestProtoBuilder(
ContainerCommandRequestProto req, int version) {
return (req == null ?
ContainerCommandRequestProto.newBuilder() : ContainerCommandRequestProto.newBuilder(req)).setVersion(version);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req may already have version set. newBuilder(req) preserves that in the builder, but setVersion overwrites it. This is a problem if called via getContainerCommandRequestProtoBuilder(req), without explicit version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would have to build an object again there. Is it something we should do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that is the case we should just send a builder than sending the request object. That is also one viable change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we can just add a validation check and throw an exception if the version is not set.


public static ContainerCommandRequestProto.Builder getContainerCommandRequestProtoBuilder(
ContainerCommandRequestProto req) {
return getContainerCommandRequestProtoBuilder(req, ClientVersion.CURRENT.toProtoValue());
}

/**
* Calls the container protocol to list blocks in container.
*
Expand Down Expand Up @@ -125,7 +148,7 @@ public static ListBlockResponseProto listBlock(XceiverClientSpi xceiverClient,
xceiverClient.getPipeline().getFirstNode().getUuidString();

ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
getContainerCommandRequestProtoBuilder()
.setCmdType(Type.ListBlock)
.setContainerID(containerID)
.setDatanodeUuid(datanodeID)
Expand Down Expand Up @@ -195,8 +218,7 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
.newBuilder()
.setBlockID(datanodeBlockID);
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
ContainerCommandRequestProto.Builder builder = getContainerCommandRequestProtoBuilder()
.setCmdType(Type.GetBlock)
.setContainerID(datanodeBlockID.getContainerID())
.setGetBlock(readBlockRequest);
Expand Down Expand Up @@ -255,7 +277,7 @@ private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
setBlockID(blockID.getDatanodeBlockIDProtobuf());
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
getContainerCommandRequestProtoBuilder()
.setCmdType(Type.GetCommittedBlockLength)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id)
Expand Down Expand Up @@ -301,11 +323,11 @@ public static ContainerCommandRequestProto getPutBlockRequest(
.setBlockData(containerBlockData)
.setEof(eof);
final String id = pipeline.getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
.setDatanodeUuid(id)
.setPutBlock(createBlockRequest);
ContainerCommandRequestProto.Builder builder = getContainerCommandRequestProtoBuilder()
.setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
.setDatanodeUuid(id)
.setPutBlock(createBlockRequest);
if (tokenString != null) {
builder.setEncodedToken(tokenString);
}
Expand Down Expand Up @@ -333,7 +355,7 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
.setChunkData(chunk)
.setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
getContainerCommandRequestProtoBuilder().setCmdType(Type.ReadChunk)
.setContainerID(blockID.getContainerID())
.setReadChunk(readChunkRequest);
if (token != null) {
Expand Down Expand Up @@ -421,7 +443,7 @@ public static XceiverClientReply writeChunkAsync(
.setData(data);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
getContainerCommandRequestProtoBuilder()
.setCmdType(Type.WriteChunk)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id)
Expand Down Expand Up @@ -479,7 +501,7 @@ public static PutSmallFileResponseProto writeSmallFile(

String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
getContainerCommandRequestProtoBuilder()
.setCmdType(Type.PutSmallFile)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id)
Expand Down Expand Up @@ -546,7 +568,7 @@ public static void createContainer(XceiverClientSpi client,

String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
getContainerCommandRequestProtoBuilder();
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
Expand Down Expand Up @@ -576,7 +598,7 @@ public static void deleteContainer(XceiverClientSpi client, long containerID,
String id = client.getPipeline().getFirstNode().getUuidString();

ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
getContainerCommandRequestProtoBuilder();
request.setCmdType(ContainerProtos.Type.DeleteContainer);
request.setContainerID(containerID);
request.setDeleteContainer(deleteRequest);
Expand All @@ -602,7 +624,7 @@ public static void closeContainer(XceiverClientSpi client,
String id = client.getPipeline().getFirstNode().getUuidString();

ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
getContainerCommandRequestProtoBuilder();
request.setCmdType(Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(CloseContainerRequestProto.getDefaultInstance());
Expand All @@ -629,7 +651,7 @@ public static ReadContainerResponseProto readContainer(
String id = client.getPipeline().getFirstNode().getUuidString();

ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
getContainerCommandRequestProtoBuilder();
request.setCmdType(Type.ReadContainer);
request.setContainerID(containerID);
request.setReadContainer(ReadContainerRequestProto.getDefaultInstance());
Expand Down Expand Up @@ -667,8 +689,7 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
.build();
String id = client.getPipeline().getClosestNode().getUuidString();

ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
ContainerCommandRequestProto.Builder builder = getContainerCommandRequestProtoBuilder()
.setCmdType(Type.GetSmallFile)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id)
Expand Down Expand Up @@ -778,8 +799,7 @@ public static List<Validator> toValidatorList(Validator validator) {
HashMap<DatanodeDetails, GetBlockResponseProto> datanodeToResponseMap
= new HashMap<>();
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
ContainerCommandRequestProto.Builder builder = getContainerCommandRequestProtoBuilder()
.setCmdType(Type.GetBlock)
.setContainerID(datanodeBlockID.getContainerID())
.setDatanodeUuid(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* Versioning for protocol clients.
*/
public enum ClientVersion implements ComponentVersion {
public enum ClientVersion implements ComponentVersion, Comparable<ClientVersion> {

DEFAULT_VERSION(0, "Initial version"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.getContainerCommandRequestProtoBuilder;

/** Testing {@link ContainerCommandRequestMessage}. */
public class TestContainerCommandRequestMessage {
static final Random RANDOM = new Random();
Expand Down Expand Up @@ -86,7 +88,7 @@ static ContainerCommandRequestProto newPutSmallFile(
.setBlock(putBlockRequest)
.setData(data)
.build();
return ContainerCommandRequestProto.newBuilder()
return getContainerCommandRequestProtoBuilder()
.setCmdType(Type.PutSmallFile)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(UUID.randomUUID().toString())
Expand All @@ -108,7 +110,7 @@ static ContainerCommandRequestProto newWriteChunk(
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk)
.setData(data);
return ContainerCommandRequestProto.newBuilder()
return getContainerCommandRequestProtoBuilder()
.setCmdType(Type.WriteChunk)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(UUID.randomUUID().toString())
Expand Down
Loading