Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ public static boolean isReadOnly(
case ListContainer:
case ListChunk:
case GetCommittedBlockLength:
case Echo:
return true;
case CloseContainer:
case WriteChunk:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
Expand All @@ -42,6 +44,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;

import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;

Expand Down Expand Up @@ -319,6 +322,31 @@ public static ContainerCommandResponseProto getFinalizeBlockResponse(
.build();
}

public static ContainerCommandResponseProto getEchoResponse(
ContainerCommandRequestProto msg) {

ContainerProtos.EchoRequestProto echoRequest = msg.getEcho();
int responsePayload = echoRequest.getPayloadSizeResp();

int sleepTimeMs = echoRequest.getSleepTimeMs();
try {
if (sleepTimeMs > 0) {
Thread.sleep(sleepTimeMs);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

ContainerProtos.EchoResponseProto.Builder echo =
ContainerProtos.EchoResponseProto
.newBuilder()
.setPayload(UnsafeByteOperations.unsafeWrap(RandomUtils.nextBytes(responsePayload)));

return getSuccessResponseBuilder(msg)
.setEcho(echo)
.build();
}

private ContainerCommandResponseBuilders() {
throw new UnsupportedOperationException("no instances");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
Expand All @@ -65,6 +67,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -661,6 +664,41 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
return response.getGetSmallFile();
}

/**
* Send an echo to DataNode.
*
* @return EchoResponseProto
*/
public static EchoResponseProto echo(XceiverClientSpi client, String encodedContainerID,
long containerID, ByteString payloadReqBytes, int payloadRespSizeKB, int sleepTimeMs) throws IOException {
ContainerProtos.EchoRequestProto getEcho =
EchoRequestProto
.newBuilder()
.setPayload(payloadReqBytes)
.setPayloadSizeResp(payloadRespSizeKB)
.setSleepTimeMs(sleepTimeMs)
.build();
String id = client.getPipeline().getClosestNode().getUuidString();

ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.Echo)
.setContainerID(containerID)
.setDatanodeUuid(id)
.setEcho(getEcho);
if (!encodedContainerID.isEmpty()) {
builder.setEncodedToken(encodedContainerID);
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
builder.setTraceID(traceId);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
client.sendCommand(request, getValidatorList());
return response.getEcho();
}

/**
* Validates a response from a container protocol call. Any non-successful
* return code is mapped to a corresponding exception and thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public enum DNAction implements AuditAction {
CLOSE_CONTAINER,
GET_COMMITTED_BLOCK_LENGTH,
STREAM_INIT,
FINALIZE_BLOCK;
FINALIZE_BLOCK,
ECHO;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@ private static DNAction getAuditAction(Type cmdType) {
case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
case StreamInit : return DNAction.STREAM_INIT;
case FinalizeBlock : return DNAction.FINALIZE_BLOCK;
case Echo : return DNAction.ECHO;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getEchoResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getFinalizeBlockResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
Expand Down Expand Up @@ -279,6 +280,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
return handler.handleGetCommittedBlockLength(request, kvContainer);
case FinalizeBlock:
return handler.handleFinalizeBlock(request, kvContainer);
case Echo:
return handler.handleEcho(request, kvContainer);
default:
return null;
}
Expand Down Expand Up @@ -611,6 +614,11 @@ ContainerCommandResponseProto handleFinalizeBlock(
return getFinalizeBlockResponse(request, responseData);
}

ContainerCommandResponseProto handleEcho(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
return getEchoResponse(request);
}

/**
* Handle Get Block operation. Calls BlockManager to process the request.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ enum Type {
StreamWrite = 20;

FinalizeBlock = 21;
Echo = 22;
}


Expand Down Expand Up @@ -215,6 +216,7 @@ message ContainerCommandRequestProto {
optional uint32 version = 24;

optional FinalizeBlockRequestProto finalizeBlock = 25;
optional EchoRequestProto echo = 26;
}

message ContainerCommandResponseProto {
Expand Down Expand Up @@ -247,6 +249,7 @@ message ContainerCommandResponseProto {
optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;

optional FinalizeBlockResponseProto finalizeBlock = 22;
optional EchoResponseProto echo = 23;
}

message ContainerDataProto {
Expand Down Expand Up @@ -390,6 +393,16 @@ message ListBlockResponseProto {
repeated BlockData blockData = 1;
}

message EchoRequestProto {
optional bytes payload = 1;
optional int32 payloadSizeResp = 2;
optional int32 sleepTimeMs = 3;
}

message EchoResponseProto {
optional bytes payload = 1;
}

// Chunk Operations

message ChunkInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void createContainer(XceiverClientSpi client,
}
}

private String getEncodedContainerToken(long containerId) throws IOException {
public String getEncodedContainerToken(long containerId) throws IOException {
if (!containerTokenEnabled) {
return "";
}
Expand Down
21 changes: 21 additions & 0 deletions hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ ${PREFIX} ${EMPTY}
${n} 1

*** Test Cases ***
Get Container ID
${result} = Execute ozone admin container create
${containerID} = Execute ozone admin container list --count 1 --state=OPEN | grep -o '"containerID" *: *[^,}]*' | awk -F'[:,]' '{print $2}' | tr -d '" '
Set Suite Variable ${containerID}

[Read] Ozone DataNode Echo RPC Load Generator with request payload and response payload
${result} = Execute ozone freon dne -t=1 -n=${n} --payload-req=1 --payload-resp=1 --container-id=${containerID}
Should contain ${result} Successful executions: ${n}

[Read] Ozone DataNode Echo RPC Load Generator with request payload and empty response payload
${result} = Execute ozone freon dne -t=1 -n=${n} --payload-req=1 --container-id=${containerID}
Should contain ${result} Successful executions: ${n}

[Read] Ozone DataNode Echo RPC Load Generator with empty request payload and response payload
${result} = Execute ozone freon dne -t=1 -n=${n} --payload-resp=1 --container-id=${containerID}
Should contain ${result} Successful executions: ${n}

[Read] Ozone DataNode Echo RPC Load Generator with empty request payload and empty response payload no sleep time one xceiver client
${result} = Execute ozone freon dne -t=1 -n=${n} --sleep-time-ms=0 --clients=1 --container-id=${containerID}
Should contain ${result} Successful executions: ${n}

[Read] Ozone Echo RPC Load Generator with request payload and response payload
${result} = Execute ozone freon ome -t=1 -n=${n} --payload-req=1 --payload-resp=1
Should contain ${result} Successful executions: ${n}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -188,6 +190,23 @@ public void testReadWriteWithBCSId() throws Exception {
assertEquals("data123", readData);
xceiverClientManager.releaseClient(client, false);
}

@Test
public void testEcho() throws Exception {
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
SCMTestUtils.getReplicationType(ozoneConfig),
HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), null);
ByteString byteString = UnsafeByteOperations.unsafeWrap(new byte[0]);
ContainerProtos.EchoResponseProto response =
ContainerProtocolCalls.echo(client, "", container.getContainerInfo().getContainerID(), byteString, 1, 0);
assertEquals(1, response.getPayload().size());
xceiverClientManager.releaseClient(client, false);
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.freon;

import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import picocli.CommandLine;

import java.time.Duration;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Tests Freon, with MiniOzoneCluster and validate data.
*/
public class TestDNRPCLoadGenerator {

private static MiniOzoneCluster cluster = null;
private static ContainerWithPipeline container;

private static void startCluster(OzoneConfiguration conf) throws Exception {
DatanodeRatisServerConfig ratisServerConfig =
conf.getObject(DatanodeRatisServerConfig.class);
ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(10));
conf.setFromObject(ratisServerConfig);

RatisClientConfig.RaftConfig raftClientConfig =
conf.getObject(RatisClientConfig.RaftConfig.class);
raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
conf.setFromObject(raftClientConfig);

cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(5).build();
cluster.waitForClusterToBeReady();
cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.THREE,
180000);

StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient = cluster
.getStorageContainerLocationClient();
container =
storageContainerLocationClient.allocateContainer(
SCMTestUtils.getReplicationType(conf),
HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), null);
}

static void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
}
}

@BeforeAll
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
startCluster(conf);
}

@AfterAll
public static void shutdown() {
shutdownCluster();
}

@Test
public void test() {
DNRPCLoadGenerator randomKeyGenerator =
new DNRPCLoadGenerator(cluster.getConf());
CommandLine cmd = new CommandLine(randomKeyGenerator);
int exitCode = cmd.execute(
"--container-id", Long.toString(container.getContainerInfo().getContainerID()),
"--clients", "5",
"-t", "10");
assertEquals(0, exitCode);
}
}
Loading