diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 2b5854ca2086..4fb661cbffbb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import java.util.concurrent.TimeoutException;
@@ -277,6 +278,11 @@ public ContainerCommandResponseProto sendCommand(
     List<DatanodeDetails> datanodeList = pipeline.getNodes();
     HashMap<DatanodeDetails, CompletableFuture<ContainerCommandResponseProto>>
         futureHashMap = new HashMap<>();
+    if (!request.hasVersion()) {
+      ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder(request);
+      builder.setVersion(ClientVersion.CURRENT.toProtoValue());
+      request = builder.build();
+    }
     for (DatanodeDetails dn : datanodeList) {
       try {
         futureHashMap.put(dn, sendCommandAsync(request, dn).getResponse());
@@ -337,10 +343,13 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
 
     return TracingUtil.executeInNewSpan(spanName,
         () -> {
-          ContainerCommandRequestProto finalPayload =
+          ContainerCommandRequestProto.Builder builder =
               ContainerCommandRequestProto.newBuilder(request)
-                  .setTraceID(TracingUtil.exportCurrentSpan()).build();
-          return sendCommandWithRetry(finalPayload, validators);
+                  .setTraceID(TracingUtil.exportCurrentSpan());
+          if (!request.hasVersion()) {
+            builder.setVersion(ClientVersion.CURRENT.toProtoValue());
+          }
+          return sendCommandWithRetry(builder.build(), validators);
         });
   }
 
@@ -490,12 +499,14 @@ public XceiverClientReply sendCommandAsync(
 
     try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
-      ContainerCommandRequestProto finalPayload =
+      ContainerCommandRequestProto.Builder builder =
           ContainerCommandRequestProto.newBuilder(request)
-              .setTraceID(TracingUtil.exportCurrentSpan())
-              .build();
+              .setTraceID(TracingUtil.exportCurrentSpan());
+      if (!request.hasVersion()) {
+        builder.setVersion(ClientVersion.CURRENT.toProtoValue());
+      }
       XceiverClientReply asyncReply =
-          sendCommandAsync(finalPayload, pipeline.getFirstNode());
+          sendCommandAsync(builder.build(), pipeline.getFirstNode());
       if (shouldBlockAndWaitAsyncReply(request)) {
         asyncReply.getResponse().get();
       }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index bf4830c6fcb5..df55b5bf57ae 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -228,9 +229,12 @@ public Pipeline getPipeline() {
     }
 
     @Override
-    public XceiverClientReply sendCommandAsync(
-        ContainerCommandRequestProto request
-    ) {
+    public XceiverClientReply sendCommandAsync(ContainerCommandRequestProto request) {
+
+      if (!request.hasVersion()) {
+        request = ContainerCommandRequestProto.newBuilder(request)
+            .setVersion(ClientVersion.CURRENT.toProtoValue()).build();
+      }
       final ContainerCommandResponseProto.Builder builder =
           ContainerCommandResponseProto.newBuilder()
               .setResult(Result.SUCCESS)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
index e1ebde25198c..7ae6e7859046 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.common.Checksum;
 
 import org.apache.ratis.protocol.Message;
@@ -44,6 +45,9 @@ public static ContainerCommandRequestMessage toMessage(
     if (traceId != null) {
       b.setTraceID(traceId);
     }
+    if (!request.hasVersion()) {
+      b.setVersion(ClientVersion.CURRENT.toProtoValue());
+    }
 
     ByteString data = ByteString.EMPTY;
     if (request.getCmdType() == Type.WriteChunk) {
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
index ef65335a6c01..05fc9cb40b0b 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
@@ -91,6 +92,7 @@ static ContainerCommandRequestProto newPutSmallFile(
         .setContainerID(blockID.getContainerID())
         .setDatanodeUuid(UUID.randomUUID().toString())
         .setPutSmallFile(putSmallFileRequest)
+        .setVersion(ClientVersion.CURRENT.toProtoValue())
         .build();
   }
 
@@ -113,6 +115,7 @@ static ContainerCommandRequestProto newWriteChunk(
         .setContainerID(blockID.getContainerID())
         .setDatanodeUuid(UUID.randomUUID().toString())
         .setWriteChunk(writeChunkRequest)
+        .setVersion(ClientVersion.CURRENT.toProtoValue())
         .build();
   }
 
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 7ade596add7a..cf96b2145d7e 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.utils.UniqueId;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
@@ -540,6 +541,11 @@ public static byte[] generateData(int length, boolean random) {
     return data;
   }
 
+  public static ContainerCommandRequestProto getDummyCommandRequestProto(
+      ContainerProtos.Type cmdType) {
+    return getDummyCommandRequestProto(ClientVersion.CURRENT, cmdType, 0);
+  }
+
   /**
    * Construct fake protobuf messages for various types of requests.
    * This is tedious, however necessary to test. Protobuf classes are final
@@ -549,16 +555,17 @@ public static byte[] generateData(int length, boolean random) {
    * @return
    */
   public static ContainerCommandRequestProto getDummyCommandRequestProto(
-      ContainerProtos.Type cmdType) {
+      ClientVersion clientVersion, ContainerProtos.Type cmdType, int replicaIndex) {
     final Builder builder =
         ContainerCommandRequestProto.newBuilder()
+            .setVersion(clientVersion.toProtoValue())
             .setCmdType(cmdType)
             .setContainerID(DUMMY_CONTAINER_ID)
             .setDatanodeUuid(DATANODE_UUID);
 
     final DatanodeBlockID fakeBlockId = DatanodeBlockID.newBuilder()
-        .setContainerID(DUMMY_CONTAINER_ID).setLocalID(1)
+        .setContainerID(DUMMY_CONTAINER_ID).setLocalID(1).setReplicaIndex(replicaIndex)
         .setBlockCommitSequenceId(101).build();
 
     final ContainerProtos.ChunkInfo fakeChunkInfo =
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
index c97040d1b376..63045f76136b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.Buffers;
 import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
 import org.apache.ratis.client.api.DataStreamOutput;
@@ -70,7 +71,7 @@ public class TestKeyValueStreamDataChannel {
   public static final Logger LOG =
       LoggerFactory.getLogger(TestKeyValueStreamDataChannel.class);
 
-  static final ContainerCommandRequestProto PUT_BLOCK_PROTO
+  private static final ContainerCommandRequestProto PUT_BLOCK_PROTO
       = ContainerCommandRequestProto.newBuilder()
       .setCmdType(Type.PutBlock)
       .setPutBlock(PutBlockRequestProto.newBuilder().setBlockData(
@@ -78,6 +79,7 @@ public class TestKeyValueStreamDataChannel {
           .setContainerID(222).setLocalID(333).build()).build()))
       .setDatanodeUuid("datanodeId")
       .setContainerID(111L)
+      .setVersion(ClientVersion.CURRENT.toProtoValue())
       .build();
   static final int PUT_BLOCK_PROTO_SIZE = PUT_BLOCK_PROTO.toByteString().size();
   static {