From f36a8ce13f9b10e7a654202613ee5d88a5694bba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Wed, 30 Sep 2020 14:01:43 +0200 Subject: [PATCH 1/3] HDDS-4298. Use an interface in Ozone client instead of XceiverClientManager --- .../hadoop/hdds/scm/XceiverClientFactory.java | 44 +++++++++++++++ .../hadoop/hdds/scm/XceiverClientManager.java | 39 +++++++------- .../hdds/scm/storage/BlockInputStream.java | 54 ++++++++++--------- .../hdds/scm/storage/BlockOutputStream.java | 22 ++++---- .../client/io/BlockOutputStreamEntry.java | 22 ++++---- .../client/io/BlockOutputStreamEntryPool.java | 22 ++++---- .../ozone/client/io/KeyInputStream.java | 36 ++++++------- .../ozone/client/io/KeyOutputStream.java | 42 ++++++++------- .../ozone/client/rpc/TestReadRetries.java | 35 ++++++------ 9 files changed, 183 insertions(+), 133 deletions(-) create mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java new file mode 100644 index 000000000000..184645d59df2 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Function; + +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; + +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + +/** + * Interface to provide XceiverClient when needed. + */ +public interface XceiverClientFactory { + + Function byteBufferToByteStringConversion(); + + XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException; + + void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient); + + XceiverClientSpi acquireClientForReadData(Pipeline pipeline) + throws IOException; + + void releaseClientForReadData(XceiverClientSpi xceiverClient, boolean b); + +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 430e6e25ded6..e07a5d2d5dea 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -18,41 +18,40 @@ package org.apache.hadoop.hdds.scm; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigType; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE; import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * XceiverClientManager is responsible for the lifecycle of XceiverClient @@ -66,7 +65,7 @@ * without reestablishing connection. But the connection will be closed if * not being used for a period of time. */ -public class XceiverClientManager implements Closeable { +public class XceiverClientManager implements Closeable, XceiverClientFactory { private static final Logger LOG = LoggerFactory.getLogger(XceiverClientManager.class); //TODO : change this to SCM configuration class diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index bcc3cead9715..748d0bfc8cd1 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -18,32 +18,33 @@ package org.apache.hadoop.hdds.scm.storage; -import com.google.common.annotations.VisibleForTesting; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; + +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.function.Function; - /** * An {@link InputStream} called from KeyInputStream to read a block from the * container. @@ -62,7 +63,7 @@ public class BlockInputStream extends InputStream implements Seekable { private Pipeline pipeline; private final Token token; private final boolean verifyChecksum; - private XceiverClientManager xceiverClientManager; + private XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; private boolean initialized = false; @@ -99,23 +100,24 @@ public class BlockInputStream extends InputStream implements Seekable { public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, boolean verifyChecksum, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientFctry, Function refreshPipelineFunction) { this.blockID = blockId; this.length = blockLen; this.pipeline = pipeline; this.token = token; this.verifyChecksum = verifyChecksum; - this.xceiverClientManager = xceiverClientManager; + this.xceiverClientFactory = xceiverClientFctry; this.refreshPipelineFunction = refreshPipelineFunction; } public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, boolean verifyChecksum, - XceiverClientManager xceiverClientManager) { + XceiverClientManager xceiverClientFactory + ) { this(blockId, blockLen, pipeline, token, verifyChecksum, - xceiverClientManager, null); + xceiverClientFactory, null); } /** * Initialize the BlockInputStream. Get the BlockData (list of chunks) from @@ -181,7 +183,7 @@ protected List getChunkInfos() throws IOException { pipeline = Pipeline.newBuilder(pipeline) .setType(HddsProtos.ReplicationType.STAND_ALONE).build(); } - xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline); + xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline); boolean success = false; List chunks; try { @@ -202,7 +204,7 @@ protected List getChunkInfos() throws IOException { success = true; } finally { if (!success) { - xceiverClientManager.releaseClientForReadData(xceiverClient, false); + xceiverClientFactory.releaseClientForReadData(xceiverClient, false); } } @@ -378,9 +380,9 @@ public boolean seekToNewSource(long targetPos) throws IOException { @Override public synchronized void close() { - if (xceiverClientManager != null && xceiverClient != null) { - xceiverClientManager.releaseClient(xceiverClient, false); - xceiverClientManager = null; + if (xceiverClientFactory != null && xceiverClient != null) { + xceiverClientFactory.releaseClient(xceiverClient, false); + xceiverClientFactory = null; xceiverClient = null; } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 1a16caf23537..92071754c332 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -80,7 +80,7 @@ public class BlockOutputStream extends OutputStream { private AtomicReference blockID; private final BlockData.Builder containerBlockData; - private XceiverClientManager xceiverClientManager; + private XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; private final int bytesPerChecksum; private int chunkIndex; @@ -129,7 +129,7 @@ public class BlockOutputStream extends OutputStream { * Creates a new BlockOutputStream. * * @param blockID block ID - * @param xceiverClientManager client manager that controls client + * @param xceiverClientFactory client manager that controls client * @param pipeline pipeline where block will be written * @param bufferPool pool of buffers * @param streamBufferFlushSize flush size @@ -139,7 +139,7 @@ public class BlockOutputStream extends OutputStream { */ @SuppressWarnings("parameternumber") public BlockOutputStream(BlockID blockID, - XceiverClientManager xceiverClientManager, Pipeline pipeline, + XceiverClientFactory xceiverClientFactory, Pipeline pipeline, int streamBufferSize, long streamBufferFlushSize, boolean streamBufferFlushDelay, long streamBufferMaxSize, BufferPool bufferPool, ChecksumType checksumType, @@ -150,8 +150,8 @@ public BlockOutputStream(BlockID blockID, this.containerBlockData = BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) .addMetadata(keyValue); - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClientManager.acquireClient(pipeline); + this.xceiverClientFactory = xceiverClientFactory; + this.xceiverClient = xceiverClientFactory.acquireClient(pipeline); this.streamBufferSize = streamBufferSize; this.streamBufferFlushSize = streamBufferFlushSize; this.streamBufferMaxSize = streamBufferMaxSize; @@ -477,7 +477,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close, @Override public void flush() throws IOException { - if (xceiverClientManager != null && xceiverClient != null + if (xceiverClientFactory != null && xceiverClient != null && bufferPool != null && bufferPool.getSize() > 0 && (!streamBufferFlushDelay || writtenDataLength - totalDataFlushedLength >= streamBufferSize)) { @@ -543,7 +543,7 @@ private void handleFlush(boolean close) @Override public void close() throws IOException { - if (xceiverClientManager != null && xceiverClient != null + if (xceiverClientFactory != null && xceiverClient != null && bufferPool != null && bufferPool.getSize() > 0) { try { handleFlush(true); @@ -604,10 +604,10 @@ private void setIoException(Exception e) { } public void cleanup(boolean invalidateClient) { - if (xceiverClientManager != null) { - xceiverClientManager.releaseClient(xceiverClient, invalidateClient); + if (xceiverClientFactory != null) { + xceiverClientFactory.releaseClient(xceiverClient, invalidateClient); } - xceiverClientManager = null; + xceiverClientFactory = null; xceiverClient = null; commitWatcher.cleanup(); if (bufferList != null) { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 8e1e6405e770..8e3059abc433 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -19,13 +19,13 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.Collection; +import java.util.Collections; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ChecksumType; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BufferPool; @@ -33,8 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import java.util.Collection; -import java.util.Collections; +import com.google.common.annotations.VisibleForTesting; /** * Helper class used inside {@link BlockOutputStream}. @@ -44,7 +43,7 @@ public final class BlockOutputStreamEntry extends OutputStream { private OutputStream outputStream; private BlockID blockID; private final String key; - private final XceiverClientManager xceiverClientManager; + private final XceiverClientFactory xceiverClientManager; private final Pipeline pipeline; private final ChecksumType checksumType; private final int bytesPerChecksum; @@ -64,7 +63,7 @@ public final class BlockOutputStreamEntry extends OutputStream { @SuppressWarnings({"parameternumber", "squid:S00107"}) private BlockOutputStreamEntry(BlockID blockID, String key, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientManager, Pipeline pipeline, String requestId, int chunkSize, long length, int streamBufferSize, long streamBufferFlushSize, boolean streamBufferFlushDelay, long streamBufferMaxSize, @@ -215,7 +214,7 @@ public static class Builder { private BlockID blockID; private String key; - private XceiverClientManager xceiverClientManager; + private XceiverClientFactory xceiverClientManager; private Pipeline pipeline; private String requestId; private int chunkSize; @@ -250,7 +249,8 @@ public Builder setKey(String keys) { return this; } - public Builder setXceiverClientManager(XceiverClientManager + public Builder setXceiverClientManager( + XceiverClientFactory xClientManager) { this.xceiverClientManager = xClientManager; return this; @@ -333,7 +333,7 @@ public String getKey() { return key; } - public XceiverClientManager getXceiverClientManager() { + public XceiverClientFactory getXceiverClientManager() { return xceiverClientManager; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 712d1199a335..c8effd846c23 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.BufferPool; @@ -37,11 +37,11 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class manages the stream entries list and handles block allocation @@ -56,7 +56,7 @@ public class BlockOutputStreamEntryPool { private int currentStreamIndex; private final OzoneManagerProtocol omClient; private final OmKeyArgs keyArgs; - private final XceiverClientManager xceiverClientManager; + private final XceiverClientFactory xceiverClientFactory; private final int chunkSize; private final String requestID; private final int streamBufferSize; @@ -81,7 +81,7 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, long size, long watchTimeout, ContainerProtos.ChecksumType checksumType, int bytesPerChecksum, String uploadID, int partNumber, boolean isMultipart, OmKeyInfo info, - XceiverClientManager xceiverClientManager, long openID) { + XceiverClientFactory xceiverClientFactory, long openID) { streamEntries = new ArrayList<>(); currentStreamIndex = 0; this.omClient = omClient; @@ -90,7 +90,7 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, .setType(type).setFactor(factor).setDataSize(info.getDataSize()) .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID) .setMultipartUploadPartNumber(partNumber).build(); - this.xceiverClientManager = xceiverClientManager; + this.xceiverClientFactory = xceiverClientFactory; this.chunkSize = chunkSize; this.requestID = requestId; this.streamBufferSize = bufferSize; @@ -122,7 +122,7 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, this.bufferPool = new BufferPool(streamBufferSize, (int) (streamBufferMaxSize / streamBufferSize), - xceiverClientManager.byteBufferToByteStringConversion()); + xceiverClientFactory.byteBufferToByteStringConversion()); } /** @@ -135,7 +135,7 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, streamEntries = new ArrayList<>(); omClient = null; keyArgs = null; - xceiverClientManager = null; + xceiverClientFactory = null; chunkSize = 0; requestID = null; streamBufferSize = 0; @@ -185,7 +185,7 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) new BlockOutputStreamEntry.Builder() .setBlockID(subKeyInfo.getBlockID()) .setKey(keyArgs.getKeyName()) - .setXceiverClientManager(xceiverClientManager) + .setXceiverClientManager(xceiverClientFactory) .setPipeline(subKeyInfo.getPipeline()) .setRequestId(requestID) .setChunkSize(chunkSize) @@ -257,8 +257,8 @@ List getStreamEntries() { return streamEntries; } - XceiverClientManager getXceiverClientManager() { - return xceiverClientManager; + XceiverClientFactory getXceiverClientFactory() { + return xceiverClientFactory; } String getKeyName() { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 769035a5e5d0..f8f6cd3fea7e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -17,29 +17,29 @@ */ package org.apache.hadoop.ozone.client.io; -import com.google.common.annotations.VisibleForTesting; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; -import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockInputStream; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; - /** * Maintaining a list of BlockInputStream. Read based on offset. */ @@ -82,21 +82,21 @@ public KeyInputStream() { * For each block in keyInfo, add a BlockInputStream to blockStreams. */ public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientFactory, boolean verifyChecksum, Function retryFunction) { List keyLocationInfos = keyInfo .getLatestVersionLocations().getBlocksLatestVersionOnly(); KeyInputStream keyInputStream = new KeyInputStream(); keyInputStream.initialize(keyInfo, keyLocationInfos, - xceiverClientManager, verifyChecksum, retryFunction); + xceiverClientFactory, verifyChecksum, retryFunction); return new LengthInputStream(keyInputStream, keyInputStream.length); } private synchronized void initialize(OmKeyInfo keyInfo, List blockInfos, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientFactory, boolean verifyChecksum, Function retryFunction) { this.key = keyInfo.getKeyName(); this.blockOffsets = new long[blockInfos.size()]; @@ -110,7 +110,7 @@ private synchronized void initialize(OmKeyInfo keyInfo, // We also pass in functional reference which is used to refresh the // pipeline info for a given OM Key location info. - addStream(omKeyLocationInfo, xceiverClientManager, + addStream(omKeyLocationInfo, xceiverClientFactory, verifyChecksum, keyLocationInfo -> { OmKeyInfo newKeyInfo = retryFunction.apply(keyInfo); BlockID blockID = keyLocationInfo.getBlockID(); @@ -140,12 +140,12 @@ private synchronized void initialize(OmKeyInfo keyInfo, * the block for the first time. */ private synchronized void addStream(OmKeyLocationInfo blockInfo, - XceiverClientManager xceiverClientMngr, + XceiverClientFactory xceiverClientFactory, boolean verifyChecksum, Function refreshPipelineFunction) { blockStreams.add(new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(), blockInfo.getPipeline(), blockInfo.getToken(), - verifyChecksum, xceiverClientMngr, + verifyChecksum, xceiverClientFactory, blockID -> refreshPipelineFunction.apply(blockInfo))); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 843155c372b2..7c4085fb3e54 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -17,12 +17,23 @@ */ package org.apache.hadoop.ozone.client.io; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -31,25 +42,20 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ozone.om.helpers.*; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; +import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; -import org.apache.hadoop.hdds.scm.XceiverClientManager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.util.List; -import java.util.Collection; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - /** * Maintaining a list of BlockInputStream. Write based on offset. * @@ -105,8 +111,8 @@ public List getStreamEntries() { } @VisibleForTesting - public XceiverClientManager getXceiverClientManager() { - return blockOutputStreamEntryPool.getXceiverClientManager(); + public XceiverClientFactory getXceiverClientFactory() { + return blockOutputStreamEntryPool.getXceiverClientFactory(); } @VisibleForTesting @@ -121,7 +127,7 @@ public int getRetryCount() { @SuppressWarnings({"parameternumber", "squid:S00107"}) public KeyOutputStream(OpenKeySession handler, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient, int chunkSize, String requestId, ReplicationFactor factor, ReplicationType type, int bufferSize, long bufferFlushSize, boolean isBufferFlushDelay, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java index 8c187f4a67c1..96e0d974ffe3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java @@ -17,6 +17,11 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -29,39 +34,32 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.client.OzoneKey; -import org.apache.hadoop.ozone.client.OzoneVolume; -import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.client.OzoneKeyDetails; -import org.apache.hadoop.ozone.client.OzoneKeyLocation; import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneKey; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; +import org.apache.hadoop.ozone.client.OzoneKeyLocation; +import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; -import org.junit.Rule; -import org.junit.BeforeClass; + import org.junit.AfterClass; import org.junit.Assert; +import static org.junit.Assert.fail; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; - import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; - -import static org.junit.Assert.fail; - /** * Test read retries from multiple nodes in the pipeline. */ @@ -147,7 +145,8 @@ public void testPutKeyAndGetKeyThreeNodes() ReplicationFactor.THREE, new HashMap<>()); KeyOutputStream groupOutputStream = (KeyOutputStream) out.getOutputStream(); - XceiverClientManager manager = groupOutputStream.getXceiverClientManager(); + XceiverClientManager manager = + (XceiverClientManager) groupOutputStream.getXceiverClientFactory(); out.write(value.getBytes()); out.close(); // First, confirm the key info from the client matches the info in OM. From 08dda9ade16b0befb2e643d64906337cd2fa7e34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Mon, 5 Oct 2020 12:30:11 +0200 Subject: [PATCH 2/3] move out unsafeByteBufferConversion from the new interface --- .../hadoop/hdds/scm/XceiverClientFactory.java | 6 ------ .../hadoop/hdds/scm/XceiverClientManager.java | 7 ------- .../hadoop/hdds/scm/storage/BufferPool.java | 2 +- .../hadoop/hdds/scm/ByteStringConversion.java | 18 +++++++----------- .../container/keyvalue/KeyValueHandler.java | 14 +++++++++++--- .../client/io/BlockOutputStreamEntryPool.java | 11 ++++++++--- .../ozone/client/io/KeyOutputStream.java | 14 +++++++++++--- .../hadoop/ozone/client/rpc/RpcClient.java | 6 ++++++ 8 files changed, 44 insertions(+), 34 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java index 184645d59df2..dc35cd560b93 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java @@ -18,20 +18,14 @@ package org.apache.hadoop.hdds.scm; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.function.Function; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; - /** * Interface to provide XceiverClient when needed. */ public interface XceiverClientFactory { - Function byteBufferToByteStringConversion(); - XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException; void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index e07a5d2d5dea..eaf0503dd7c1 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -20,12 +20,10 @@ import java.io.Closeable; import java.io.IOException; -import java.nio.ByteBuffer; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; @@ -49,7 +47,6 @@ import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE; import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -277,10 +274,6 @@ public void close() { } } - public Function byteBufferToByteStringConversion(){ - return ByteStringConversion.createByteBufferConversion(conf); - } - /** * Get xceiver client metric. */ diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java index dc27d4bd882a..94fa87a71e2d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java @@ -42,7 +42,7 @@ public class BufferPool { public BufferPool(int bufferSize, int capacity) { this(bufferSize, capacity, - ByteStringConversion.createByteBufferConversion(null)); + ByteStringConversion.createByteBufferConversion(false)); } public BufferPool(int bufferSize, int capacity, diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java index dc44392db26b..b5f6e4812110 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java @@ -17,14 +17,14 @@ */ package org.apache.hadoop.hdds.scm; -import org.apache.hadoop.hdds.conf.ConfigurationSource; +import java.nio.ByteBuffer; +import java.util.function.Function; + import org.apache.hadoop.ozone.OzoneConfigKeys; + import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; -import java.nio.ByteBuffer; -import java.util.function.Function; - /** * Helper class to create a conversion function from ByteBuffer to ByteString * based on the property @@ -38,17 +38,13 @@ private ByteStringConversion(){} // no instantiation. * Creates the conversion function to be used to convert ByteBuffers to * ByteString instances to be used in protobuf messages. * - * @param config the Ozone configuration * @return the conversion function defined by - * {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED} + * {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED} * @see ByteBuffer */ public static Function createByteBufferConversion( - ConfigurationSource config){ - boolean unsafeEnabled = - config!=null && config.getBoolean( - OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, - OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); + boolean unsafeEnabled + ) { if (unsafeEnabled) { return UnsafeByteOperations::unsafeWrap; } else { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index e0de6ff90f86..70f4ffc0f6d7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; @@ -127,14 +128,21 @@ public KeyValueHandler(ConfigurationSource config, String datanodeId, } catch (Exception e) { throw new RuntimeException(e); } - maxContainerSize = (long)config.getStorageSize( + maxContainerSize = (long) config.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); // this handler lock is used for synchronizing createContainer Requests, // so using a fair lock here. containerCreationLock = new AutoCloseableLock(new ReentrantLock(true)); + + boolean isUnsafeByteBufferConversionEnabled = + conf.getBoolean( + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); + byteBufferToByteString = - ByteStringConversion.createByteBufferConversion(conf); + ByteStringConversion + .createByteBufferConversion(isUnsafeByteBufferConversionEnabled); } @VisibleForTesting diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index c8effd846c23..71784c5050d9 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -73,7 +74,8 @@ public class BlockOutputStreamEntryPool { private final ExcludeList excludeList; @SuppressWarnings({"parameternumber", "squid:S00107"}) - public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, + public BlockOutputStreamEntryPool( + OzoneManagerProtocol omClient, int chunkSize, String requestId, HddsProtos.ReplicationFactor factor, HddsProtos.ReplicationType type, int bufferSize, long bufferFlushSize, @@ -81,7 +83,9 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, long size, long watchTimeout, ContainerProtos.ChecksumType checksumType, int bytesPerChecksum, String uploadID, int partNumber, boolean isMultipart, OmKeyInfo info, - XceiverClientFactory xceiverClientFactory, long openID) { + boolean unsafeByteBufferConversion, + XceiverClientFactory xceiverClientFactory, long openID + ) { streamEntries = new ArrayList<>(); currentStreamIndex = 0; this.omClient = omClient; @@ -122,7 +126,8 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, this.bufferPool = new BufferPool(streamBufferSize, (int) (streamBufferMaxSize / streamBufferSize), - xceiverClientFactory.byteBufferToByteStringConversion()); + ByteStringConversion + .createByteBufferConversion(unsafeByteBufferConversion)); } /** diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index c65c2645e3ca..03cdb721d51d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -134,14 +134,16 @@ public KeyOutputStream(OpenKeySession handler, long bufferMaxSize, long size, long watchTimeout, ChecksumType checksumType, int bytesPerChecksum, String uploadID, int partNumber, boolean isMultipart, - int maxRetryCount, long retryInterval) { + int maxRetryCount, long retryInterval, + boolean unsafeByteBufferConversion) { OmKeyInfo info = handler.getKeyInfo(); blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor, type, bufferSize, bufferFlushSize, isBufferFlushDelay, bufferMaxSize, size, watchTimeout, checksumType, bytesPerChecksum, uploadID, partNumber, - isMultipart, info, xceiverClientManager, handler.getId()); + isMultipart, info, unsafeByteBufferConversion, + xceiverClientManager, handler.getId()); // Retrieve the file encryption key info, null if file is not in // encrypted bucket. this.feInfo = info.getFileEncryptionInfo(); @@ -560,6 +562,7 @@ public static class Builder { private boolean isMultipartKey; private int maxRetryCount; private long retryInterval; + private boolean unsafeByteBufferConversion; public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; @@ -656,6 +659,11 @@ public Builder setRetryInterval(long retryIntervalInMS) { return this; } + public Builder enableUnsafeByteBufferConversion(boolean enabled) { + this.unsafeByteBufferConversion = enabled; + return this; + } + public KeyOutputStream build() { return new KeyOutputStream(openHandler, xceiverManager, omClient, chunkSize, requestID, factor, type, @@ -663,7 +671,7 @@ public KeyOutputStream build() { streamBufferMaxSize, blockSize, watchTimeout, checksumType, bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey, - maxRetryCount, retryInterval); + maxRetryCount, retryInterval, unsafeByteBufferConversion); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 94fbe52498ca..c61d0eb2074d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -132,6 +132,7 @@ public class RpcClient implements ClientProtocol { private final int chunkSize; private final ChecksumType checksumType; private final int bytesPerChecksum; + private final boolean unsafeByteBufferConversion; private boolean verifyChecksum; private final UserGroupInformation ugi; private final ACLType userRights; @@ -211,6 +212,10 @@ public RpcClient(ConfigurationSource conf, String omServiceId) blockSize = (long) conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES); + unsafeByteBufferConversion = conf.getBoolean( + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); + int configuredChecksumSize = (int) conf.getStorageSize( OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT, @@ -1279,6 +1284,7 @@ private OzoneOutputStream createOutputStream(OpenKeySession openKey, .setBytesPerChecksum(bytesPerChecksum) .setMaxRetryCount(maxRetryCount) .setRetryInterval(retryInterval) + .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .build(); keyOutputStream .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(), From 3ef56cfad1e28a51380b0f6fee771f94d578f02c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Mon, 5 Oct 2020 13:53:31 +0200 Subject: [PATCH 3/3] remove cast form unit test --- .../apache/hadoop/ozone/client/rpc/TestReadRetries.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java index 96e0d974ffe3..914845931df5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -145,8 +145,7 @@ public void testPutKeyAndGetKeyThreeNodes() ReplicationFactor.THREE, new HashMap<>()); KeyOutputStream groupOutputStream = (KeyOutputStream) out.getOutputStream(); - XceiverClientManager manager = - (XceiverClientManager) groupOutputStream.getXceiverClientFactory(); + XceiverClientFactory factory = groupOutputStream.getXceiverClientFactory(); out.write(value.getBytes()); out.close(); // First, confirm the key info from the client matches the info in OM. @@ -178,7 +177,7 @@ public void testPutKeyAndGetKeyThreeNodes() DatanodeDetails datanodeDetails = datanodes.get(0); Assert.assertNotNull(datanodeDetails); - XceiverClientSpi clientSpi = manager.acquireClient(pipeline); + XceiverClientSpi clientSpi = factory.acquireClient(pipeline); Assert.assertTrue(clientSpi instanceof XceiverClientRatis); XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi; @@ -206,7 +205,7 @@ public void testPutKeyAndGetKeyThreeNodes() // it should throw an ioException as none of the servers // are available } - manager.releaseClient(clientSpi, false); + factory.releaseClient(clientSpi, false); } private void readKey(OzoneBucket bucket, String keyName, String data)