@@ -16,12 +16,19 @@
*/
package org.apache.hadoop.ozone.freon;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.List;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
@@ -75,70 +82,119 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements
description = "Pipeline to use. By default the first RATIS/THREE "
+ "pipeline will be used.",
defaultValue = "")
private String pipelineId;
private String pipelineIds;

private XceiverClientSpi xceiverClientSpi;
@Option(names = {"-d", "--datanodes"},
description = "Datanodes to use." +
" Test will write to all the existing pipelines " +
"which this datanode is member of.",
defaultValue = "")
private String datanodes;

private XceiverClientManager xceiverClientManager;
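// One acquired client per pipeline that this generator writes to.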
private List<XceiverClientSpi> xceiverClients;

private Timer timer;

private ByteString dataToWrite;
private ChecksumData checksumProtobuf;


@Override
public Void call() throws Exception {

init();

OzoneConfiguration ozoneConf = createOzoneConfiguration();
xceiverClientManager =
new XceiverClientManager(ozoneConf);
if (OzoneSecurityUtil.isSecurityEnabled(ozoneConf)) {
throw new IllegalArgumentException(
"Datanode chunk generator is not supported in secure environment");
}

try (StorageContainerLocationProtocol scmLocationClient =
createStorageContainerLocationClient(ozoneConf)) {
List<Pipeline> pipelines = scmLocationClient.listPipelines();
Pipeline pipeline;
if (pipelineId != null && pipelineId.length() > 0) {
pipeline = pipelines.stream()
.filter(p -> p.getId().toString().equals(pipelineId))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(
"Pipeline ID is defined, but there is no such pipeline: "
+ pipelineId));
List<String> pipelinesFromCmd = Arrays.asList(pipelineIds.split(","));

List<String> datanodeHosts = Arrays.asList(this.datanodes.split(","));

Set<Pipeline> pipelines;

try (StorageContainerLocationProtocol scmLocationClient =
createStorageContainerLocationClient(ozoneConf)) {
List<Pipeline> pipelinesFromSCM = scmLocationClient.listPipelines();
Pipeline firstPipeline;
init();
if (!arePipelinesOrDatanodesProvided()) {
// Default behaviour when no pipelines or datanodes are provided.
firstPipeline = pipelinesFromSCM.stream()
.filter(p -> p.getFactor() == ReplicationFactor.THREE)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(
"Pipeline ID is NOT defined, and no pipeline " +
"has been found with factor=THREE"));
XceiverClientSpi xceiverClientSpi = xceiverClientManager
.acquireClient(firstPipeline);
xceiverClients = new ArrayList<>();
xceiverClients.add(xceiverClientSpi);
} else {
pipeline = pipelines.stream()
.filter(p -> p.getFactor() == ReplicationFactor.THREE)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(
"Pipeline ID is NOT defined, and no pipeline " +
"has been found with factor=THREE"));
LOG.info("Using pipeline {}", pipeline.getId());
xceiverClients = new ArrayList<>();
pipelines = new HashSet<>();
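// A pipeline is selected when its ID matches one of the requested pipeline
// IDs, or when any of its members runs on one of the requested datanodes.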
for (String pipelineId : pipelinesFromCmd) {
List<Pipeline> selectedPipelines = pipelinesFromSCM.stream()
.filter(p -> p.getId().toString()
.equals("PipelineID=" + pipelineId)
|| pipelineContainsDatanode(p, datanodeHosts))
.collect(Collectors.toList());
pipelines.addAll(selectedPipelines);
}
for (Pipeline p : pipelines) {
LOG.info("Writing to pipeline: {}", p.getId());
xceiverClients.add(xceiverClientManager.acquireClient(p));
}
if (pipelines.isEmpty()) {
throw new IllegalArgumentException(
"Couldn't find any of the selected pipelines");
}
}
runTest();
} finally {
// xceiverClients may still be null if pipeline discovery failed above.
if (xceiverClients != null) {
for (XceiverClientSpi xceiverClientSpi : xceiverClients) {
if (xceiverClientSpi != null) {
xceiverClientSpi.close();
}
}
}
}
return null;
}

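/**
 * Returns true if any member of the given pipeline runs on one of the
 * given datanode hosts.
 */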
private boolean pipelineContainsDatanode(Pipeline p,
List<String> datanodeHosts) {
for (DatanodeDetails dn : p.getNodes()) {
if (datanodeHosts.contains(dn.getHostName())) {
return true;
}
}
return false;
}

try (XceiverClientManager xceiverClientManager =
new XceiverClientManager(ozoneConf)) {
xceiverClientSpi = xceiverClientManager.acquireClient(pipeline);
private boolean arePipelinesOrDatanodesProvided() {
return !(pipelineIds.isEmpty() && datanodes.isEmpty());
}

timer = getMetrics().timer("chunk-write");

byte[] data = RandomStringUtils.randomAscii(chunkSize)
.getBytes(StandardCharsets.UTF_8);
private void runTest()
throws IOException {

dataToWrite = ByteString.copyFrom(data);
timer = getMetrics().timer("chunk-write");

Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize);
checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage();
byte[] data = RandomStringUtils.randomAscii(chunkSize)
.getBytes(StandardCharsets.UTF_8);

runTests(this::writeChunk);
}
} finally {
if (xceiverClientSpi != null) {
xceiverClientSpi.close();
}
}
return null;
dataToWrite = ByteString.copyFrom(data);

Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize);
checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage();

runTests(this::writeChunk);
}

private void writeChunk(long stepNo)
@@ -165,7 +221,19 @@ private void writeChunk(long stepNo)
.setChunkData(chunkInfo)
.setData(dataToWrite);

String id = xceiverClientSpi.getPipeline().getFirstNode().getUuidString();
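// Distribute the writes round-robin over all acquired clients, keyed by
// the step number.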
XceiverClientSpi clientSpi = xceiverClients.get(
(int) (stepNo % xceiverClients.size()));
sendWriteChunkRequest(blockId, writeChunkRequest, clientSpi);
}

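/**
 * Sends one write-chunk request through the given client, addressed to the
 * first datanode of the pipeline that the client is bound to.
 */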
private void sendWriteChunkRequest(DatanodeBlockID blockId,
WriteChunkRequestProto.Builder writeChunkRequest,
XceiverClientSpi xceiverClientSpi) throws Exception {
DatanodeDetails datanodeDetails =
xceiverClientSpi.getPipeline().getFirstNode();
String id = datanodeDetails.getUuidString();

ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto
@@ -188,7 +256,6 @@ private void writeChunk(long stepNo)
}
return null;
});

}

}