diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java new file mode 100644 index 000000000000..dc35cd560b93 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import java.io.IOException; + +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; + +/** + * Interface to provide XceiverClient when needed. + */ +public interface XceiverClientFactory { + + XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException; + + void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient); + + XceiverClientSpi acquireClientForReadData(Pipeline pipeline) + throws IOException; + + void releaseClientForReadData(XceiverClientSpi xceiverClient, boolean b); + +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 430e6e25ded6..eaf0503dd7c1 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -18,41 +18,37 @@ package org.apache.hadoop.hdds.scm; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import java.io.Closeable; +import java.io.IOException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigType; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE; import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * XceiverClientManager is responsible for the lifecycle of XceiverClient @@ -66,7 +62,7 @@ * without reestablishing connection. But the connection will be closed if * not being used for a period of time. */ -public class XceiverClientManager implements Closeable { +public class XceiverClientManager implements Closeable, XceiverClientFactory { private static final Logger LOG = LoggerFactory.getLogger(XceiverClientManager.class); //TODO : change this to SCM configuration class @@ -278,10 +274,6 @@ public void close() { } } - public Function byteBufferToByteStringConversion(){ - return ByteStringConversion.createByteBufferConversion(conf); - } - /** * Get xceiver client metric. */ diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index bcc3cead9715..748d0bfc8cd1 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -18,32 +18,33 @@ package org.apache.hadoop.hdds.scm.storage; -import com.google.common.annotations.VisibleForTesting; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; + +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.function.Function; - /** * An {@link InputStream} called from KeyInputStream to read a block from the * container. @@ -62,7 +63,7 @@ public class BlockInputStream extends InputStream implements Seekable { private Pipeline pipeline; private final Token token; private final boolean verifyChecksum; - private XceiverClientManager xceiverClientManager; + private XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; private boolean initialized = false; @@ -99,23 +100,24 @@ public class BlockInputStream extends InputStream implements Seekable { public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, boolean verifyChecksum, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientFctry, Function refreshPipelineFunction) { this.blockID = blockId; this.length = blockLen; this.pipeline = pipeline; this.token = token; this.verifyChecksum = verifyChecksum; - this.xceiverClientManager = xceiverClientManager; + this.xceiverClientFactory = xceiverClientFctry; this.refreshPipelineFunction = refreshPipelineFunction; } public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, boolean verifyChecksum, - XceiverClientManager xceiverClientManager) { + XceiverClientManager xceiverClientFactory + ) { this(blockId, blockLen, pipeline, token, verifyChecksum, - xceiverClientManager, null); + xceiverClientFactory, null); } /** * Initialize the BlockInputStream. Get the BlockData (list of chunks) from @@ -181,7 +183,7 @@ protected List getChunkInfos() throws IOException { pipeline = Pipeline.newBuilder(pipeline) .setType(HddsProtos.ReplicationType.STAND_ALONE).build(); } - xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline); + xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline); boolean success = false; List chunks; try { @@ -202,7 +204,7 @@ protected List getChunkInfos() throws IOException { success = true; } finally { if (!success) { - xceiverClientManager.releaseClientForReadData(xceiverClient, false); + xceiverClientFactory.releaseClientForReadData(xceiverClient, false); } } @@ -378,9 +380,9 @@ public boolean seekToNewSource(long targetPos) throws IOException { @Override public synchronized void close() { - if (xceiverClientManager != null && xceiverClient != null) { - xceiverClientManager.releaseClient(xceiverClient, false); - xceiverClientManager = null; + if (xceiverClientFactory != null && xceiverClient != null) { + xceiverClientFactory.releaseClient(xceiverClient, false); + xceiverClientFactory = null; xceiverClient = null; } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 1a16caf23537..92071754c332 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -80,7 +80,7 @@ public class BlockOutputStream extends OutputStream { private AtomicReference blockID; private final BlockData.Builder containerBlockData; - private XceiverClientManager xceiverClientManager; + private XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; private final int bytesPerChecksum; private int chunkIndex; @@ -129,7 +129,7 @@ public class BlockOutputStream extends OutputStream { * Creates a new BlockOutputStream. * * @param blockID block ID - * @param xceiverClientManager client manager that controls client + * @param xceiverClientFactory client manager that controls client * @param pipeline pipeline where block will be written * @param bufferPool pool of buffers * @param streamBufferFlushSize flush size @@ -139,7 +139,7 @@ public class BlockOutputStream extends OutputStream { */ @SuppressWarnings("parameternumber") public BlockOutputStream(BlockID blockID, - XceiverClientManager xceiverClientManager, Pipeline pipeline, + XceiverClientFactory xceiverClientFactory, Pipeline pipeline, int streamBufferSize, long streamBufferFlushSize, boolean streamBufferFlushDelay, long streamBufferMaxSize, BufferPool bufferPool, ChecksumType checksumType, @@ -150,8 +150,8 @@ public BlockOutputStream(BlockID blockID, this.containerBlockData = BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) .addMetadata(keyValue); - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClientManager.acquireClient(pipeline); + this.xceiverClientFactory = xceiverClientFactory; + this.xceiverClient = xceiverClientFactory.acquireClient(pipeline); this.streamBufferSize = streamBufferSize; this.streamBufferFlushSize = streamBufferFlushSize; this.streamBufferMaxSize = streamBufferMaxSize; @@ -477,7 +477,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close, @Override public void flush() throws IOException { - if (xceiverClientManager != null && xceiverClient != null + if (xceiverClientFactory != null && xceiverClient != null && bufferPool != null && bufferPool.getSize() > 0 && (!streamBufferFlushDelay || writtenDataLength - totalDataFlushedLength >= streamBufferSize)) { @@ -543,7 +543,7 @@ private void handleFlush(boolean close) @Override public void close() throws IOException { - if (xceiverClientManager != null && xceiverClient != null + if (xceiverClientFactory != null && xceiverClient != null && bufferPool != null && bufferPool.getSize() > 0) { try { handleFlush(true); @@ -604,10 +604,10 @@ private void setIoException(Exception e) { } public void cleanup(boolean invalidateClient) { - if (xceiverClientManager != null) { - xceiverClientManager.releaseClient(xceiverClient, invalidateClient); + if (xceiverClientFactory != null) { + xceiverClientFactory.releaseClient(xceiverClient, invalidateClient); } - xceiverClientManager = null; + xceiverClientFactory = null; xceiverClient = null; commitWatcher.cleanup(); if (bufferList != null) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java index dc27d4bd882a..94fa87a71e2d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java @@ -42,7 +42,7 @@ public class BufferPool { public BufferPool(int bufferSize, int capacity) { this(bufferSize, capacity, - ByteStringConversion.createByteBufferConversion(null)); + ByteStringConversion.createByteBufferConversion(false)); } public BufferPool(int bufferSize, int capacity, diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java index dc44392db26b..b5f6e4812110 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java @@ -17,14 +17,14 @@ */ package org.apache.hadoop.hdds.scm; -import org.apache.hadoop.hdds.conf.ConfigurationSource; +import java.nio.ByteBuffer; +import java.util.function.Function; + import org.apache.hadoop.ozone.OzoneConfigKeys; + import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; -import java.nio.ByteBuffer; -import java.util.function.Function; - /** * Helper class to create a conversion function from ByteBuffer to ByteString * based on the property @@ -38,17 +38,13 @@ private ByteStringConversion(){} // no instantiation. * Creates the conversion function to be used to convert ByteBuffers to * ByteString instances to be used in protobuf messages. * - * @param config the Ozone configuration * @return the conversion function defined by - * {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED} + * {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED} * @see ByteBuffer */ public static Function createByteBufferConversion( - ConfigurationSource config){ - boolean unsafeEnabled = - config!=null && config.getBoolean( - OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, - OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); + boolean unsafeEnabled + ) { if (unsafeEnabled) { return UnsafeByteOperations::unsafeWrap; } else { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index e0de6ff90f86..70f4ffc0f6d7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; @@ -127,14 +128,21 @@ public KeyValueHandler(ConfigurationSource config, String datanodeId, } catch (Exception e) { throw new RuntimeException(e); } - maxContainerSize = (long)config.getStorageSize( + maxContainerSize = (long) config.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); // this handler lock is used for synchronizing createContainer Requests, // so using a fair lock here. containerCreationLock = new AutoCloseableLock(new ReentrantLock(true)); + + boolean isUnsafeByteBufferConversionEnabled = + conf.getBoolean( + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); + byteBufferToByteString = - ByteStringConversion.createByteBufferConversion(conf); + ByteStringConversion + .createByteBufferConversion(isUnsafeByteBufferConversionEnabled); } @VisibleForTesting diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 8e1e6405e770..8e3059abc433 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -19,13 +19,13 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.Collection; +import java.util.Collections; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ChecksumType; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BufferPool; @@ -33,8 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import java.util.Collection; -import java.util.Collections; +import com.google.common.annotations.VisibleForTesting; /** * Helper class used inside {@link BlockOutputStream}. @@ -44,7 +43,7 @@ public final class BlockOutputStreamEntry extends OutputStream { private OutputStream outputStream; private BlockID blockID; private final String key; - private final XceiverClientManager xceiverClientManager; + private final XceiverClientFactory xceiverClientManager; private final Pipeline pipeline; private final ChecksumType checksumType; private final int bytesPerChecksum; @@ -64,7 +63,7 @@ public final class BlockOutputStreamEntry extends OutputStream { @SuppressWarnings({"parameternumber", "squid:S00107"}) private BlockOutputStreamEntry(BlockID blockID, String key, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientManager, Pipeline pipeline, String requestId, int chunkSize, long length, int streamBufferSize, long streamBufferFlushSize, boolean streamBufferFlushDelay, long streamBufferMaxSize, @@ -215,7 +214,7 @@ public static class Builder { private BlockID blockID; private String key; - private XceiverClientManager xceiverClientManager; + private XceiverClientFactory xceiverClientManager; private Pipeline pipeline; private String requestId; private int chunkSize; @@ -250,7 +249,8 @@ public Builder setKey(String keys) { return this; } - public Builder setXceiverClientManager(XceiverClientManager + public Builder setXceiverClientManager( + XceiverClientFactory xClientManager) { this.xceiverClientManager = xClientManager; return this; @@ -333,7 +333,7 @@ public String getKey() { return key; } - public XceiverClientManager getXceiverClientManager() { + public XceiverClientFactory getXceiverClientManager() { return xceiverClientManager; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 712d1199a335..71784c5050d9 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -25,7 +25,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.ByteStringConversion; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.BufferPool; @@ -37,11 +38,11 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class manages the stream entries list and handles block allocation @@ -56,7 +57,7 @@ public class BlockOutputStreamEntryPool { private int currentStreamIndex; private final OzoneManagerProtocol omClient; private final OmKeyArgs keyArgs; - private final XceiverClientManager xceiverClientManager; + private final XceiverClientFactory xceiverClientFactory; private final int chunkSize; private final String requestID; private final int streamBufferSize; @@ -73,7 +74,8 @@ public class BlockOutputStreamEntryPool { private final ExcludeList excludeList; @SuppressWarnings({"parameternumber", "squid:S00107"}) - public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, + public BlockOutputStreamEntryPool( + OzoneManagerProtocol omClient, int chunkSize, String requestId, HddsProtos.ReplicationFactor factor, HddsProtos.ReplicationType type, int bufferSize, long bufferFlushSize, @@ -81,7 +83,9 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, long size, long watchTimeout, ContainerProtos.ChecksumType checksumType, int bytesPerChecksum, String uploadID, int partNumber, boolean isMultipart, OmKeyInfo info, - XceiverClientManager xceiverClientManager, long openID) { + boolean unsafeByteBufferConversion, + XceiverClientFactory xceiverClientFactory, long openID + ) { streamEntries = new ArrayList<>(); currentStreamIndex = 0; this.omClient = omClient; @@ -90,7 +94,7 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, .setType(type).setFactor(factor).setDataSize(info.getDataSize()) .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID) .setMultipartUploadPartNumber(partNumber).build(); - this.xceiverClientManager = xceiverClientManager; + this.xceiverClientFactory = xceiverClientFactory; this.chunkSize = chunkSize; this.requestID = requestId; this.streamBufferSize = bufferSize; @@ -122,7 +126,8 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, this.bufferPool = new BufferPool(streamBufferSize, (int) (streamBufferMaxSize / streamBufferSize), - xceiverClientManager.byteBufferToByteStringConversion()); + ByteStringConversion + .createByteBufferConversion(unsafeByteBufferConversion)); } /** @@ -135,7 +140,7 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, streamEntries = new ArrayList<>(); omClient = null; keyArgs = null; - xceiverClientManager = null; + xceiverClientFactory = null; chunkSize = 0; requestID = null; streamBufferSize = 0; @@ -185,7 +190,7 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) new BlockOutputStreamEntry.Builder() .setBlockID(subKeyInfo.getBlockID()) .setKey(keyArgs.getKeyName()) - .setXceiverClientManager(xceiverClientManager) + .setXceiverClientManager(xceiverClientFactory) .setPipeline(subKeyInfo.getPipeline()) .setRequestId(requestID) .setChunkSize(chunkSize) @@ -257,8 +262,8 @@ List getStreamEntries() { return streamEntries; } - XceiverClientManager getXceiverClientManager() { - return xceiverClientManager; + XceiverClientFactory getXceiverClientFactory() { + return xceiverClientFactory; } String getKeyName() { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 769035a5e5d0..f8f6cd3fea7e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -17,29 +17,29 @@ */ package org.apache.hadoop.ozone.client.io; -import com.google.common.annotations.VisibleForTesting; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; -import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockInputStream; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; - /** * Maintaining a list of BlockInputStream. Read based on offset. */ @@ -82,21 +82,21 @@ public KeyInputStream() { * For each block in keyInfo, add a BlockInputStream to blockStreams. */ public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientFactory, boolean verifyChecksum, Function retryFunction) { List keyLocationInfos = keyInfo .getLatestVersionLocations().getBlocksLatestVersionOnly(); KeyInputStream keyInputStream = new KeyInputStream(); keyInputStream.initialize(keyInfo, keyLocationInfos, - xceiverClientManager, verifyChecksum, retryFunction); + xceiverClientFactory, verifyChecksum, retryFunction); return new LengthInputStream(keyInputStream, keyInputStream.length); } private synchronized void initialize(OmKeyInfo keyInfo, List blockInfos, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientFactory, boolean verifyChecksum, Function retryFunction) { this.key = keyInfo.getKeyName(); this.blockOffsets = new long[blockInfos.size()]; @@ -110,7 +110,7 @@ private synchronized void initialize(OmKeyInfo keyInfo, // We also pass in functional reference which is used to refresh the // pipeline info for a given OM Key location info. - addStream(omKeyLocationInfo, xceiverClientManager, + addStream(omKeyLocationInfo, xceiverClientFactory, verifyChecksum, keyLocationInfo -> { OmKeyInfo newKeyInfo = retryFunction.apply(keyInfo); BlockID blockID = keyLocationInfo.getBlockID(); @@ -140,12 +140,12 @@ private synchronized void initialize(OmKeyInfo keyInfo, * the block for the first time. */ private synchronized void addStream(OmKeyLocationInfo blockInfo, - XceiverClientManager xceiverClientMngr, + XceiverClientFactory xceiverClientFactory, boolean verifyChecksum, Function refreshPipelineFunction) { blockStreams.add(new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(), blockInfo.getPipeline(), blockInfo.getToken(), - verifyChecksum, xceiverClientMngr, + verifyChecksum, xceiverClientFactory, blockID -> refreshPipelineFunction.apply(blockInfo))); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index b0f5672aadef..03cdb721d51d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -17,12 +17,23 @@ */ package org.apache.hadoop.ozone.client.io; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -31,25 +42,20 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ozone.om.helpers.*; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; +import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; -import org.apache.hadoop.hdds.scm.XceiverClientManager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.ratis.protocol.exceptions.AlreadyClosedException; import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.util.List; -import java.util.Collection; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - /** * Maintaining a list of BlockInputStream. Write based on offset. * @@ -105,8 +111,8 @@ public List getStreamEntries() { } @VisibleForTesting - public XceiverClientManager getXceiverClientManager() { - return blockOutputStreamEntryPool.getXceiverClientManager(); + public XceiverClientFactory getXceiverClientFactory() { + return blockOutputStreamEntryPool.getXceiverClientFactory(); } @VisibleForTesting @@ -121,21 +127,23 @@ public int getRetryCount() { @SuppressWarnings({"parameternumber", "squid:S00107"}) public KeyOutputStream(OpenKeySession handler, - XceiverClientManager xceiverClientManager, + XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient, int chunkSize, String requestId, ReplicationFactor factor, ReplicationType type, int bufferSize, long bufferFlushSize, boolean isBufferFlushDelay, long bufferMaxSize, long size, long watchTimeout, ChecksumType checksumType, int bytesPerChecksum, String uploadID, int partNumber, boolean isMultipart, - int maxRetryCount, long retryInterval) { + int maxRetryCount, long retryInterval, + boolean unsafeByteBufferConversion) { OmKeyInfo info = handler.getKeyInfo(); blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor, type, bufferSize, bufferFlushSize, isBufferFlushDelay, bufferMaxSize, size, watchTimeout, checksumType, bytesPerChecksum, uploadID, partNumber, - isMultipart, info, xceiverClientManager, handler.getId()); + isMultipart, info, unsafeByteBufferConversion, + xceiverClientManager, handler.getId()); // Retrieve the file encryption key info, null if file is not in // encrypted bucket. this.feInfo = info.getFileEncryptionInfo(); @@ -554,6 +562,7 @@ public static class Builder { private boolean isMultipartKey; private int maxRetryCount; private long retryInterval; + private boolean unsafeByteBufferConversion; public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; @@ -650,6 +659,11 @@ public Builder setRetryInterval(long retryIntervalInMS) { return this; } + public Builder enableUnsafeByteBufferConversion(boolean enabled) { + this.unsafeByteBufferConversion = enabled; + return this; + } + public KeyOutputStream build() { return new KeyOutputStream(openHandler, xceiverManager, omClient, chunkSize, requestID, factor, type, @@ -657,7 +671,7 @@ public KeyOutputStream build() { streamBufferMaxSize, blockSize, watchTimeout, checksumType, bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey, - maxRetryCount, retryInterval); + maxRetryCount, retryInterval, unsafeByteBufferConversion); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 94fbe52498ca..c61d0eb2074d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -132,6 +132,7 @@ public class RpcClient implements ClientProtocol { private final int chunkSize; private final ChecksumType checksumType; private final int bytesPerChecksum; + private final boolean unsafeByteBufferConversion; private boolean verifyChecksum; private final UserGroupInformation ugi; private final ACLType userRights; @@ -211,6 +212,10 @@ public RpcClient(ConfigurationSource conf, String omServiceId) blockSize = (long) conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES); + unsafeByteBufferConversion = conf.getBoolean( + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); + int configuredChecksumSize = (int) conf.getStorageSize( OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT, @@ -1279,6 +1284,7 @@ private OzoneOutputStream createOutputStream(OpenKeySession openKey, .setBytesPerChecksum(bytesPerChecksum) .setMaxRetryCount(maxRetryCount) .setRetryInterval(retryInterval) + .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .build(); keyOutputStream .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java index 8c187f4a67c1..914845931df5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java @@ -17,51 +17,49 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.client.OzoneKey; -import org.apache.hadoop.ozone.client.OzoneVolume; -import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.client.OzoneKeyDetails; -import org.apache.hadoop.ozone.client.OzoneKeyLocation; import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneKey; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; +import org.apache.hadoop.ozone.client.OzoneKeyLocation; +import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; -import org.junit.Rule; -import org.junit.BeforeClass; + import org.junit.AfterClass; import org.junit.Assert; +import static org.junit.Assert.fail; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; - import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; - -import static org.junit.Assert.fail; - /** * Test read retries from multiple nodes in the pipeline. */ @@ -147,7 +145,7 @@ public void testPutKeyAndGetKeyThreeNodes() ReplicationFactor.THREE, new HashMap<>()); KeyOutputStream groupOutputStream = (KeyOutputStream) out.getOutputStream(); - XceiverClientManager manager = groupOutputStream.getXceiverClientManager(); + XceiverClientFactory factory = groupOutputStream.getXceiverClientFactory(); out.write(value.getBytes()); out.close(); // First, confirm the key info from the client matches the info in OM. @@ -179,7 +177,7 @@ public void testPutKeyAndGetKeyThreeNodes() DatanodeDetails datanodeDetails = datanodes.get(0); Assert.assertNotNull(datanodeDetails); - XceiverClientSpi clientSpi = manager.acquireClient(pipeline); + XceiverClientSpi clientSpi = factory.acquireClient(pipeline); Assert.assertTrue(clientSpi instanceof XceiverClientRatis); XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi; @@ -207,7 +205,7 @@ public void testPutKeyAndGetKeyThreeNodes() // it should throw an ioException as none of the servers // are available } - manager.releaseClient(clientSpi, false); + factory.releaseClient(clientSpi, false); } private void readKey(OzoneBucket bucket, String keyName, String data)