From fe313a8729cfd959bff8711dde0e8e6d8083620c Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Wed, 24 Apr 2019 17:12:52 +0530 Subject: [PATCH 1/7] HDDS-1395. Key write fails with BlockOutputStream has been closed exception (#749). --- .../hadoop/hdds/scm/XceiverClientRatis.java | 16 +- .../hdds/scm/client/HddsClientUtils.java | 40 +- .../hdds/scm/storage/BlockOutputStream.java | 13 +- .../hdds/scm/storage/CommitWatcher.java | 1 - .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 +- .../src/main/resources/ozone-default.xml | 4 +- .../hadoop/ozone/client/OzoneClientUtils.java | 18 - .../client/io/BlockOutputStreamEntry.java | 7 + .../client/io/BlockOutputStreamEntryPool.java | 341 ++++++++++++ .../ozone/client/io/KeyOutputStream.java | 378 +++---------- .../TestBlockOutputStreamWithFailures.java | 356 ++++++------- .../TestCloseContainerHandlingByClient.java | 2 +- .../TestOzoneClientRetriesOnException.java | 8 +- .../ozone/client/rpc/TestWatchForCommit.java | 502 ++++++++++++++++++ .../ozone/container/ContainerTestHelper.java | 12 +- 15 files changed, 1154 insertions(+), 548 deletions(-) create mode 100644 hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 899bba8aadc67..efd82bce7bbd5 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -29,6 +29,7 @@ import org.apache.hadoop.util.Time; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.protocol.GroupMismatchException; import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.thirdparty.com.google.protobuf @@ -69,7 +70,8 @@ * The underlying RPC mechanism can be chosen via the constructor. */ public final class XceiverClientRatis extends XceiverClientSpi { - static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); + public static final Logger LOG = + LoggerFactory.getLogger(XceiverClientRatis.class); public static XceiverClientRatis newXceiverClientRatis( org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, @@ -248,13 +250,17 @@ public XceiverClientReply watchForCommit(long index, long timeout) return clientReply; } LOG.debug("commit index : {} watch timeout : {}", index, timeout); - CompletableFuture replyFuture = getClient() - .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); RaftClientReply reply; try { + CompletableFuture replyFuture = getClient() + .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); replyFuture.get(timeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException toe) { - LOG.warn("3 way commit failed ", toe); + } catch (Exception e) { + Throwable t = HddsClientUtils.checkForException(e); + LOG.warn("3 way commit failed ", e); + if (t instanceof GroupMismatchException) { + throw e; + } reply = getClient() .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(timeout, TimeUnit.MILLISECONDS); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java index be9bc9320fb48..578b0161a1823 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; import org.apache.hadoop.ipc.Client; @@ -40,6 +41,10 @@ import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.ratis.protocol.AlreadyClosedException; +import org.apache.ratis.protocol.GroupMismatchException; +import org.apache.ratis.protocol.NotReplicatedException; +import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,8 +55,10 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.TimeoutException; /** * Utility methods for Ozone and Container Clients. @@ -72,6 +79,18 @@ public final class HddsClientUtils { private HddsClientUtils() { } + private static final List> EXCEPTION_LIST = + new ArrayList>() {{ + add(TimeoutException.class); + add(ContainerNotOpenException.class); + add(RaftRetryFailureException.class); + add(AlreadyClosedException.class); + add(GroupMismatchException.class); + // Not Replicated Exception will be thrown if watch For commit + // does not succeed + add(NotReplicatedException.class); + }}; + /** * Date format that used in ozone. Here the format is thread safe to use. */ @@ -290,4 +309,23 @@ public static SCMSecurityProtocol getScmSecurityClient( Client.getRpcTimeout(conf))); return scmSecurityClient; } + + public static Throwable checkForException(Exception e) throws IOException { + Throwable t = e; + while (t != null) { + for (Class cls : getExceptionList()) { + if (cls.isInstance(t)) { + return t; + } + } + t = t.getCause(); + } + + throw e instanceof IOException ? (IOException)e : new IOException(e); + } + + + public static List> getExceptionList() { + return EXCEPTION_LIST; + } } \ No newline at end of file diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 139f49404cb84..5ca32630c87c4 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -80,7 +80,7 @@ public class BlockOutputStream extends OutputStream { public static final Logger LOG = LoggerFactory.getLogger(BlockOutputStream.class); - private BlockID blockID; + private volatile BlockID blockID; private final String key; private final String traceID; private final BlockData.Builder containerBlockData; @@ -574,7 +574,7 @@ public void cleanup(boolean invalidateClient) { * @throws IOException if stream is closed */ private void checkOpen() throws IOException { - if (xceiverClient == null) { + if (isClosed()) { throw new IOException("BlockOutputStream has been closed."); } else if (getIoException() != null) { adjustBuffersOnException(); @@ -582,6 +582,10 @@ private void checkOpen() throws IOException { } } + public boolean isClosed() { + return xceiverClient == null; + } + /** * Writes buffered data as a new chunk to the container and saves chunk * information to be used later in putKey call. @@ -635,4 +639,9 @@ private void writeChunkToContainer(ByteBuffer chunk) throws IOException { + " length " + effectiveChunkSize); containerBlockData.addChunks(chunkInfo); } + + @VisibleForTesting + public void setXceiverClient(XceiverClientSpi xceiverClient) { + this.xceiverClient = xceiverClient; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java index aeac941af9c1d..d4606b514c461 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java @@ -188,7 +188,6 @@ void releaseBuffersOnException() { */ public XceiverClientReply watchForCommit(long commitIndex) throws IOException { - Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty()); long index; try { XceiverClientReply reply = diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index b097321507739..bb076dbd54d4e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -121,12 +121,12 @@ public final class ScmConfigKeys { TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY = "dfs.ratis.client.request.max.retries"; - public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20; + public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180; public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY = "dfs.ratis.client.request.retry.interval"; public static final TimeDuration DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT = - TimeDuration.valueOf(500, TimeUnit.MILLISECONDS); + TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS); public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY = "dfs.ratis.server.retry-cache.timeout.duration"; public static final TimeDuration diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 162c93f70d488..cee7c9189da1f 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -237,13 +237,13 @@ dfs.ratis.client.request.max.retries - 20 + 180 OZONE, RATIS, MANAGEMENT Number of retries for ratis client request. dfs.ratis.client.request.retry.interval - 500ms + 1000ms OZONE, RATIS, MANAGEMENT Interval between successive retries for a ratis client request. diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java index 9e9bb39796a4d..be88fbede0137 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java @@ -19,33 +19,19 @@ import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.rest.response.*; -import org.apache.ratis.protocol.AlreadyClosedException; -import org.apache.ratis.protocol.GroupMismatchException; -import org.apache.ratis.protocol.RaftRetryFailureException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** A utility class for OzoneClient. */ public final class OzoneClientUtils { private OzoneClientUtils() {} - - private static final List> EXCEPTION_LIST = - new ArrayList>() {{ - add(TimeoutException.class); - add(ContainerNotOpenException.class); - add(RaftRetryFailureException.class); - add(AlreadyClosedException.class); - add(GroupMismatchException.class); - }}; /** * Returns a BucketInfo object constructed using fields of the input * OzoneBucket object. @@ -134,8 +120,4 @@ public static RetryPolicy createRetryPolicy(int maxRetryCount) { TimeUnit.MILLISECONDS); return retryPolicy; } - - public static List> getExceptionList() { - return EXCEPTION_LIST; - } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index fb700da001cc3..e11eab90e48c2 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -149,6 +149,13 @@ public void close() throws IOException { } } + boolean isClosed() { + if (outputStream != null) { + return ((BlockOutputStream) outputStream).isClosed(); + } + return false; + } + long getTotalAckDataLength() { if (outputStream != null) { BlockOutputStream out = (BlockOutputStream) this.outputStream; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java new file mode 100644 index 0000000000000..622293a3058c5 --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -0,0 +1,341 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.BufferPool; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.om.helpers.*; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +/** + * This class manages the stream entries list and handles block allocation + * from OzoneManager. + */ +public class BlockOutputStreamEntryPool { + + public static final Logger LOG = + LoggerFactory.getLogger(BlockOutputStreamEntryPool.class); + + private final List streamEntries; + private int currentStreamIndex; + private final OzoneManagerProtocol omClient; + private final OmKeyArgs keyArgs; + private final XceiverClientManager xceiverClientManager; + private final int chunkSize; + private final String requestID; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private final long blockSize; + private final int bytesPerChecksum; + private final ContainerProtos.ChecksumType checksumType; + private final BufferPool bufferPool; + private OmMultipartCommitUploadPartInfo commitUploadPartInfo; + private final long openID; + private ExcludeList excludeList; + + @SuppressWarnings("parameternumber") + public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, + int chunkSize, String requestId, HddsProtos.ReplicationFactor factor, + HddsProtos.ReplicationType type, long bufferFlushSize, long bufferMaxSize, + long size, long watchTimeout, ContainerProtos.ChecksumType checksumType, + int bytesPerChecksum, String uploadID, int partNumber, + boolean isMultipart, OmKeyInfo info, + XceiverClientManager xceiverClientManager, long openID) { + streamEntries = new ArrayList<>(); + currentStreamIndex = 0; + this.omClient = omClient; + this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) + .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) + .setType(type).setFactor(factor).setDataSize(info.getDataSize()) + .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID) + .setMultipartUploadPartNumber(partNumber).build(); + this.xceiverClientManager = xceiverClientManager; + this.chunkSize = chunkSize; + this.requestID = requestId; + this.streamBufferFlushSize = bufferFlushSize; + this.streamBufferMaxSize = bufferMaxSize; + this.blockSize = size; + this.watchTimeout = watchTimeout; + this.bytesPerChecksum = bytesPerChecksum; + this.checksumType = checksumType; + this.openID = openID; + this.excludeList = new ExcludeList(); + + Preconditions.checkState(chunkSize > 0); + Preconditions.checkState(streamBufferFlushSize > 0); + Preconditions.checkState(streamBufferMaxSize > 0); + Preconditions.checkState(blockSize > 0); + Preconditions.checkState(streamBufferFlushSize % chunkSize == 0); + Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); + Preconditions.checkState(blockSize % streamBufferMaxSize == 0); + this.bufferPool = + new BufferPool(chunkSize, (int) streamBufferMaxSize / chunkSize); + } + + public BlockOutputStreamEntryPool() { + streamEntries = new ArrayList<>(); + omClient = null; + keyArgs = null; + xceiverClientManager = null; + chunkSize = 0; + requestID = null; + streamBufferFlushSize = 0; + streamBufferMaxSize = 0; + bufferPool = new BufferPool(chunkSize, 1); + watchTimeout = 0; + blockSize = 0; + this.checksumType = ContainerProtos.ChecksumType.valueOf( + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); + this.bytesPerChecksum = OzoneConfigKeys + .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB + currentStreamIndex = 0; + openID = -1; + } + + /** + * When a key is opened, it is possible that there are some blocks already + * allocated to it for this open session. In this case, to make use of these + * blocks, we need to add these blocks to stream entries. But, a key's version + * also includes blocks from previous versions, we need to avoid adding these + * old blocks to stream entries, because these old blocks should not be picked + * for write. To do this, the following method checks that, only those + * blocks created in this particular open version are added to stream entries. + * + * @param version the set of blocks that are pre-allocated. + * @param openVersion the version corresponding to the pre-allocation. + * @throws IOException + */ + public void addPreallocateBlocks(OmKeyLocationInfoGroup version, + long openVersion) throws IOException { + // server may return any number of blocks, (0 to any) + // only the blocks allocated in this open session (block createVersion + // equals to open session version) + for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) { + if (subKeyInfo.getCreateVersion() == openVersion) { + addKeyLocationInfo(subKeyInfo); + } + } + } + + private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) + throws IOException { + Preconditions.checkNotNull(subKeyInfo.getPipeline()); + UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); + BlockOutputStreamEntry.Builder builder = + new BlockOutputStreamEntry.Builder() + .setBlockID(subKeyInfo.getBlockID()) + .setKey(keyArgs.getKeyName()) + .setXceiverClientManager(xceiverClientManager) + .setPipeline(subKeyInfo.getPipeline()) + .setRequestId(requestID) + .setChunkSize(chunkSize) + .setLength(subKeyInfo.getLength()) + .setStreamBufferFlushSize(streamBufferFlushSize) + .setStreamBufferMaxSize(streamBufferMaxSize) + .setWatchTimeout(watchTimeout) + .setbufferPool(bufferPool) + .setChecksumType(checksumType) + .setBytesPerChecksum(bytesPerChecksum) + .setToken(subKeyInfo.getToken()); + streamEntries.add(builder.build()); + } + + public List getLocationInfoList() { + List locationInfoList = new ArrayList<>(); + for (BlockOutputStreamEntry streamEntry : streamEntries) { + OmKeyLocationInfo info = + new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) + .setLength(streamEntry.getCurrentPosition()).setOffset(0) + .setToken(streamEntry.getToken()) + .setPipeline(streamEntry.getPipeline()) + .build(); + LOG.debug("block written " + streamEntry.getBlockID() + ", length " + + streamEntry.getCurrentPosition() + " bcsID " + + streamEntry.getBlockID().getBlockCommitSequenceId()); + if (streamEntry.getCurrentPosition() != 0) { + locationInfoList.add(info); + } + } + return locationInfoList; + } + + /** + * Discards the subsequent pre allocated blocks and removes the streamEntries + * from the streamEntries list for the container which is closed. + * @param containerID id of the closed container + * @param pipelineId id of the associated pipeline + */ + void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) { + // currentStreamIndex < streamEntries.size() signifies that, there are still + // pre allocated blocks available. + + // This will be called only to discard the next subsequent unused blocks + // in the streamEntryList. + if (currentStreamIndex + 1 < streamEntries.size()) { + ListIterator streamEntryIterator = + streamEntries.listIterator(currentStreamIndex + 1); + while (streamEntryIterator.hasNext()) { + BlockOutputStreamEntry streamEntry = streamEntryIterator.next(); + Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0); + if ((pipelineId != null && streamEntry.getPipeline().getId() + .equals(pipelineId)) || (containerID != -1 + && streamEntry.getBlockID().getContainerID() == containerID)) { + streamEntryIterator.remove(); + } + } + } + } + + List getStreamEntries() { + return streamEntries; + } + + XceiverClientManager getXceiverClientManager() { + return xceiverClientManager; + } + + String getKeyName() { + return keyArgs.getKeyName(); + } + + long getKeyLength() { + return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()).sum(); + } + /** + * Contact OM to get a new block. Set the new block with the index (e.g. + * first block has index = 0, second has index = 1 etc.) + * + * The returned block is made to new BlockOutputStreamEntry to write. + * + * @throws IOException + */ + private void allocateNewBlock() throws IOException { + OmKeyLocationInfo subKeyInfo = + omClient.allocateBlock(keyArgs, openID, excludeList); + addKeyLocationInfo(subKeyInfo); + } + + + void commitKey(long offset) throws IOException { + if (keyArgs != null) { + // in test, this could be null + long length = getKeyLength(); + Preconditions.checkArgument(offset == length); + keyArgs.setDataSize(length); + keyArgs.setLocationInfoList(getLocationInfoList()); + // When the key is multipart upload part file upload, we should not + // commit the key, as this is not an actual key, this is a just a + // partial key of a large file. + if (keyArgs.getIsMultipartKey()) { + commitUploadPartInfo = + omClient.commitMultipartUploadPart(keyArgs, openID); + } else { + omClient.commitKey(keyArgs, openID); + } + } else { + LOG.warn("Closing KeyOutputStream, but key args is null"); + } + } + + public BlockOutputStreamEntry getCurrentStreamEntry() { + if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) { + return null; + } else { + return streamEntries.get(currentStreamIndex); + } + } + + BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException { + BlockOutputStreamEntry streamEntry = getCurrentStreamEntry(); + if (streamEntry != null && streamEntry.isClosed()) { + // a stream entry gets closed either by : + // a. If the stream gets full + // b. it has encountered an exception + currentStreamIndex++; + } + if (streamEntries.size() <= currentStreamIndex) { + Preconditions.checkNotNull(omClient); + // allocate a new block, if a exception happens, log an error and + // throw exception to the caller directly, and the write fails. + int succeededAllocates = 0; + try { + allocateNewBlock(); + succeededAllocates += 1; + } catch (IOException ioe) { + LOG.error("Try to allocate more blocks for write failed, already " + + "allocated " + succeededAllocates + " blocks for this write."); + throw ioe; + } + } + // in theory, this condition should never violate due the check above + // still do a sanity check. + Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); + BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); + return current; + } + + long computeBufferData() { + return bufferPool.computeBufferData(); + } + + void cleanup() { + if (excludeList != null) { + excludeList.clear(); + excludeList = null; + } + if (bufferPool != null) { + bufferPool.clearBufferPool(); + } + + if (streamEntries != null) { + streamEntries.clear(); + } + } + + public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { + return commitUploadPartInfo; + } + + public ExcludeList getExcludeList() { + return excludeList; + } + + public long getStreamBufferMaxSize() { + return streamBufferMaxSize; + } + + boolean isEmpty() { + return streamEntries.isEmpty(); + } +} \ No newline at end of file diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index b12ef51b9fdda..033693605ed39 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -22,12 +22,10 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ChecksumType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; -import org.apache.hadoop.hdds.scm.storage.BufferPool; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.io.retry.RetryPolicies; @@ -38,7 +36,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.GroupMismatchException; import org.apache.ratis.protocol.RaftRetryFailureException; @@ -48,10 +45,8 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; -import java.util.ArrayList; import java.util.List; import java.util.Collection; -import java.util.ListIterator; import java.util.concurrent.TimeoutException; /** @@ -75,81 +70,38 @@ enum StreamAction { public static final Logger LOG = LoggerFactory.getLogger(KeyOutputStream.class); - // array list's get(index) is O(1) - private final ArrayList streamEntries; - private int currentStreamIndex; - private final OzoneManagerProtocol omClient; - private final OmKeyArgs keyArgs; - private final long openID; - private final XceiverClientManager xceiverClientManager; - private final int chunkSize; - private final String requestID; private boolean closed; - private final long streamBufferFlushSize; - private final long streamBufferMaxSize; - private final long watchTimeout; - private final long blockSize; - private final int bytesPerChecksum; - private final ChecksumType checksumType; - private final BufferPool bufferPool; - private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private FileEncryptionInfo feInfo; - private ExcludeList excludeList; private final RetryPolicy retryPolicy; private int retryCount; private long offset; + private final BlockOutputStreamEntryPool blockOutputStreamEntryPool; + /** * A constructor for testing purpose only. */ @VisibleForTesting - @SuppressWarnings("parameternumber") public KeyOutputStream() { - streamEntries = new ArrayList<>(); - omClient = null; - keyArgs = null; - openID = -1; - xceiverClientManager = null; - chunkSize = 0; - requestID = null; closed = false; - streamBufferFlushSize = 0; - streamBufferMaxSize = 0; - bufferPool = new BufferPool(chunkSize, 1); - watchTimeout = 0; - blockSize = 0; - this.checksumType = ChecksumType.valueOf( - OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); - this.bytesPerChecksum = OzoneConfigKeys - .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL; retryCount = 0; offset = 0; + blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(); } @VisibleForTesting public List getStreamEntries() { - return streamEntries; + return blockOutputStreamEntryPool.getStreamEntries(); } + @VisibleForTesting public XceiverClientManager getXceiverClientManager() { - return xceiverClientManager; + return blockOutputStreamEntryPool.getXceiverClientManager(); } - public List getLocationInfoList() throws IOException { - List locationInfoList = new ArrayList<>(); - for (BlockOutputStreamEntry streamEntry : streamEntries) { - OmKeyLocationInfo info = - new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) - .setLength(streamEntry.getCurrentPosition()).setOffset(0) - .setToken(streamEntry.getToken()) - .setPipeline(streamEntry.getPipeline()) - .build(); - LOG.debug("block written " + streamEntry.getBlockID() + ", length " - + streamEntry.getCurrentPosition() + " bcsID " - + streamEntry.getBlockID().getBlockCommitSequenceId()); - locationInfoList.add(info); - } - return locationInfoList; + @VisibleForTesting + public List getLocationInfoList() { + return blockOutputStreamEntryPool.getLocationInfoList(); } @VisibleForTesting @@ -159,46 +111,21 @@ public int getRetryCount() { @SuppressWarnings("parameternumber") public KeyOutputStream(OpenKeySession handler, - XceiverClientManager xceiverClientManager, - OzoneManagerProtocol omClient, int chunkSize, - String requestId, ReplicationFactor factor, ReplicationType type, - long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout, - ChecksumType checksumType, int bytesPerChecksum, + XceiverClientManager xceiverClientManager, OzoneManagerProtocol omClient, + int chunkSize, String requestId, ReplicationFactor factor, + ReplicationType type, long bufferFlushSize, long bufferMaxSize, long size, + long watchTimeout, ChecksumType checksumType, int bytesPerChecksum, String uploadID, int partNumber, boolean isMultipart, int maxRetryCount) { - this.streamEntries = new ArrayList<>(); - this.currentStreamIndex = 0; - this.omClient = omClient; OmKeyInfo info = handler.getKeyInfo(); + blockOutputStreamEntryPool = + new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor, + type, bufferFlushSize, bufferMaxSize, size, watchTimeout, + checksumType, bytesPerChecksum, uploadID, partNumber, isMultipart, + info, xceiverClientManager, handler.getId()); // Retrieve the file encryption key info, null if file is not in // encrypted bucket. this.feInfo = info.getFileEncryptionInfo(); - this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) - .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) - .setType(type).setFactor(factor).setDataSize(info.getDataSize()) - .setIsMultipartKey(isMultipart).setMultipartUploadID( - uploadID).setMultipartUploadPartNumber(partNumber) - .build(); - this.openID = handler.getId(); - this.xceiverClientManager = xceiverClientManager; - this.chunkSize = chunkSize; - this.requestID = requestId; - this.streamBufferFlushSize = bufferFlushSize; - this.streamBufferMaxSize = bufferMaxSize; - this.blockSize = size; - this.watchTimeout = watchTimeout; - this.bytesPerChecksum = bytesPerChecksum; - this.checksumType = checksumType; - Preconditions.checkState(chunkSize > 0); - Preconditions.checkState(streamBufferFlushSize > 0); - Preconditions.checkState(streamBufferMaxSize > 0); - Preconditions.checkState(blockSize > 0); - Preconditions.checkState(streamBufferFlushSize % chunkSize == 0); - Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); - Preconditions.checkState(blockSize % streamBufferMaxSize == 0); - this.bufferPool = - new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize); - this.excludeList = new ExcludeList(); this.retryPolicy = OzoneClientUtils.createRetryPolicy(maxRetryCount); this.retryCount = 0; } @@ -218,37 +145,7 @@ public KeyOutputStream(OpenKeySession handler, */ public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) throws IOException { - // server may return any number of blocks, (0 to any) - // only the blocks allocated in this open session (block createVersion - // equals to open session version) - for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) { - if (subKeyInfo.getCreateVersion() == openVersion) { - addKeyLocationInfo(subKeyInfo); - } - } - } - - private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) - throws IOException { - Preconditions.checkNotNull(subKeyInfo.getPipeline()); - UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); - BlockOutputStreamEntry.Builder builder = - new BlockOutputStreamEntry.Builder() - .setBlockID(subKeyInfo.getBlockID()) - .setKey(keyArgs.getKeyName()) - .setXceiverClientManager(xceiverClientManager) - .setPipeline(subKeyInfo.getPipeline()) - .setRequestId(requestID) - .setChunkSize(chunkSize) - .setLength(subKeyInfo.getLength()) - .setStreamBufferFlushSize(streamBufferFlushSize) - .setStreamBufferMaxSize(streamBufferMaxSize) - .setWatchTimeout(watchTimeout) - .setbufferPool(bufferPool) - .setChecksumType(checksumType) - .setBytesPerChecksum(bytesPerChecksum) - .setToken(subKeyInfo.getToken()); - streamEntries.add(builder.build()); + blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion); } @Override @@ -287,34 +184,12 @@ public void write(byte[] b, int off, int len) handleWrite(b, off, len, false); } - private long computeBufferData() { - return bufferPool.computeBufferData(); - } - private void handleWrite(byte[] b, int off, long len, boolean retry) throws IOException { - int succeededAllocates = 0; while (len > 0) { try { - if (streamEntries.size() <= currentStreamIndex) { - Preconditions.checkNotNull(omClient); - // allocate a new block, if a exception happens, log an error and - // throw exception to the caller directly, and the write fails. - try { - allocateNewBlock(currentStreamIndex); - succeededAllocates += 1; - } catch (IOException ioe) { - LOG.error("Try to allocate more blocks for write failed, already " - + "allocated " + succeededAllocates - + " blocks for this write."); - throw ioe; - } - } - // in theory, this condition should never violate due the check above - // still do a sanity check. - Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); - BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); - + BlockOutputStreamEntry current = + blockOutputStreamEntryPool.allocateBlockIfNeeded(); // length(len) will be in int range if the call is happening through // write API of blockOutputStream. Length can be in long range if it // comes via Exception path. @@ -335,7 +210,8 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) // to or less than the max length of the buffer allocated. // The len specified here is the combined sum of the data length of // the buffers - Preconditions.checkState(!retry || len <= streamBufferMaxSize); + Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool + .getStreamBufferMaxSize()); int dataWritten = (int) (current.getWrittenDataLength() - currentPos); writeLen = retry ? (int) len : dataWritten; // In retry path, the data written is already accounted in offset. @@ -343,7 +219,7 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) offset += writeLen; } LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, currentStreamIndex, ioe); + handleException(current, ioe); } if (current.getRemaining() <= 0) { // since the current block is already written close the stream. @@ -358,80 +234,19 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) } } - /** - * Discards the subsequent pre allocated blocks and removes the streamEntries - * from the streamEntries list for the container which is closed. - * @param containerID id of the closed container - * @param pipelineId id of the associated pipeline - * @param streamIndex index of the stream - */ - private void discardPreallocatedBlocks(long containerID, - PipelineID pipelineId, int streamIndex) { - // streamIndex < streamEntries.size() signifies that, there are still - // pre allocated blocks available. - - // This will be called only to discard the next subsequent unused blocks - // in the streamEntryList. - if (streamIndex < streamEntries.size()) { - ListIterator streamEntryIterator = - streamEntries.listIterator(streamIndex); - while (streamEntryIterator.hasNext()) { - BlockOutputStreamEntry streamEntry = streamEntryIterator.next(); - Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0); - if (((pipelineId != null && streamEntry.getPipeline().getId() - .equals(pipelineId)) || (containerID != -1 - && streamEntry.getBlockID().getContainerID() == containerID))) { - streamEntryIterator.remove(); - } - } - } - } - - /** - * It might be possible that the blocks pre allocated might never get written - * while the stream gets closed normally. In such cases, it would be a good - * idea to trim down the locationInfoList by removing the unused blocks if any - * so as only the used block info gets updated on OzoneManager during close. - */ - private void removeEmptyBlocks() { - if (currentStreamIndex < streamEntries.size()) { - ListIterator streamEntryIterator = - streamEntries.listIterator(currentStreamIndex); - while (streamEntryIterator.hasNext()) { - if (streamEntryIterator.next().getCurrentPosition() == 0) { - streamEntryIterator.remove(); - } - } - } - } - - private void cleanup() { - if (excludeList != null) { - excludeList.clear(); - excludeList = null; - } - if (bufferPool != null) { - bufferPool.clearBufferPool(); - } - - if (streamEntries != null) { - streamEntries.clear(); - } - } /** * It performs following actions : * a. Updates the committed length at datanode for the current stream in - * datanode. + * datanode. * b. Reads the data from the underlying buffer and writes it the next stream. * * @param streamEntry StreamEntry - * @param streamIndex Index of the entry - * @param exception actual exception that occurred + * @param exception actual exception that occurred * @throws IOException Throws IOException if Write fails */ private void handleException(BlockOutputStreamEntry streamEntry, - int streamIndex, IOException exception) throws IOException { - Throwable t = checkForException(exception); + IOException exception) throws IOException { + Throwable t = HddsClientUtils.checkForException(exception); boolean retryFailure = checkForRetryFailure(t); boolean closedContainerException = false; if (!retryFailure) { @@ -441,15 +256,19 @@ private void handleException(BlockOutputStreamEntry streamEntry, long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); //set the correct length for the current stream streamEntry.setCurrentPosition(totalSuccessfulFlushedData); - long bufferedDataLen = computeBufferData(); - LOG.warn("Encountered exception {}. The last committed block length is {}, " + long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData(); + LOG.debug( + "Encountered exception {}. The last committed block length is {}, " + "uncommitted data length is {} retry count {}", exception, totalSuccessfulFlushedData, bufferedDataLen, retryCount); - Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize); - Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen); + Preconditions.checkArgument( + bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize()); + Preconditions.checkArgument( + offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); Collection failedServers = streamEntry.getFailedServers(); Preconditions.checkNotNull(failedServers); + ExcludeList excludeList = blockOutputStreamEntryPool.getExcludeList(); if (!failedServers.isEmpty()) { excludeList.addDatanodes(failedServers); } @@ -463,47 +282,43 @@ private void handleException(BlockOutputStreamEntry streamEntry, // just clean up the current stream. streamEntry.cleanup(retryFailure); - // discard all sunsequent blocks the containers and pipelines which + // discard all subsequent blocks the containers and pipelines which // are in the exclude list so that, the very next retry should never // write data on the closed container/pipeline if (closedContainerException) { // discard subsequent pre allocated blocks from the streamEntries list // from the closed container - discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null, - streamIndex + 1); + blockOutputStreamEntryPool + .discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), + null); } else { // In case there is timeoutException or Watch for commit happening over // majority or the client connection failure to the leader in the - // pipeline, just discard all the preallocated blocks on this pipeline. + // pipeline, just discard all the pre allocated blocks on this pipeline. // Next block allocation will happen with excluding this specific pipeline // This will ensure if 2 way commit happens , it cannot span over multiple // blocks - discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1); + blockOutputStreamEntryPool + .discardPreallocatedBlocks(-1, pipelineId); } if (bufferedDataLen > 0) { // If the data is still cached in the underlying stream, we need to // allocate new block and write this data in the datanode. - currentStreamIndex += 1; handleRetry(exception, bufferedDataLen); // reset the retryCount after handling the exception retryCount = 0; } - if (totalSuccessfulFlushedData == 0) { - streamEntries.remove(streamIndex); - currentStreamIndex -= 1; - } } private void markStreamClosed() { - cleanup(); + blockOutputStreamEntryPool.cleanup(); closed = true; } private void handleRetry(IOException exception, long len) throws IOException { RetryPolicy.RetryAction action; try { - action = retryPolicy - .shouldRetry(exception, retryCount, 0, true); + action = retryPolicy.shouldRetry(exception, retryCount, 0, true); } catch (Exception e) { throw e instanceof IOException ? (IOException) e : new IOException(e); } @@ -533,10 +348,11 @@ private void handleRetry(IOException exception, long len) throws IOException { } } retryCount++; - LOG.trace("Retrying Write request. Already tried " - + retryCount + " time(s); retry policy is " + retryPolicy); + LOG.trace("Retrying Write request. Already tried " + retryCount + + " time(s); retry policy is " + retryPolicy); handleWrite(null, 0, len, true); } + /** * Checks if the provided exception signifies retry failure in ratis client. * In case of retry failure, ratis client throws RaftRetryFailureException @@ -551,40 +367,6 @@ private boolean checkIfContainerIsClosed(Throwable t) { return t instanceof ContainerNotOpenException; } - public Throwable checkForException(IOException ioe) throws IOException { - Throwable t = ioe.getCause(); - while (t != null) { - for (Class cls : OzoneClientUtils - .getExceptionList()) { - if (cls.isInstance(t)) { - return t; - } - } - t = t.getCause(); - } - throw ioe; - } - - private long getKeyLength() { - return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()) - .sum(); - } - - /** - * Contact OM to get a new block. Set the new block with the index (e.g. - * first block has index = 0, second has index = 1 etc.) - * - * The returned block is made to new BlockOutputStreamEntry to write. - * - * @param index the index of the block. - * @throws IOException - */ - private void allocateNewBlock(int index) throws IOException { - OmKeyLocationInfo subKeyInfo = - omClient.allocateBlock(keyArgs, openID, excludeList); - addKeyLocationInfo(subKeyInfo); - } - @Override public void flush() throws IOException { checkNotClosed(); @@ -601,20 +383,19 @@ public void flush() throws IOException { * written to new stream , it will be at max half full. In such cases, we * should just write the data and not close the stream as the block won't be * completely full. + * * @param op Flag which decides whether to call close or flush on the - * outputStream. + * outputStream. * @throws IOException In case, flush or close fails with exception. */ private void handleFlushOrClose(StreamAction op) throws IOException { - if (streamEntries.size() == 0) { + if (blockOutputStreamEntryPool.isEmpty()) { return; } while (true) { try { - int size = streamEntries.size(); - int streamIndex = - currentStreamIndex >= size ? size - 1 : currentStreamIndex; - BlockOutputStreamEntry entry = streamEntries.get(streamIndex); + BlockOutputStreamEntry entry = + blockOutputStreamEntryPool.getCurrentStreamEntry(); if (entry != null) { try { Collection failedServers = @@ -622,7 +403,8 @@ private void handleFlushOrClose(StreamAction op) throws IOException { // failed servers can be null in case there is no data written in // the stream if (failedServers != null && !failedServers.isEmpty()) { - excludeList.addDatanodes(failedServers); + blockOutputStreamEntryPool.getExcludeList() + .addDatanodes(failedServers); } switch (op) { case CLOSE: @@ -631,7 +413,6 @@ private void handleFlushOrClose(StreamAction op) throws IOException { case FULL: if (entry.getRemaining() == 0) { entry.close(); - currentStreamIndex++; } break; case FLUSH: @@ -641,7 +422,7 @@ private void handleFlushOrClose(StreamAction op) throws IOException { throw new IOException("Invalid Operation"); } } catch (IOException ioe) { - handleException(entry, streamIndex, ioe); + handleException(entry, ioe); continue; } } @@ -666,34 +447,16 @@ public void close() throws IOException { closed = true; try { handleFlushOrClose(StreamAction.CLOSE); - if (keyArgs != null) { - // in test, this could be null - removeEmptyBlocks(); - long length = getKeyLength(); - Preconditions.checkArgument(offset == length); - keyArgs.setDataSize(length); - keyArgs.setLocationInfoList(getLocationInfoList()); - // When the key is multipart upload part file upload, we should not - // commit the key, as this is not an actual key, this is a just a - // partial key of a large file. - if (keyArgs.getIsMultipartKey()) { - commitUploadPartInfo = omClient.commitMultipartUploadPart(keyArgs, - openID); - } else { - omClient.commitKey(keyArgs, openID); - } - } else { - LOG.warn("Closing KeyOutputStream, but key args is null"); - } + blockOutputStreamEntryPool.commitKey(offset); } catch (IOException ioe) { throw ioe; } finally { - cleanup(); + blockOutputStreamEntryPool.cleanup(); } } public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { - return commitUploadPartInfo; + return blockOutputStreamEntryPool.getCommitUploadPartInfo(); } public FileEncryptionInfo getFileEncryptionInfo() { @@ -702,7 +465,7 @@ public FileEncryptionInfo getFileEncryptionInfo() { @VisibleForTesting public ExcludeList getExcludeList() { - return excludeList; + return blockOutputStreamEntryPool.getExcludeList(); } /** @@ -727,7 +490,6 @@ public static class Builder { private boolean isMultipartKey; private int maxRetryCount; - public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; return this; @@ -748,8 +510,7 @@ public Builder setXceiverClientManager(XceiverClientManager manager) { return this; } - public Builder setOmClient( - OzoneManagerProtocol client) { + public Builder setOmClient(OzoneManagerProtocol client) { this.omClient = client; return this; } @@ -794,12 +555,12 @@ public Builder setWatchTimeout(long timeout) { return this; } - public Builder setChecksumType(ChecksumType cType){ + public Builder setChecksumType(ChecksumType cType) { this.checksumType = cType; return this; } - public Builder setBytesPerChecksum(int bytes){ + public Builder setBytesPerChecksum(int bytes) { this.bytesPerChecksum = bytes; return this; } @@ -815,8 +576,8 @@ public Builder setMaxRetryCount(int maxCount) { } public KeyOutputStream build() throws IOException { - return new KeyOutputStream(openHandler, xceiverManager, - omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, + return new KeyOutputStream(openHandler, xceiverManager, omClient, + chunkSize, requestID, factor, type, streamBufferFlushSize, streamBufferMaxSize, blockSize, watchTimeout, checksumType, bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey, maxRetryCount); @@ -826,13 +587,14 @@ public KeyOutputStream build() throws IOException { /** * Verify that the output stream is open. Non blocking; this gives * the last state of the volatile {@link #closed} field. + * * @throws IOException if the connection is closed. */ private void checkNotClosed() throws IOException { if (closed) { throw new IOException( - ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs - .getKeyName()); + ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + + blockOutputStreamEntryPool.getKeyName()); } } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 89a2af966a1b9..dfccb98ea0215 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -24,8 +24,8 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.XceiverClientRatis; -import org.apache.hadoop.hdds.scm.container.common.helpers - .ContainerNotOpenException; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -75,27 +75,23 @@ public class TestBlockOutputStreamWithFailures { * * @throws IOException */ - @Before - public void init() throws Exception { + @Before public void init() throws Exception { chunkSize = 100; flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; - conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "1s"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(7) - .setBlockSize(blockSize) - .setChunkSize(chunkSize) + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7) + .setBlockSize(blockSize).setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) - .setStreamBufferSizeUnit(StorageUnit.BYTES) - .build(); + .setStreamBufferSizeUnit(StorageUnit.BYTES).build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); @@ -114,25 +110,24 @@ private String getKeyName() { /** * Shutdown MiniDFSCluster. */ - @After - public void shutdown() { + @After public void shutdown() { if (cluster != null) { cluster.shutdown(); } } - @Test - public void testWatchForCommitWithCloseContainerException() throws Exception { + @Test public void testWatchForCommitWithCloseContainerException() + throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -155,15 +150,14 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -199,8 +193,7 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // flush is a sync call, all pending operations will complete Assert.assertEquals(pendingWriteChunkCount, @@ -233,9 +226,8 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { // rewritten plus one partial chunk plus two putBlocks for flushSize // and one flush for partial chunk key.flush(); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled @@ -247,8 +239,7 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, @@ -259,25 +250,23 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testWatchForCommitDatanodeFailure() throws Exception { + @Test public void testWatchForCommitDatanodeFailure() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -299,14 +288,13 @@ public void testWatchForCommitDatanodeFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -344,8 +332,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -376,8 +363,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception { Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); - Assert - .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(blockSize, blockOutputStream.getTotalAckDataLength()); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // make sure the bufferPool is empty @@ -396,25 +382,23 @@ public void testWatchForCommitDatanodeFailure() throws Exception { // 4 flushes at flushSize boundaries + 2 flush for partial chunks Assert.assertEquals(putBlockCount + 6, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 16, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 16, metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void test2DatanodesFailure() throws Exception { + @Test public void test2DatanodesFailure() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -436,14 +420,13 @@ public void test2DatanodesFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -479,8 +462,7 @@ public void test2DatanodesFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -512,7 +494,7 @@ public void test2DatanodesFailure() throws Exception { // rewritten plus one partial chunk plus two putBlocks for flushSize // and one flush for partial chunk key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof RaftRetryFailureException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -522,8 +504,7 @@ public void test2DatanodesFailure() throws Exception { key.close(); Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -533,30 +514,27 @@ public void test2DatanodesFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); validateData(keyName, data1); } - @Test - public void testFailureWithPrimeSizedData() throws Exception { + @Test public void testFailureWithPrimeSizedData() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -577,24 +555,21 @@ public void testFailureWithPrimeSizedData() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 1, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - Assert.assertEquals(0, - blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0); @@ -613,8 +588,7 @@ public void testFailureWithPrimeSizedData() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 1, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 3, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -641,7 +615,7 @@ public void testFailureWithPrimeSizedData() throws Exception { key.flush(); Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -653,8 +627,7 @@ public void testFailureWithPrimeSizedData() throws Exception { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -664,26 +637,24 @@ public void testFailureWithPrimeSizedData() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 9, - metrics.getTotalOpCount()); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); + Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); + Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testExceptionDuringClose() throws Exception { + @Test public void testExceptionDuringClose() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -704,24 +675,21 @@ public void testExceptionDuringClose() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 1, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - Assert.assertEquals(0, - blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0); @@ -740,8 +708,7 @@ public void testExceptionDuringClose() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 1, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 3, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -767,15 +734,14 @@ public void testExceptionDuringClose() throws Exception { // now close the stream, It will hit exception key.close(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -785,26 +751,24 @@ public void testExceptionDuringClose() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 9, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testWatchForCommitWithSingleNodeRatis() throws Exception { + @Test public void testWatchForCommitWithSingleNodeRatis() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = @@ -828,15 +792,14 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -872,8 +835,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // flush is a sync call, all pending operations will complete Assert.assertEquals(pendingWriteChunkCount, @@ -907,7 +869,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { // and one flush for partial chunk key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -919,10 +881,9 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -931,25 +892,23 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testDatanodeFailureWithSingleNodeRatis() throws Exception { + @Test public void testDatanodeFailureWithSingleNodeRatis() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = @@ -972,14 +931,13 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -1015,8 +973,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -1044,7 +1001,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof RaftRetryFailureException); Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); // Make sure the retryCount is reset after the exception is handled @@ -1052,8 +1009,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); @@ -1073,27 +1029,25 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { // flush failed + 3 more flushes for the next block Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); + Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); // Written the same data twice String dataString = new String(data1, UTF_8); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testDatanodeFailureWithPreAllocation() throws Exception { + @Test public void testDatanodeFailureWithPreAllocation() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = @@ -1117,14 +1071,13 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 3); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -1160,8 +1113,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -1188,7 +1140,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof RaftRetryFailureException); // Make sure the retryCount is reset after the exception is handled @@ -1197,13 +1149,12 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { // now close the stream, It will update the ack length after watchForCommit key.close(); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -1219,8 +1170,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { // flush failed + 3 more flushes for the next block Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 8be1cccb50f7d..e551ab1ae207e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -291,7 +291,7 @@ public void testMultiBlockWrites3() throws Exception { (KeyOutputStream) key.getOutputStream(); // With the initial size provided, it should have preallocated 4 blocks Assert.assertEquals(4, keyOutputStream.getStreamEntries().size()); - // write data 3 blocks and one more chunk + // write data 4 blocks and one more chunk byte[] writtenData = ContainerTestHelper.getFixedLengthString(keyString, keyLen) .getBytes(UTF_8); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index 5cb6dbc047407..5f6d494a24b19 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; @@ -50,7 +51,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** * Tests failure detection and handling in BlockOutputStream Class. @@ -85,7 +85,7 @@ public void init() throws Exception { blockSize = 2 * maxFlushSize; conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + // conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); conf.setQuietMode(false); @@ -150,7 +150,7 @@ public void testGroupMismatchExceptionHandling() throws Exception { .getPipeline(container.getPipelineID()); ContainerTestHelper.waitForPipelineClose(key, cluster, true); key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof GroupMismatchException); Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds() .contains(pipeline.getId())); @@ -201,7 +201,7 @@ public void testMaxRetriesByOzoneClient() throws Exception { key.write(data1); Assert.fail("Expected exception not thrown"); } catch (IOException ioe) { - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); Assert.assertTrue(ioe.getMessage().contains( "Retry request failed. retries get failed due to exceeded maximum " diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java new file mode 100644 index 0000000000000..f9502ba5dad8e --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -0,0 +1,502 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.*; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.protocol.GroupMismatchException; +import org.apache.ratis.protocol.RaftRetryFailureException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * This class verifies the watchForCommit Handling by client. + */ +public class TestWatchForCommit { + + private MiniOzoneCluster cluster; + private OzoneClient client; + private ObjectStore objectStore; + private String volumeName; + private String bucketName; + private String keyString; + private int chunkSize; + private int flushSize; + private int maxFlushSize; + private int blockSize; + private StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private static String containerOwner = "OZONE"; + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + private void startCluster(OzoneConfiguration conf) throws Exception { + chunkSize = 100; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 2 * maxFlushSize; + + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration( + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, + 1, TimeUnit.SECONDS); + + conf.setQuietMode(false); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(7) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "watchforcommithandlingtest"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + storageContainerLocationClient = cluster + .getStorageContainerLocationClient(); + } + + + /** + * Shutdown MiniDFSCluster. + */ + private void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private String getKeyName() { + return UUID.randomUUID().toString(); + } + + @Test + public void testWatchForCommitWithKeyWrite() throws Exception { + // in this case, watch request should fail with RaftRetryFailureException + // and will be captured in keyOutputStream and the failover will happen + // to a different block + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 10, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2); + startCluster(conf); + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = maxFlushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + // since its hitting the full bufferCondition, it will call watchForCommit + // and completes atleast putBlock for first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data more than flush Size(2 chunks), at this time + // buffer pool will have 3 buffers allocated worth of chunk size + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equals to maxBufferSize is written, this will be a blocking + // call and hence will wait for atleast flushSize worth of data to get + // acked by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up atleast one entry from the map where each + // entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + Pipeline pipeline = raftClient.getPipeline(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + // again write data with more than max buffer limit. This will call + // watchForCommit again. Since the commit will happen 2 way, the + // commitInfoMap will get updated for servers which are alive + + // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here + // once exception is hit + key.write(data1); + + // As a part of handling the exception, 4 failed writeChunks will be + // rewritten plus one partial chunk plus two putBlocks for flushSize + // and one flush for partial chunk + key.flush(); + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + .getIoException()) instanceof RaftRetryFailureException); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + // now close the stream, It will update the ack length after watchForCommit + key.close(); + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 14, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 8, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + validateData(keyName, data1); + shutdown(); + } + + @Test + public void testWatchForCommitWithSmallerTimeoutValue() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); + startCluster(conf); + XceiverClientManager clientManager = new XceiverClientManager(conf); + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientReply reply = + client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + client.getPipeline())); + reply.getResponse().get(); + long index = reply.getLogIndex(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + try { + // just watch for a lo index which in not updated in the commitInfo Map + client.watchForCommit(index + 1, 3000); + Assert.fail("expected exception not thrown"); + } catch (Exception e) { + Assert.assertTrue( + HddsClientUtils.checkForException(e) instanceof TimeoutException); + } + // After releasing the client, this connection should be closed + // and any container operations should fail + clientManager.releaseClient(client, false); + shutdown(); + } + + @Test + public void testWatchForCommitForRetryfailure() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3); + startCluster(conf); + XceiverClientManager clientManager = new XceiverClientManager(conf); + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientReply reply = + client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + client.getPipeline())); + reply.getResponse().get(); + long index = reply.getLogIndex(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + // again write data with more than max buffer limit. This wi + try { + // just watch for a lo index which in not updated in the commitInfo Map + client.watchForCommit(index + 1, 20000); + Assert.fail("expected exception not thrown"); + } catch (Exception e) { + Assert.assertTrue(HddsClientUtils + .checkForException(e) instanceof RaftRetryFailureException); + } + clientManager.releaseClient(client, false); + shutdown(); + } + + @Test + public void test2WayCommitForRetryfailure() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3); + startCluster(conf); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) client; + XceiverClientReply reply = + client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + client.getPipeline())); + reply.getResponse().get(); + Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + reply = client.sendCommandAsync(ContainerTestHelper + .getCloseContainer(pipeline, + container1.getContainerInfo().getContainerID())); + reply.getResponse().get(); + client.watchForCommit(reply.getLogIndex(), 20000); + + // commitInfo Map will be reduced to 2 here + Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); + clientManager.releaseClient(client, false); + Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); + Assert.assertTrue( + logCapturer.getOutput().contains("RaftRetryFailureException")); + Assert + .assertTrue(logCapturer.getOutput().contains("Committed by majority")); + logCapturer.stopCapturing(); + shutdown(); + } + + @Test + public void test2WayCommitForTimeoutException() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); + startCluster(conf); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) client; + XceiverClientReply reply = + client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + client.getPipeline())); + reply.getResponse().get(); + Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + reply = client.sendCommandAsync(ContainerTestHelper + .getCloseContainer(pipeline, + container1.getContainerInfo().getContainerID())); + reply.getResponse().get(); + client.watchForCommit(reply.getLogIndex(), 3000); + + // commitInfo Map will be reduced to 2 here + Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); + clientManager.releaseClient(client, false); + Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); + Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException")); + Assert + .assertTrue(logCapturer.getOutput().contains("Committed by majority")); + logCapturer.stopCapturing(); + shutdown(); + } + + @Test + public void testWatchForCommitForGroupMismatchException() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); + + // mark the node stale early so that pipleline gets destroyed quickly + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + startCluster(conf); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) client; + long containerId = container1.getContainerInfo().getContainerID(); + XceiverClientReply reply = client.sendCommandAsync(ContainerTestHelper + .getCreateContainerRequest(containerId, client.getPipeline())); + reply.getResponse().get(); + Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); + List pipelineList = new ArrayList<>(); + pipelineList.add(pipeline); + ContainerTestHelper.waitForPipelineClose(pipelineList, cluster); + try { + // just watch for a lo index which in not updated in the commitInfo Map + client.watchForCommit(reply.getLogIndex() + 1, 20000); + Assert.fail("Expected exception not thrown"); + } catch(Exception e) { + Assert.assertTrue(HddsClientUtils + .checkForException(e) instanceof GroupMismatchException); + } + clientManager.releaseClient(client, false); + shutdown(); + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size) throws Exception { + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); + } + + private void validateData(String keyName, byte[] data) throws Exception { + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 463e42889a351..6f5b14e6e3270 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; +import org.apache.commons.math3.stat.descriptive.rank.Min; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.BlockID; @@ -745,7 +746,10 @@ public static void waitForPipelineClose(OzoneOutputStream outputStream, keyOutputStream.getLocationInfoList(); List containerIdList = new ArrayList<>(); for (OmKeyLocationInfo info : locationInfoList) { - containerIdList.add(info.getContainerID()); + long id = info.getContainerID(); + if (!containerIdList.contains(id)) { + containerIdList.add(id); + } } Assert.assertTrue(!containerIdList.isEmpty()); waitForPipelineClose(cluster, waitForContainerCreation, @@ -784,6 +788,12 @@ public static void waitForPipelineClose(MiniOzoneCluster cluster, } } } + waitForPipelineClose(pipelineList, cluster); + } + + public static void waitForPipelineClose(List pipelineList, + MiniOzoneCluster cluster) + throws TimeoutException, InterruptedException, IOException { for (Pipeline pipeline1 : pipelineList) { // issue pipeline destroy command cluster.getStorageContainerManager().getPipelineManager() From 259c1c158ede9d795bfd1788df934579879adef0 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Wed, 24 Apr 2019 17:14:46 +0530 Subject: [PATCH 2/7] Addressed review comments --- .../java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 033693605ed39..59cb115f9dca5 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -575,7 +575,7 @@ public Builder setMaxRetryCount(int maxCount) { return this; } - public KeyOutputStream build() throws IOException { + public KeyOutputStream build() { return new KeyOutputStream(openHandler, xceiverManager, omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, streamBufferMaxSize, blockSize, watchTimeout, checksumType, From f7a42feb17516e0aa9770f68677d49d9db45a40c Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Wed, 24 Apr 2019 17:12:52 +0530 Subject: [PATCH 3/7] HDDS-1395. Key write fails with BlockOutputStream has been closed exception (#749). --- .../org/apache/hadoop/ozone/client/io/KeyOutputStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index bd0b57388dd34..e09c4734b3631 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -584,8 +584,8 @@ public Builder setRetryInterval(long retryIntervalInMS) { } public KeyOutputStream build() throws IOException { - return new KeyOutputStream(openHandler, xceiverManager, - omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, + return new KeyOutputStream(openHandler, xceiverManager, omClient, + chunkSize, requestID, factor, type, streamBufferFlushSize, streamBufferMaxSize, blockSize, watchTimeout, checksumType, bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey, maxRetryCount, retryInterval); From 7813d227bdd328b66f2a91ed23a0ce6c1e63a2b9 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Mon, 29 Apr 2019 20:15:49 +0530 Subject: [PATCH 4/7] Rebased and Fixed checkstyle warnings. --- .../ozone/client/rpc/TestWatchForCommit.java | 94 +++++++++---------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java index f9502ba5dad8e..e17c2fbffefc5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -55,7 +55,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** - * This class verifies the watchForCommit Handling by client. + * This class verifies the watchForCommit Handling by xceiverClient. */ public class TestWatchForCommit { @@ -288,31 +288,31 @@ public void testWatchForCommitWithSmallerTimeoutValue() throws Exception { ContainerWithPipeline container1 = storageContainerLocationClient .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi client = clientManager + XceiverClientSpi xceiverClient = clientManager .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(1, xceiverClient.getRefcount()); Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); XceiverClientReply reply = - client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + xceiverClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( container1.getContainerInfo().getContainerID(), - client.getPipeline())); + xceiverClient.getPipeline())); reply.getResponse().get(); long index = reply.getLogIndex(); cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); try { // just watch for a lo index which in not updated in the commitInfo Map - client.watchForCommit(index + 1, 3000); + xceiverClient.watchForCommit(index + 1, 3000); Assert.fail("expected exception not thrown"); } catch (Exception e) { Assert.assertTrue( HddsClientUtils.checkForException(e) instanceof TimeoutException); } - // After releasing the client, this connection should be closed + // After releasing the xceiverClient, this connection should be closed // and any container operations should fail - clientManager.releaseClient(client, false); + clientManager.releaseClient(xceiverClient, false); shutdown(); } @@ -327,16 +327,16 @@ public void testWatchForCommitForRetryfailure() throws Exception { ContainerWithPipeline container1 = storageContainerLocationClient .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi client = clientManager + XceiverClientSpi xceiverClient = clientManager .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(1, xceiverClient.getRefcount()); Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); XceiverClientReply reply = - client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + xceiverClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( container1.getContainerInfo().getContainerID(), - client.getPipeline())); + xceiverClient.getPipeline())); reply.getResponse().get(); long index = reply.getLogIndex(); cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); @@ -344,13 +344,13 @@ public void testWatchForCommitForRetryfailure() throws Exception { // again write data with more than max buffer limit. This wi try { // just watch for a lo index which in not updated in the commitInfo Map - client.watchForCommit(index + 1, 20000); + xceiverClient.watchForCommit(index + 1, 20000); Assert.fail("expected exception not thrown"); } catch (Exception e) { Assert.assertTrue(HddsClientUtils .checkForException(e) instanceof RaftRetryFailureException); } - clientManager.releaseClient(client, false); + clientManager.releaseClient(xceiverClient, false); shutdown(); } @@ -368,29 +368,29 @@ public void test2WayCommitForRetryfailure() throws Exception { ContainerWithPipeline container1 = storageContainerLocationClient .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi client = clientManager + XceiverClientSpi xceiverClient = clientManager .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(1, xceiverClient.getRefcount()); Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); - XceiverClientRatis ratisClient = (XceiverClientRatis) client; + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; XceiverClientReply reply = - client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + xceiverClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( container1.getContainerInfo().getContainerID(), - client.getPipeline())); + xceiverClient.getPipeline())); reply.getResponse().get(); Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - reply = client.sendCommandAsync(ContainerTestHelper + reply = xceiverClient.sendCommandAsync(ContainerTestHelper .getCloseContainer(pipeline, container1.getContainerInfo().getContainerID())); reply.getResponse().get(); - client.watchForCommit(reply.getLogIndex(), 20000); + xceiverClient.watchForCommit(reply.getLogIndex(), 20000); // commitInfo Map will be reduced to 2 here Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); - clientManager.releaseClient(client, false); + clientManager.releaseClient(xceiverClient, false); Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); Assert.assertTrue( logCapturer.getOutput().contains("RaftRetryFailureException")); @@ -414,29 +414,29 @@ public void test2WayCommitForTimeoutException() throws Exception { ContainerWithPipeline container1 = storageContainerLocationClient .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi client = clientManager + XceiverClientSpi xceiverClient = clientManager .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(1, xceiverClient.getRefcount()); Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); - XceiverClientRatis ratisClient = (XceiverClientRatis) client; + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; XceiverClientReply reply = - client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + xceiverClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( container1.getContainerInfo().getContainerID(), - client.getPipeline())); + xceiverClient.getPipeline())); reply.getResponse().get(); Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - reply = client.sendCommandAsync(ContainerTestHelper + reply = xceiverClient.sendCommandAsync(ContainerTestHelper .getCloseContainer(pipeline, container1.getContainerInfo().getContainerID())); reply.getResponse().get(); - client.watchForCommit(reply.getLogIndex(), 3000); + xceiverClient.watchForCommit(reply.getLogIndex(), 3000); // commitInfo Map will be reduced to 2 here Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); - clientManager.releaseClient(client, false); + clientManager.releaseClient(xceiverClient, false); Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException")); Assert @@ -462,16 +462,16 @@ public void testWatchForCommitForGroupMismatchException() throws Exception { ContainerWithPipeline container1 = storageContainerLocationClient .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi client = clientManager + XceiverClientSpi xceiverClient = clientManager .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(1, xceiverClient.getRefcount()); Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); - XceiverClientRatis ratisClient = (XceiverClientRatis) client; + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; long containerId = container1.getContainerInfo().getContainerID(); - XceiverClientReply reply = client.sendCommandAsync(ContainerTestHelper - .getCreateContainerRequest(containerId, client.getPipeline())); + XceiverClientReply reply = xceiverClient.sendCommandAsync(ContainerTestHelper + .getCreateContainerRequest(containerId, xceiverClient.getPipeline())); reply.getResponse().get(); Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); List pipelineList = new ArrayList<>(); @@ -479,13 +479,13 @@ public void testWatchForCommitForGroupMismatchException() throws Exception { ContainerTestHelper.waitForPipelineClose(pipelineList, cluster); try { // just watch for a lo index which in not updated in the commitInfo Map - client.watchForCommit(reply.getLogIndex() + 1, 20000); + xceiverClient.watchForCommit(reply.getLogIndex() + 1, 20000); Assert.fail("Expected exception not thrown"); } catch(Exception e) { Assert.assertTrue(HddsClientUtils .checkForException(e) instanceof GroupMismatchException); } - clientManager.releaseClient(client, false); + clientManager.releaseClient(xceiverClient, false); shutdown(); } From 0c6dec852c5de5060ccd47439684bbaf00a066c1 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 30 Apr 2019 14:58:25 +0530 Subject: [PATCH 5/7] Fixed Checkstyle warnings, related unit test failures and review comments discussed offline. --- .../client/io/BlockOutputStreamEntryPool.java | 23 +++++++++++-------- .../ozone/client/rpc/TestWatchForCommit.java | 23 ++++++++++--------- .../ozone/container/ContainerTestHelper.java | 18 +++++++-------- 3 files changed, 34 insertions(+), 30 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 622293a3058c5..7a8af65b85981 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -174,18 +174,21 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) public List getLocationInfoList() { List locationInfoList = new ArrayList<>(); for (BlockOutputStreamEntry streamEntry : streamEntries) { - OmKeyLocationInfo info = - new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) - .setLength(streamEntry.getCurrentPosition()).setOffset(0) - .setToken(streamEntry.getToken()) - .setPipeline(streamEntry.getPipeline()) - .build(); - LOG.debug("block written " + streamEntry.getBlockID() + ", length " - + streamEntry.getCurrentPosition() + " bcsID " - + streamEntry.getBlockID().getBlockCommitSequenceId()); - if (streamEntry.getCurrentPosition() != 0) { + long length = streamEntry.getCurrentPosition(); + + // Commit only those blocks to OzoneManager which are not empty + if (length != 0) { + OmKeyLocationInfo info = + new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) + .setLength(streamEntry.getCurrentPosition()).setOffset(0) + .setToken(streamEntry.getToken()) + .setPipeline(streamEntry.getPipeline()).build(); locationInfoList.add(info); } + LOG.debug( + "block written " + streamEntry.getBlockID() + ", length " + length + + " bcsID " + streamEntry.getBlockID() + .getBlockCommitSequenceId()); } return locationInfoList; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java index e17c2fbffefc5..bb17b381c9231 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -249,6 +249,7 @@ public void testWatchForCommitWithKeyWrite() throws Exception { // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // now close the stream, It will update the ack length after watchForCommit + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); key.close(); Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); @@ -272,7 +273,6 @@ public void testWatchForCommitWithKeyWrite() throws Exception { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); shutdown(); } @@ -294,8 +294,8 @@ public void testWatchForCommitWithSmallerTimeoutValue() throws Exception { Assert.assertEquals(container1.getPipeline(), xceiverClient.getPipeline()); Pipeline pipeline = xceiverClient.getPipeline(); - XceiverClientReply reply = - xceiverClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest( container1.getContainerInfo().getContainerID(), xceiverClient.getPipeline())); reply.getResponse().get(); @@ -333,8 +333,8 @@ public void testWatchForCommitForRetryfailure() throws Exception { Assert.assertEquals(container1.getPipeline(), xceiverClient.getPipeline()); Pipeline pipeline = xceiverClient.getPipeline(); - XceiverClientReply reply = - xceiverClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest( container1.getContainerInfo().getContainerID(), xceiverClient.getPipeline())); reply.getResponse().get(); @@ -375,8 +375,8 @@ public void test2WayCommitForRetryfailure() throws Exception { xceiverClient.getPipeline()); Pipeline pipeline = xceiverClient.getPipeline(); XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; - XceiverClientReply reply = - xceiverClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest( container1.getContainerInfo().getContainerID(), xceiverClient.getPipeline())); reply.getResponse().get(); @@ -421,8 +421,8 @@ public void test2WayCommitForTimeoutException() throws Exception { xceiverClient.getPipeline()); Pipeline pipeline = xceiverClient.getPipeline(); XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; - XceiverClientReply reply = - xceiverClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest( container1.getContainerInfo().getContainerID(), xceiverClient.getPipeline())); reply.getResponse().get(); @@ -470,8 +470,9 @@ public void testWatchForCommitForGroupMismatchException() throws Exception { Pipeline pipeline = xceiverClient.getPipeline(); XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; long containerId = container1.getContainerInfo().getContainerID(); - XceiverClientReply reply = xceiverClient.sendCommandAsync(ContainerTestHelper - .getCreateContainerRequest(containerId, xceiverClient.getPipeline())); + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest(containerId, + xceiverClient.getPipeline())); reply.getResponse().get(); Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); List pipelineList = new ArrayList<>(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 6f5b14e6e3270..034a4ae160f9e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -33,7 +33,6 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; -import org.apache.commons.math3.stat.descriptive.rank.Min; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.BlockID; @@ -58,6 +57,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; @@ -724,11 +724,11 @@ public static void waitForContainerClose(OzoneOutputStream outputStream, MiniOzoneCluster cluster) throws Exception { KeyOutputStream keyOutputStream = (KeyOutputStream) outputStream.getOutputStream(); - List locationInfoList = - keyOutputStream.getLocationInfoList(); + List streamEntryList = + keyOutputStream.getStreamEntries(); List containerIdList = new ArrayList<>(); - for (OmKeyLocationInfo info : locationInfoList) { - long id = info.getContainerID(); + for (BlockOutputStreamEntry entry : streamEntryList) { + long id = entry.getBlockID().getContainerID(); if (!containerIdList.contains(id)) { containerIdList.add(id); } @@ -742,11 +742,11 @@ public static void waitForPipelineClose(OzoneOutputStream outputStream, throws Exception { KeyOutputStream keyOutputStream = (KeyOutputStream) outputStream.getOutputStream(); - List locationInfoList = - keyOutputStream.getLocationInfoList(); + List streamEntryList = + keyOutputStream.getStreamEntries(); List containerIdList = new ArrayList<>(); - for (OmKeyLocationInfo info : locationInfoList) { - long id = info.getContainerID(); + for (BlockOutputStreamEntry entry : streamEntryList) { + long id = entry.getBlockID().getContainerID(); if (!containerIdList.contains(id)) { containerIdList.add(id); } From 5f538a03c133ea0435edcd94a9a77175dece61ed Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Thu, 2 May 2019 20:45:00 +0530 Subject: [PATCH 6/7] Fixed related unit test failures --- .../hadoop/ozone/client/rpc/TestWatchForCommit.java | 8 +++----- .../hadoop/ozone/container/ContainerTestHelper.java | 1 - 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java index bb17b381c9231..d43777618046d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -133,9 +133,9 @@ public void testWatchForCommitWithKeyWrite() throws Exception { // and will be captured in keyOutputStream and the failover will happen // to a different block OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 10, + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5); startCluster(conf); XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -359,7 +359,7 @@ public void test2WayCommitForRetryfailure() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 8); startCluster(conf); GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); @@ -392,8 +392,6 @@ public void test2WayCommitForRetryfailure() throws Exception { Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); clientManager.releaseClient(xceiverClient, false); Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); - Assert.assertTrue( - logCapturer.getOutput().contains("RaftRetryFailureException")); Assert .assertTrue(logCapturer.getOutput().contains("Committed by majority")); logCapturer.stopCapturing(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 034a4ae160f9e..4da190762b01d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -69,7 +69,6 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.security.token.Token; import com.google.common.base.Preconditions; From 7ce0865d116dfc80be8690c999db5522cdbd23a5 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Thu, 2 May 2019 20:54:44 +0530 Subject: [PATCH 7/7] HDDS-1224. Restructure code to validate the response from server in the Read path --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 96 ++++++++++--------- .../hadoop/hdds/scm/XceiverClientRatis.java | 1 + .../hdds/scm/storage/BlockInputStream.java | 89 +++++++---------- .../hadoop/hdds/scm/XceiverClientSpi.java | 15 +-- .../hdds/scm/storage/CheckedFunction.java | 27 ++++++ .../scm/storage/ContainerProtocolCalls.java | 77 +++++++-------- 6 files changed, 156 insertions(+), 149 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index a535c9f3f2aa4..837c1579fa756 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -31,12 +31,14 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.CheckedFunction; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; @@ -83,7 +85,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { * data nodes. * * @param pipeline - Pipeline that defines the machines. - * @param config -- Ozone Config + * @param config -- Ozone Config */ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { super(); @@ -91,7 +93,7 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { Preconditions.checkNotNull(config); this.pipeline = pipeline; this.config = config; - this.secConfig = new SecurityConfig(config); + this.secConfig = new SecurityConfig(config); this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -101,9 +103,8 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { /** * To be used when grpc token is not enabled. - * */ - @Override - public void connect() throws Exception { + */ + @Override public void connect() throws Exception { // leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails dn = this.pipeline.getFirstNode(); // just make a connection to the 1st datanode at the beginning @@ -112,9 +113,8 @@ public void connect() throws Exception { /** * Passed encoded token to GRPC header when security is enabled. - * */ - @Override - public void connect(String encodedToken) throws Exception { + */ + @Override public void connect(String encodedToken) throws Exception { // leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails dn = this.pipeline.getFirstNode(); // just make a connection to the 1st datanode at the beginning @@ -132,11 +132,10 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) } // Add credential context to the client call - String userName = UserGroupInformation.getCurrentUser() - .getShortUserName(); + String userName = UserGroupInformation.getCurrentUser().getShortUserName(); LOG.debug("Connecting to server Port : " + dn.getIpAddress()); - NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn - .getIpAddress(), port).usePlaintext() + NettyChannelBuilder channelBuilder = + NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext() .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .intercept(new ClientCredentialInterceptor(userName, encodedToken), new GrpcClientInterceptor()); @@ -149,8 +148,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) if (trustCertCollectionFile != null) { sslContextBuilder.trustManager(trustCertCollectionFile); } - if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null && - privateKeyFile != null) { + if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null + && privateKeyFile != null) { sslContextBuilder.keyManager(clientCertChainFile, privateKeyFile); } @@ -174,8 +173,7 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) * * @return True if the connection is alive, false otherwise. */ - @VisibleForTesting - public boolean isConnected(DatanodeDetails details) { + @VisibleForTesting public boolean isConnected(DatanodeDetails details) { return isConnected(channels.get(details.getUuid())); } @@ -183,8 +181,7 @@ private boolean isConnected(ManagedChannel channel) { return channel != null && !channel.isTerminated() && !channel.isShutdown(); } - @Override - public void close() { + @Override public void close() { closed = true; for (ManagedChannel channel : channels.values()) { channel.shutdownNow(); @@ -216,49 +213,46 @@ public ContainerCommandResponseProto sendCommand( } @Override - public XceiverClientReply sendCommand( - ContainerCommandRequestProto request, List excludeDns) + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request, CheckedFunction function) throws IOException { Preconditions.checkState(HddsUtils.isReadOnly(request)); - return sendCommandWithTraceIDAndRetry(request, excludeDns); + try { + XceiverClientReply reply; + reply = sendCommandWithTraceIDAndRetry(request, function); + ContainerCommandResponseProto responseProto = reply.getResponse().get(); + return responseProto; + } catch (ExecutionException | InterruptedException e) { + throw new IOException("Failed to execute command " + request, e); + } } private XceiverClientReply sendCommandWithTraceIDAndRetry( - ContainerCommandRequestProto request, List excludeDns) + ContainerCommandRequestProto request, CheckedFunction function) throws IOException { try (Scope scope = GlobalTracer.get() .buildSpan("XceiverClientGrpc." + request.getCmdType().name()) .startActive(true)) { ContainerCommandRequestProto finalPayload = ContainerCommandRequestProto.newBuilder(request) - .setTraceID(TracingUtil.exportCurrentSpan()) - .build(); - return sendCommandWithRetry(finalPayload, excludeDns); + .setTraceID(TracingUtil.exportCurrentSpan()).build(); + return sendCommandWithRetry(finalPayload, function); } } private XceiverClientReply sendCommandWithRetry( - ContainerCommandRequestProto request, List excludeDns) + ContainerCommandRequestProto request, CheckedFunction function) throws IOException { ContainerCommandResponseProto responseProto = null; + IOException ioException = null; // In case of an exception or an error, we will try to read from the // datanodes in the pipeline in a round robin fashion. // TODO: cache the correct leader info in here, so that any subsequent calls // should first go to leader - List dns = pipeline.getNodes(); - List healthyDns = - excludeDns != null ? dns.stream().filter(dnId -> { - for (DatanodeDetails excludeId : excludeDns) { - if (dnId.equals(excludeId)) { - return false; - } - } - return true; - }).collect(Collectors.toList()) : dns; XceiverClientReply reply = new XceiverClientReply(null); - for (DatanodeDetails dn : healthyDns) { + for (DatanodeDetails dn : pipeline.getNodes()) { try { LOG.debug("Executing command " + request + " on datanode " + dn); // In case the command gets retried on a 2nd datanode, @@ -266,17 +260,24 @@ private XceiverClientReply sendCommandWithRetry( // in case these don't exist for the specific datanode. reply.addDatanode(dn); responseProto = sendCommandAsync(request, dn).getResponse().get(); - if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) { - break; + if (function != null) { + function.apply(responseProto); } - } catch (ExecutionException | InterruptedException e) { + break; + } catch (ExecutionException | InterruptedException | IOException e) { LOG.debug("Failed to execute command " + request + " on datanode " + dn .getUuidString(), e); - if (Status.fromThrowable(e.getCause()).getCode() - == Status.UNAUTHENTICATED.getCode()) { - throw new SCMSecurityException("Failed to authenticate with " - + "GRPC XceiverServer with Ozone block token."); + if (!(e instanceof IOException)) { + if (Status.fromThrowable(e.getCause()).getCode() + == Status.UNAUTHENTICATED.getCode()) { + throw new SCMSecurityException("Failed to authenticate with " + + "GRPC XceiverServer with Ozone block token."); + } + ioException = new IOException(e); + } else { + ioException = (IOException) e; } + responseProto = null; } } @@ -284,9 +285,10 @@ private XceiverClientReply sendCommandWithRetry( reply.setResponse(CompletableFuture.completedFuture(responseProto)); return reply; } else { - throw new IOException( - "Failed to execute command " + request + " on the pipeline " - + pipeline.getId()); + Preconditions.checkNotNull(ioException); + LOG.error("Failed to execute command " + request + " on the pipeline " + + pipeline.getId()); + throw ioException; } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index efd82bce7bbd5..607966d5efc7d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.storage.CheckedFunction; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import io.opentracing.Scope; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 66671637bfae6..aa5960cf11cb7 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -26,6 +26,7 @@ .StorageContainerException; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -65,7 +66,7 @@ public class BlockInputStream extends InputStream implements Seekable { private long[] chunkOffset; private List buffers; private int bufferIndex; - private final boolean verifyChecksum; + private boolean verifyChecksum; /** * Creates a new BlockInputStream. @@ -286,64 +287,44 @@ private synchronized void readChunkFromContainer() throws IOException { // On every chunk read chunkIndex should be increased so as to read the // next chunk chunkIndex += 1; - XceiverClientReply reply; - ReadChunkResponseProto readChunkResponse = null; - final ChunkInfo chunkInfo = chunks.get(chunkIndex); - List excludeDns = null; + ReadChunkResponseProto readChunkResponse; ByteString byteString; - List dnList = xceiverClient.getPipeline().getNodes(); - while (true) { - try { - reply = ContainerProtocolCalls - .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns); - ContainerProtos.ContainerCommandResponseProto response; - response = reply.getResponse().get(); - ContainerProtocolCalls.validateContainerResponse(response); - readChunkResponse = response.getReadChunk(); - } catch (IOException e) { - if (e instanceof StorageContainerException) { - throw e; - } - throw new IOException("Unexpected OzoneException: " + e.toString(), e); - } catch (ExecutionException | InterruptedException e) { - throw new IOException( - "Failed to execute ReadChunk command for chunk " + chunkInfo - .getChunkName(), e); - } - byteString = readChunkResponse.getData(); - try { - if (byteString.size() != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new IOException(String - .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), - byteString.size())); - } - ChecksumData checksumData = - ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); - if (verifyChecksum) { - Checksum.verifyChecksum(byteString, checksumData); - } - break; - } catch (IOException ioe) { - // we will end up in this situation only if the checksum mismatch - // happens or the length of the chunk mismatches. - // In this case, read should be retried on a different replica. - // TODO: Inform SCM of a possible corrupt container replica here - if (excludeDns == null) { - excludeDns = new ArrayList<>(); - } - excludeDns.addAll(reply.getDatanodes()); - if (excludeDns.size() == dnList.size()) { - throw ioe; - } - } - } - + final ChunkInfo chunkInfo = chunks.get(chunkIndex); + readChunkResponse = ContainerProtocolCalls + .readChunk(xceiverClient, chunkInfo, blockID, traceID, validator); + byteString = readChunkResponse.getData(); buffers = byteString.asReadOnlyByteBufferList(); bufferIndex = 0; } + private CheckedFunction + validator = (response) -> { + ReadChunkResponseProto readChunkResponse; + final ChunkInfo chunkInfo = chunks.get(chunkIndex); + ByteString byteString; + try { + ContainerProtocolCalls.validateContainerResponse(response); + readChunkResponse = response.getReadChunk(); + } catch (IOException e) { + if (e instanceof StorageContainerException) { + throw e; + } + throw new IOException("Unexpected OzoneException: " + e.toString(), e); + } + byteString = readChunkResponse.getData(); + if (byteString.size() != chunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. + throw new OzoneChecksumException(String + .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", + chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size())); + } + ChecksumData checksumData = + ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); + if (verifyChecksum) { + Checksum.verifyChecksum(byteString, checksumData); + } + }; + @Override public synchronized void seek(long pos) throws IOException { if (pos < 0 || (chunks.size() == 0 && pos > 0) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 1a183664a127f..9f4ad210ee91a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.scm.storage.CheckedFunction; +import org.apache.hadoop.hdds.scm.storage.CheckedFunction; /** * A Client for the storageContainer protocol. @@ -118,18 +120,19 @@ public ContainerCommandResponseProto sendCommand( * Sends a given command to server and gets the reply back along with * the server associated info. * @param request Request - * @param excludeDns list of servers on which the command won't be sent to. + * @param function function to validate the response * @return Response to the command * @throws IOException */ - public XceiverClientReply sendCommand( - ContainerCommandRequestProto request, List excludeDns) + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request, CheckedFunction function) throws IOException { try { XceiverClientReply reply; reply = sendCommandAsync(request); - reply.getResponse().get(); - return reply; + ContainerCommandResponseProto responseProto = reply.getResponse().get(); + function.apply(responseProto); + return responseProto; } catch (ExecutionException | InterruptedException e) { throw new IOException("Failed to command " + request, e); } @@ -156,7 +159,7 @@ public XceiverClientReply sendCommand( /** * Check if an specfic commitIndex is replicated to majority/all servers. * @param index index to watch for - * @param timeout timeout provided for the watch ipeartion to complete + * @param timeout timeout provided for the watch operation to complete * @return reply containing the min commit index replicated to all or majority * servers in case of a failure * @throws InterruptedException diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java new file mode 100644 index 0000000000000..3c722b6c33411 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + + +import java.io.IOException; + +@FunctionalInterface +public interface CheckedFunction { + void apply(IN input) throws THROWABLE; +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 5a1a75eb90fd9..8df94326dc8b5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -116,9 +116,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, validator); return response.getGetBlock(); } @@ -153,8 +152,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, validator); return response.getGetCommittedBlockLength(); } @@ -184,8 +183,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock( builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, validator); return response.getPutBlock(); } @@ -228,35 +227,31 @@ public static XceiverClientReply putBlockAsync( * @param chunk information about chunk to read * @param blockID ID of the block * @param traceID container protocol call args - * @param excludeDns datamode to exclude while executing the command + * @param validator function to validate the response * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ - public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient, - ChunkInfo chunk, BlockID blockID, String traceID, - List excludeDns) - throws IOException { - ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto - .newBuilder() - .setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .setChunkData(chunk); + public static ContainerProtos.ReadChunkResponseProto readChunk( + XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, + String traceID, CheckedFunction validator) throws IOException { + ReadChunkRequestProto.Builder readChunkRequest = + ReadChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setChunkData(chunk); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.ReadChunk) - .setContainerID(blockID.getContainerID()) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setReadChunk(readChunkRequest); + ContainerCommandRequestProto.Builder builder = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk) + .setContainerID(blockID.getContainerID()).setTraceID(traceID) + .setDatanodeUuid(id).setReadChunk(readChunkRequest); String encodedToken = getEncodedBlockToken(new Text(blockID. getContainerBlockID().toString())); if (encodedToken != null) { builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - XceiverClientReply reply = - xceiverClient.sendCommand(request, excludeDns); - return reply; + ContainerCommandResponseProto reply = + xceiverClient.sendCommand(request, validator); + return reply.getReadChunk(); } /** @@ -291,8 +286,7 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + xceiverClient.sendCommand(request, validator); } /** @@ -384,8 +378,8 @@ public static PutSmallFileResponseProto writeSmallFile( builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); + ContainerCommandResponseProto response = + client.sendCommand(request, validator); return response.getPutSmallFile(); } @@ -416,9 +410,7 @@ public static void createContainer(XceiverClientSpi client, long containerID, request.setCreateContainer(createRequest.build()); request.setDatanodeUuid(id); request.setTraceID(traceID); - ContainerCommandResponseProto response = client.sendCommand( - request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), validator); } /** @@ -444,12 +436,10 @@ public static void deleteContainer(XceiverClientSpi client, long containerID, request.setDeleteContainer(deleteRequest); request.setTraceID(traceID); request.setDatanodeUuid(id); - if(encodedToken != null) { + if (encodedToken != null) { request.setEncodedToken(encodedToken); } - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), validator); } /** @@ -505,8 +495,7 @@ public static ReadContainerResponseProto readContainer( request.setEncodedToken(encodedToken); } ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), validator); return response.getReadContainer(); } @@ -544,9 +533,8 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); - + ContainerCommandResponseProto response = + client.sendCommand(request, validator); return response.getGetSmallFile(); } @@ -598,4 +586,9 @@ private static Text getService(DatanodeBlockID blockId) { .append(blockId.getLocalID()) .toString()); } + + private static CheckedFunction + + validator = (response) -> + validateContainerResponse(response); }