diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java index a6e28324d38a..c0c58d03595f 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java @@ -16,12 +16,19 @@ */ package org.apache.hadoop.ozone.freon; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Set; import java.util.List; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Arrays; import java.util.concurrent.Callable; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; @@ -75,70 +82,119 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements description = "Pipeline to use. By default the first RATIS/THREE " + "pipeline will be used.", defaultValue = "") - private String pipelineId; + private String pipelineIds; - private XceiverClientSpi xceiverClientSpi; + @Option(names = {"-d", "--datanodes"}, + description = "Datanodes to use." + + " Test will write to all the existing pipelines " + + "which this datanode is member of.", + defaultValue = "") + private String datanodes; + + private XceiverClientManager xceiverClientManager; + private List xceiverClients; private Timer timer; private ByteString dataToWrite; private ChecksumData checksumProtobuf; + @Override public Void call() throws Exception { - init(); OzoneConfiguration ozoneConf = createOzoneConfiguration(); + xceiverClientManager = + new XceiverClientManager(ozoneConf); if (OzoneSecurityUtil.isSecurityEnabled(ozoneConf)) { throw new IllegalArgumentException( "Datanode chunk generator is not supported in secure environment"); } - try (StorageContainerLocationProtocol scmLocationClient = - createStorageContainerLocationClient(ozoneConf)) { - List pipelines = scmLocationClient.listPipelines(); - Pipeline pipeline; - if (pipelineId != null && pipelineId.length() > 0) { - pipeline = pipelines.stream() - .filter(p -> p.getId().toString().equals(pipelineId)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException( - "Pipeline ID is defined, but there is no such pipeline: " - + pipelineId)); + List pipelinesFromCmd = Arrays.asList(pipelineIds.split(",")); + List datanodeHosts = Arrays.asList(this.datanodes.split(",")); + + Set pipelines; + + try (StorageContainerLocationProtocol scmLocationClient = + createStorageContainerLocationClient(ozoneConf)) { + List pipelinesFromSCM = scmLocationClient.listPipelines(); + Pipeline firstPipeline; + init(); + if (!arePipelinesOrDatanodesProvided()) { + //default behaviour if no arguments provided + firstPipeline = pipelinesFromSCM.stream() + .filter(p -> p.getFactor() == ReplicationFactor.THREE) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException( + "Pipeline ID is NOT defined, and no pipeline " + + "has been found with factor=THREE")); + XceiverClientSpi xceiverClientSpi = xceiverClientManager + .acquireClient(firstPipeline); + xceiverClients = new ArrayList<>(); + xceiverClients.add(xceiverClientSpi); } else { - pipeline = pipelines.stream() - .filter(p -> p.getFactor() == ReplicationFactor.THREE) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException( - "Pipeline ID is NOT defined, and no pipeline " + - "has been found with factor=THREE")); - LOG.info("Using pipeline {}", pipeline.getId()); + xceiverClients = new ArrayList<>(); + pipelines = new HashSet<>(); + for(String pipelineId:pipelinesFromCmd){ + List selectedPipelines = pipelinesFromSCM.stream() + .filter((p -> p.getId().toString() + .equals("PipelineID=" + pipelineId) + || pipelineContainsDatanode(p, datanodeHosts))) + .collect(Collectors.toList()); + pipelines.addAll(selectedPipelines); + } + for (Pipeline p:pipelines){ + LOG.info("Writing to pipeline: " + p.getId()); + xceiverClients.add(xceiverClientManager.acquireClient(p)); + } + if (pipelines.isEmpty()){ + throw new IllegalArgumentException( + "Couldn't find the any/the selected pipeline"); + } + } + runTest(); + } finally { + for (XceiverClientSpi xceiverClientSpi : xceiverClients) { + if (xceiverClientSpi != null) { + xceiverClientSpi.close(); + } + } + } + return null; + } + + private boolean pipelineContainsDatanode(Pipeline p, + List datanodeHosts) { + for (DatanodeDetails dn:p.getNodes()){ + if (datanodeHosts.contains(dn.getHostName())){ + return true; } + } + return false; + } - try (XceiverClientManager xceiverClientManager = - new XceiverClientManager(ozoneConf)) { - xceiverClientSpi = xceiverClientManager.acquireClient(pipeline); + private boolean arePipelinesOrDatanodesProvided() { + return !(pipelineIds.equals("") && datanodes.equals("")); + } - timer = getMetrics().timer("chunk-write"); - byte[] data = RandomStringUtils.randomAscii(chunkSize) - .getBytes(StandardCharsets.UTF_8); + private void runTest() + throws IOException { - dataToWrite = ByteString.copyFrom(data); + timer = getMetrics().timer("chunk-write"); - Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize); - checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); + byte[] data = RandomStringUtils.randomAscii(chunkSize) + .getBytes(StandardCharsets.UTF_8); - runTests(this::writeChunk); - } - } finally { - if (xceiverClientSpi != null) { - xceiverClientSpi.close(); - } - } - return null; + dataToWrite = ByteString.copyFrom(data); + + Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize); + checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); + + runTests(this::writeChunk); } private void writeChunk(long stepNo) @@ -165,7 +221,19 @@ private void writeChunk(long stepNo) .setChunkData(chunkInfo) .setData(dataToWrite); - String id = xceiverClientSpi.getPipeline().getFirstNode().getUuidString(); + XceiverClientSpi clientSpi = xceiverClients.get( + (int) (stepNo%(xceiverClients.size()))); + sendWriteChunkRequest(blockId, writeChunkRequest, + clientSpi); + + } + + private void sendWriteChunkRequest(DatanodeBlockID blockId, + WriteChunkRequestProto.Builder writeChunkRequest, + XceiverClientSpi xceiverClientSpi) throws Exception { + DatanodeDetails datanodeDetails = xceiverClientSpi. + getPipeline().getFirstNode(); + String id = datanodeDetails.getUuidString(); ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto @@ -188,7 +256,6 @@ private void writeChunk(long stepNo) } return null; }); - } }