diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index ee1c9669a1b1..e1188f1cd1e6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -424,6 +424,7 @@ public static boolean isReadOnly( case ListContainer: case ListChunk: case GetCommittedBlockLength: + case Echo: return true; case CloseContainer: case WriteChunk: diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java index 9acb0e5c33a7..86336e9bc7b6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java @@ -21,6 +21,8 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.function.Function; + +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; @@ -42,6 +44,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion; @@ -319,6 +322,31 @@ public static ContainerCommandResponseProto getFinalizeBlockResponse( .build(); } + public static ContainerCommandResponseProto getEchoResponse( + ContainerCommandRequestProto msg) { + + ContainerProtos.EchoRequestProto echoRequest = msg.getEcho(); + int responsePayload = echoRequest.getPayloadSizeResp(); + + int sleepTimeMs = echoRequest.getSleepTimeMs(); + try { + if (sleepTimeMs > 0) { + Thread.sleep(sleepTimeMs); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + ContainerProtos.EchoResponseProto.Builder echo = + ContainerProtos.EchoResponseProto + .newBuilder() + .setPayload(UnsafeByteOperations.unsafeWrap(RandomUtils.nextBytes(responsePayload))); + + return getSuccessResponseBuilder(msg) + .setEcho(echo) + .build(); + } + private ContainerCommandResponseBuilders() { throw new UnsupportedOperationException("no instances"); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 58bb326eb0cd..1453ae56b4f5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -57,6 +57,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoResponseProto; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator; @@ -65,6 +67,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.security.token.Token; @@ -661,6 +664,41 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, return response.getGetSmallFile(); } + /** + * Send an echo to DataNode. + * + * @return EchoResponseProto + */ + public static EchoResponseProto echo(XceiverClientSpi client, String encodedContainerID, + long containerID, ByteString payloadReqBytes, int payloadRespSizeKB, int sleepTimeMs) throws IOException { + ContainerProtos.EchoRequestProto getEcho = + EchoRequestProto + .newBuilder() + .setPayload(payloadReqBytes) + .setPayloadSizeResp(payloadRespSizeKB) + .setSleepTimeMs(sleepTimeMs) + .build(); + String id = client.getPipeline().getClosestNode().getUuidString(); + + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.Echo) + .setContainerID(containerID) + .setDatanodeUuid(id) + .setEcho(getEcho); + if (!encodedContainerID.isEmpty()) { + builder.setEncodedToken(encodedContainerID); + } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + builder.setTraceID(traceId); + } + ContainerCommandRequestProto request = builder.build(); + ContainerCommandResponseProto response = + client.sendCommand(request, getValidatorList()); + return response.getEcho(); + } + /** * Validates a response from a container protocol call. Any non-successful * return code is mapped to a corresponding exception and thrown. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java index d271e7d5d48f..f7a38e3dec8b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java @@ -40,7 +40,8 @@ public enum DNAction implements AuditAction { CLOSE_CONTAINER, GET_COMMITTED_BLOCK_LENGTH, STREAM_INIT, - FINALIZE_BLOCK; + FINALIZE_BLOCK, + ECHO; @Override public String getAction() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index f20615d23f8c..8e68eeac53d4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -807,6 +807,7 @@ private static DNAction getAuditAction(Type cmdType) { case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH; case StreamInit : return DNAction.STREAM_INIT; case FinalizeBlock : return DNAction.FINALIZE_BLOCK; + case Echo : return DNAction.ECHO; default : LOG.debug("Invalid command type - {}", cmdType); return null; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 3a945c221241..3d9214c91795 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -103,6 +103,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse; +import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getEchoResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getFinalizeBlockResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse; @@ -279,6 +280,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, return handler.handleGetCommittedBlockLength(request, kvContainer); case FinalizeBlock: return handler.handleFinalizeBlock(request, kvContainer); + case Echo: + return handler.handleEcho(request, kvContainer); default: return null; } @@ -611,6 +614,11 @@ ContainerCommandResponseProto handleFinalizeBlock( return getFinalizeBlockResponse(request, responseData); } + ContainerCommandResponseProto handleEcho( + ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + return getEchoResponse(request); + } + /** * Handle Get Block operation. Calls BlockManager to process the request. */ diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index 0206a8ea71d4..ccde261de024 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -107,6 +107,7 @@ enum Type { StreamWrite = 20; FinalizeBlock = 21; + Echo = 22; } @@ -215,6 +216,7 @@ message ContainerCommandRequestProto { optional uint32 version = 24; optional FinalizeBlockRequestProto finalizeBlock = 25; + optional EchoRequestProto echo = 26; } message ContainerCommandResponseProto { @@ -247,6 +249,7 @@ message ContainerCommandResponseProto { optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21; optional FinalizeBlockResponseProto finalizeBlock = 22; + optional EchoResponseProto echo = 23; } message ContainerDataProto { @@ -390,6 +393,16 @@ message ListBlockResponseProto { repeated BlockData blockData = 1; } +message EchoRequestProto { + optional bytes payload = 1; + optional int32 payloadSizeResp = 2; + optional int32 sleepTimeMs = 3; +} + +message EchoResponseProto { + optional bytes payload = 1; +} + // Chunk Operations message ChunkInfo { diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index 499d58b1ff2a..7898ed76b1cd 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -184,7 +184,7 @@ public void createContainer(XceiverClientSpi client, } } - private String getEncodedContainerToken(long containerId) throws IOException { + public String getEncodedContainerToken(long containerId) throws IOException { if (!containerTokenEnabled) { return ""; } diff --git a/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot b/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot index 32456af48869..c6ea4e63468e 100644 --- a/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot +++ b/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot @@ -23,6 +23,27 @@ ${PREFIX} ${EMPTY} ${n} 1 *** Test Cases *** +Get Container ID + ${result} = Execute ozone admin container create + ${containerID} = Execute ozone admin container list --count 1 --state=OPEN | grep -o '"containerID" *: *[^,}]*' | awk -F'[:,]' '{print $2}' | tr -d '" ' + Set Suite Variable ${containerID} + +[Read] Ozone DataNode Echo RPC Load Generator with request payload and response payload + ${result} = Execute ozone freon dne -t=1 -n=${n} --payload-req=1 --payload-resp=1 --container-id=${containerID} + Should contain ${result} Successful executions: ${n} + +[Read] Ozone DataNode Echo RPC Load Generator with request payload and empty response payload + ${result} = Execute ozone freon dne -t=1 -n=${n} --payload-req=1 --container-id=${containerID} + Should contain ${result} Successful executions: ${n} + +[Read] Ozone DataNode Echo RPC Load Generator with empty request payload and response payload + ${result} = Execute ozone freon dne -t=1 -n=${n} --payload-resp=1 --container-id=${containerID} + Should contain ${result} Successful executions: ${n} + +[Read] Ozone DataNode Echo RPC Load Generator with empty request payload and empty response payload no sleep time one xceiver client + ${result} = Execute ozone freon dne -t=1 -n=${n} --sleep-time-ms=0 --clients=1 --container-id=${containerID} + Should contain ${result} Successful executions: ${n} + [Read] Ozone Echo RPC Load Generator with request payload and response payload ${result} = Execute ozone freon ome -t=1 -n=${n} --payload-req=1 --payload-resp=1 Should contain ${result} Successful executions: ${n} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java index 30c4e4cd5b4d..5dab271d9edb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java @@ -31,6 +31,8 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -188,6 +190,23 @@ public void testReadWriteWithBCSId() throws Exception { assertEquals("data123", readData); xceiverClientManager.releaseClient(client, false); } + + @Test + public void testEcho() throws Exception { + ContainerWithPipeline container = + storageContainerLocationClient.allocateContainer( + SCMTestUtils.getReplicationType(ozoneConfig), + HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE); + XceiverClientSpi client = xceiverClientManager + .acquireClient(container.getPipeline()); + ContainerProtocolCalls.createContainer(client, + container.getContainerInfo().getContainerID(), null); + ByteString byteString = UnsafeByteOperations.unsafeWrap(new byte[0]); + ContainerProtos.EchoResponseProto response = + ContainerProtocolCalls.echo(client, "", container.getContainerInfo().getContainerID(), byteString, 1, 0); + assertEquals(1, response.getPayload().size()); + xceiverClientManager.releaseClient(client, false); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDNRPCLoadGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDNRPCLoadGenerator.java new file mode 100644 index 000000000000..d049a7e320cf --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDNRPCLoadGenerator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.freon; + +import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import picocli.CommandLine; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests Freon, with MiniOzoneCluster and validate data. + */ +public class TestDNRPCLoadGenerator { + + private static MiniOzoneCluster cluster = null; + private static ContainerWithPipeline container; + + private static void startCluster(OzoneConfiguration conf) throws Exception { + DatanodeRatisServerConfig ratisServerConfig = + conf.getObject(DatanodeRatisServerConfig.class); + ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3)); + ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(10)); + conf.setFromObject(ratisServerConfig); + + RatisClientConfig.RaftConfig raftClientConfig = + conf.getObject(RatisClientConfig.RaftConfig.class); + raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3)); + raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10)); + conf.setFromObject(raftClientConfig); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(5).build(); + cluster.waitForClusterToBeReady(); + cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.THREE, + 180000); + + StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient = cluster + .getStorageContainerLocationClient(); + container = + storageContainerLocationClient.allocateContainer( + SCMTestUtils.getReplicationType(conf), + HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE); + XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); + XceiverClientSpi client = xceiverClientManager + .acquireClient(container.getPipeline()); + ContainerProtocolCalls.createContainer(client, + container.getContainerInfo().getContainerID(), null); + } + + static void shutdownCluster() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @BeforeAll + public static void init() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + startCluster(conf); + } + + @AfterAll + public static void shutdown() { + shutdownCluster(); + } + + @Test + public void test() { + DNRPCLoadGenerator randomKeyGenerator = + new DNRPCLoadGenerator(cluster.getConf()); + CommandLine cmd = new CommandLine(randomKeyGenerator); + int exitCode = cmd.execute( + "--container-id", Long.toString(container.getContainerInfo().getContainerID()), + "--clients", "5", + "-t", "10"); + assertEquals(0, exitCode); + } +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java new file mode 100644 index 000000000000..1d1b898a7d95 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.freon; + +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.client.ClientTrustManager; +import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider; +import org.apache.hadoop.hdds.utils.HAUtils; +import org.apache.hadoop.ozone.OzoneSecurityUtil; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; +import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; +import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import static org.apache.hadoop.ozone.common.PayloadUtils.generatePayloadBytes; + +/** + * Utility to generate RPC request to DN. + */ +@Command(name = "dn-echo", + aliases = "dne", + description = + "Generate echo RPC request to DataNode", + versionProvider = HddsVersionProvider.class, + mixinStandardHelpOptions = true, + showDefaultValues = true) +public class DNRPCLoadGenerator extends BaseFreonGenerator + implements Callable { + + private static final int RPC_PAYLOAD_MULTIPLICATION_FACTOR = 1024; + private static final int MAX_SIZE_KB = 2097151; + private Timer timer; + private OzoneConfiguration configuration; + private ByteString payloadReqBytes; + private int payloadRespSize; + private List clients; + private String encodedContainerToken; + @Option(names = {"--payload-req"}, + description = + "Specifies the size of payload in KB in RPC request. ", + defaultValue = "0") + private int payloadReqSizeKB = 0; + + @Option(names = {"--payload-resp"}, + description = + "Specifies the size of payload in KB in RPC response. ", + defaultValue = "0") + private int payloadRespSizeKB = 0; + + @Option(names = {"--container-id"}, + description = "Send echo to DataNodes associated with this container") + private long containerID; + + @Option(names = {"--sleep-time-ms"}, + description = "Let DataNode to pause for a duration (in milliseconds) for each request", + defaultValue = "0") + private int sleepTimeMs = 0; + + @Option(names = {"--clients"}, + description = "number of xceiver clients", + defaultValue = "1") + private int numClients = 1; + + @CommandLine.ParentCommand + private Freon freon; + + // empy constructor for picocli + DNRPCLoadGenerator() { + } + + @VisibleForTesting + DNRPCLoadGenerator(OzoneConfiguration ozoneConfiguration) { + this.configuration = ozoneConfiguration; + } + + @Override + public Void call() throws Exception { + Preconditions.checkArgument(payloadReqSizeKB >= 0, + "OM echo request payload size should be positive value or zero."); + Preconditions.checkArgument(payloadRespSizeKB >= 0, + "OM echo response payload size should be positive value or zero."); + + if (configuration == null) { + configuration = freon.createOzoneConfiguration(); + } + ContainerOperationClient scmClient = new ContainerOperationClient(configuration); + ContainerInfo containerInfo = scmClient.getContainer(containerID); + + List pipelineList = scmClient.listPipelines(); + Pipeline pipeline = pipelineList.stream() + .filter(p -> p.getId().equals(containerInfo.getPipelineID())) + .findFirst() + .orElse(null); + encodedContainerToken = scmClient.getEncodedContainerToken(containerID); + XceiverClientFactory xceiverClientManager; + if (OzoneSecurityUtil.isSecurityEnabled(configuration)) { + CACertificateProvider caCerts = () -> HAUtils.buildCAX509List(null, configuration); + xceiverClientManager = new XceiverClientManager(configuration, + configuration.getObject(XceiverClientManager.ScmClientConfig.class), + new ClientTrustManager(caCerts, null)); + } else { + xceiverClientManager = new XceiverClientManager(configuration); + } + clients = new ArrayList<>(numClients); + for (int i = 0; i < numClients; i++) { + clients.add(xceiverClientManager.acquireClient(pipeline)); + } + + init(); + payloadReqBytes = UnsafeByteOperations.unsafeWrap(generatePayloadBytes(payloadReqSizeKB)); + payloadRespSize = calculateMaxPayloadSize(payloadRespSizeKB); + timer = getMetrics().timer("rpc-payload"); + try { + runTests(this::sendRPCReq); + } finally { + for (XceiverClientSpi client : clients) { + xceiverClientManager.releaseClient(client, false); + } + xceiverClientManager.close(); + scmClient.close(); + } + return null; + } + + private int calculateMaxPayloadSize(int payloadSizeKB) { + if (payloadSizeKB > 0) { + return Math.min( + Math.toIntExact((long)payloadSizeKB * + RPC_PAYLOAD_MULTIPLICATION_FACTOR), + MAX_SIZE_KB); + } + return 0; + } + + private void sendRPCReq(long l) throws Exception { + timer.time(() -> { + int clientIndex = (numClients == 1) ? 0 : (int)l % numClients; + ContainerProtos.EchoResponseProto response = + ContainerProtocolCalls.echo(clients.get(clientIndex), encodedContainerToken, + containerID, payloadReqBytes, payloadRespSize, sleepTimeMs); + return null; + }); + } +} + + diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index bd5510695fa1..349887a776d3 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -73,7 +73,8 @@ OzoneClientKeyReadWriteListOps.class, RangeKeysGenerator.class, DatanodeSimulator.class, - OmMetadataGenerator.class + OmMetadataGenerator.class, + DNRPCLoadGenerator.class }, versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java index 958df4c11a14..90807a0e6fe2 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java @@ -19,7 +19,6 @@ import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; -import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; @@ -27,6 +26,8 @@ import picocli.CommandLine.Command; import picocli.CommandLine.Option; +import static org.apache.hadoop.ozone.common.PayloadUtils.generatePayloadBytes; + /** * Utility to generate RPC request to OM with or without payload. */ @@ -88,8 +89,7 @@ public Void call() throws Exception { } init(); - payloadReqBytes = RandomUtils.nextBytes( - calculateMaxPayloadSize(payloadReqSizeKB)); + payloadReqBytes = generatePayloadBytes(payloadReqSizeKB); payloadRespSize = calculateMaxPayloadSize(payloadRespSizeKB); timer = getMetrics().timer("rpc-payload"); try {