diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index f658df1af9c2..39ec2f921961 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -36,21 +36,18 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -93,7 +90,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
   private int chunkIndex;
   private final AtomicLong chunkOffset = new AtomicLong();
-  private final BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
@@ -106,28 +102,16 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
   // effective data write attempted so far for the block
   private long writtenDataLength;
-  // List containing buffers for which the putBlock call will
-  // update the length in the datanodes. This list will just maintain
-  // references to the buffers in the BufferPool which will be cleared
-  // when the watchForCommit acknowledges a putBlock logIndex has been
-  // committed on all datanodes. This list will be a place holder for buffers
-  // which got written between successive putBlock calls.
-  private List bufferList;
-
   // This object will maintain the commitIndexes and byteBufferList in order
   // Also, corresponding to the logIndex, the corresponding list of buffers will
   // be released from the buffer pool.
-  private final CommitWatcher commitWatcher;
+  private final StreamCommitWatcher commitWatcher;
 
   private final List failedServers;
   private final Checksum checksum;
 
   //number of buffers used before doing a flush/putBlock.
   private int flushPeriod;
-  //bytes remaining to write in the current buffer.
-  private int currentBufferRemaining;
-  //current buffer allocated to write
-  private ChunkBuffer currentBuffer;
   private final Token token;
   private final DataStreamOutput out;
   private CompletableFuture dataStreamCloseReply;
@@ -141,13 +125,11 @@
    * @param blockID block ID
    * @param xceiverClientManager client manager that controls client
    * @param pipeline pipeline where block will be written
-   * @param bufferPool pool of buffers
    */
   public BlockDataStreamOutput(
       BlockID blockID,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
-      BufferPool bufferPool,
       OzoneClientConfig config,
       Token token
   ) throws IOException {
@@ -163,11 +145,8 @@ public BlockDataStreamOutput(
         (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
     // Alternatively, stream setup can be delayed till the first chunk write.
     this.out = setupStream();
-    this.bufferPool = bufferPool;
     this.token = token;
 
-    //number of buffers used before doing a flush
-    refreshCurrentBuffer(bufferPool);
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
         .getStreamBufferSize());
 
@@ -178,8 +157,7 @@ public BlockDataStreamOutput(
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
-    bufferList = null;
+    commitWatcher = new StreamCommitWatcher(xceiverClient);
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = new ArrayList<>(0);
@@ -209,20 +187,10 @@ private DataStreamOutput setupStream() throws IOException {
         .stream(message.getContent().asReadOnlyByteBuffer());
   }
 
-  private void refreshCurrentBuffer(BufferPool pool) {
-    currentBuffer = pool.getCurrentBuffer();
-    currentBufferRemaining =
-        currentBuffer != null ? currentBuffer.remaining() : 0;
-  }
-
   public BlockID getBlockID() {
     return blockID.get();
   }
 
-  public long getTotalAckDataLength() {
-    return commitWatcher.getTotalAckDataLength();
-  }
-
   public long getWrittenDataLength() {
     return writtenDataLength;
   }
@@ -236,82 +204,29 @@ public XceiverClientRatis getXceiverClient() {
     return xceiverClient;
   }
 
-  @VisibleForTesting
-  public long getTotalDataFlushedLength() {
-    return totalDataFlushedLength;
-  }
-
-  @VisibleForTesting
-  public BufferPool getBufferPool() {
-    return bufferPool;
-  }
-
   public IOException getIoException() {
     return ioException.get();
   }
 
-  @VisibleForTesting
-  public Map> getCommitIndex2flushedDataMap() {
-    return commitWatcher.getCommitIndex2flushedDataMap();
-  }
-
   @Override
-  public void write(ByteBuf b) throws IOException {
+  public void write(ByteBuf buf) throws IOException {
     checkOpen();
-    if (b == null) {
+    if (buf == null) {
       throw new NullPointerException();
     }
-    int off = b.readerIndex();
-    int len = b.readableBytes();
-
-    while (len > 0) {
-      allocateNewBufferIfNeeded();
-      final int writeLen = Math.min(currentBufferRemaining, len);
-      // TODO: avoid buffer copy here
-      currentBuffer.put(b.nioBuffer(off, writeLen));
-      currentBufferRemaining -= writeLen;
-      writeChunkIfNeeded();
-      off += writeLen;
-      len -= writeLen;
-      writtenDataLength += writeLen;
-      doFlushOrWatchIfNeeded();
-    }
-  }
-
-  private void writeChunkIfNeeded() throws IOException {
-    if (currentBufferRemaining == 0) {
-      writeChunk(currentBuffer);
-    }
-  }
-
-  private void doFlushOrWatchIfNeeded() throws IOException {
-    if (currentBufferRemaining == 0) {
-      if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
-        updateFlushLength();
-        executePutBlock(false, false);
-      }
-      // Data in the bufferPool can not exceed streamBufferMaxSize
-      if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
-        handleFullBuffer();
-      }
+    final int len = buf.readableBytes();
+    if (len == 0) {
+      return;
     }
-  }
+    writeChunkToContainer(buf);
 
-  private void allocateNewBufferIfNeeded() {
-    if (currentBufferRemaining == 0) {
-      currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
-      currentBufferRemaining = currentBuffer.remaining();
-    }
+    writtenDataLength += len;
   }
 
   private void updateFlushLength() {
     totalDataFlushedLength = writtenDataLength;
   }
 
-  private boolean isBufferPoolFull() {
-    return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
-  }
-
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
@@ -319,70 +234,9 @@ private boolean isBufferPoolFull() {
    * @throws IOException if error occurred
    */
-  // In this case, the data is already cached in the currentBuffer.
+  // TODO: add a new retry policy that does not depend on the BufferPool.
   public void writeOnRetry(long len) throws IOException {
-    if (len == 0) {
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Retrying write length {} for blockID {}", len, blockID);
-    }
-    Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
-    int count = 0;
-    while (len > 0) {
-      ChunkBuffer buffer = bufferPool.getBuffer(count);
-      long writeLen = Math.min(buffer.position(), len);
-      if (!buffer.hasRemaining()) {
-        writeChunk(buffer);
-      }
-      len -= writeLen;
-      count++;
-      writtenDataLength += writeLen;
-      // we should not call isBufferFull/shouldFlush here.
-      // The buffer might already be full as whole data is already cached in
-      // the buffer. We should just validate
-      // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
-      // call for handling full buffer/flush buffer condition.
-      if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
-        // reset the position to zero as now we will be reading the
-        // next buffer in the list
-        updateFlushLength();
-        executePutBlock(false, false);
-      }
-      if (writtenDataLength == config.getStreamBufferMaxSize()) {
-        handleFullBuffer();
-      }
-    }
-  }
-
-  /**
-   * This is a blocking call. It will wait for the flush till the commit index
-   * at the head of the commitIndex2flushedDataMap gets replicated to all or
-   * majority.
-   * @throws IOException
-   */
-  private void handleFullBuffer() throws IOException {
-    try {
-      checkOpen();
-      if (!commitWatcher.getFutureMap().isEmpty()) {
-        waitOnFlushFutures();
-      }
-    } catch (ExecutionException e) {
-      handleExecutionException(e);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      handleInterruptedException(ex, true);
-    }
-    watchForCommit(true);
-  }
-
-
-  // It may happen that once the exception is encountered , we still might
-  // have successfully flushed up to a certain index. Make sure the buffers
-  // only contain data which have not been sufficiently replicated
-  private void adjustBuffersOnException() {
-    commitWatcher.releaseBuffersOnException();
-    refreshCurrentBuffer(bufferPool);
   }
 
   /**
@@ -397,7 +251,8 @@ private void watchForCommit(boolean bufferFull) throws IOException {
     checkOpen();
     try {
       XceiverClientReply reply = bufferFull ?
-          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+          commitWatcher.streamWatchOnFirstIndex() :
+          commitWatcher.streamWatchOnLastIndex();
       if (reply != null) {
         List dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
@@ -412,7 +267,6 @@ private void watchForCommit(boolean bufferFull) throws IOException {
       setIoException(ioe);
       throw getIoException();
     }
-    refreshCurrentBuffer(bufferPool);
   }
 
@@ -426,22 +280,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
       boolean force) throws IOException {
     checkOpen();
     long flushPos = totalDataFlushedLength;
-    final List byteBufferList;
-    if (!force) {
-      Preconditions.checkNotNull(bufferList);
-      byteBufferList = bufferList;
-      bufferList = null;
-      Preconditions.checkNotNull(byteBufferList);
-    } else {
-      byteBufferList = null;
-    }
-
-    try {
-      CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
-    } catch (Exception e) {
-      LOG.warn("Failed to write all chunks through stream: " + e);
-      throw new IOException(e);
-    }
+    flush();
     if (close) {
       dataStreamCloseReply = out.closeAsync();
     }
@@ -471,15 +310,12 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
       if (LOG.isDebugEnabled()) {
         LOG.debug(
             "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                + commitWatcher.getCommitInfoMapSize() + " flushLength "
-                + flushPos + " numBuffers " + byteBufferList.size()
-                + " blockID " + blockID + " bufferPool size" + bufferPool
-                .getSize() + " currentBufferIndex " + bufferPool
-                .getCurrentBufferIndex());
+                + commitWatcher.getCommitInfoSetSize() + " flushLength "
+                + flushPos + " blockID " + blockID);
       }
       // for standalone protocol, logIndex will always be 0.
-      commitWatcher
-          .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
+      commitWatcher.updateCommitInfoSet(
+          asyncReply.getLogIndex());
     }
     return e;
   }, responseExecutor).exceptionally(e -> {
@@ -503,36 +339,12 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
 
   @Override
   public void flush() throws IOException {
-    if (xceiverClientFactory != null && xceiverClient != null
-        && bufferPool != null && bufferPool.getSize() > 0
-        && (!config.isStreamBufferFlushDelay() ||
-            writtenDataLength - totalDataFlushedLength
-                >= config.getStreamBufferSize())) {
-      try {
-        handleFlush(false);
-      } catch (ExecutionException e) {
-        // just set the exception here as well in order to maintain sanctity of
-        // ioException field
-        handleExecutionException(e);
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        handleInterruptedException(ex, true);
-      }
-    }
-  }
-
-  private void writeChunk(ChunkBuffer buffer)
-      throws IOException {
-    // This data in the buffer will be pushed to datanode and a reference will
-    // be added to the bufferList. Once putBlock gets executed, this list will
-    // be marked null. Hence, during first writeChunk call after every putBlock
-    // call or during the first call to writeChunk here, the list will be null.
-
-    if (bufferList == null) {
-      bufferList = new ArrayList<>();
+    try {
+      CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
+    } catch (Exception e) {
+      LOG.warn("Failed to write all chunks through stream: " + e);
+      throw new IOException(e);
     }
-    bufferList.add(buffer);
-    writeChunkToContainer(buffer.duplicate(0, buffer.position()));
   }
 
   /**
@@ -543,11 +355,6 @@ private void handleFlush(boolean close)
     checkOpen();
     // flush the last chunk data residing on the currentBuffer
     if (totalDataFlushedLength < writtenDataLength) {
-      refreshCurrentBuffer(bufferPool);
-      Preconditions.checkArgument(currentBuffer.position() > 0);
-      if (currentBuffer.hasRemaining()) {
-        writeChunk(currentBuffer);
-      }
       // This can be a partially filled chunk. Since we are flushing the buffer
       // here, we just limit this buffer to the current position. So that next
       // write will happen in new buffer
@@ -570,8 +377,7 @@ private void handleFlush(boolean close)
 
   @Override
   public void close() throws IOException {
-    if (xceiverClientFactory != null && xceiverClient != null
-        && bufferPool != null && bufferPool.getSize() > 0) {
+    if (xceiverClientFactory != null && xceiverClient != null) {
       try {
         handleFlush(true);
         dataStreamCloseReply.get();
@@ -583,10 +389,6 @@ public void close() throws IOException {
       } finally {
        cleanup(false);
      }
-      // TODO: Turn the below buffer empty check on when Standalone pipeline
-      // is removed in the write path in tests
-      // Preconditions.checkArgument(buffer.position() == 0);
-      // bufferPool.checkBufferPoolEmpty();
     }
   }
 
@@ -638,10 +440,6 @@ public void cleanup(boolean invalidateClient) {
     xceiverClientFactory = null;
     xceiverClient = null;
     commitWatcher.cleanup();
-    if (bufferList != null) {
-      bufferList.clear();
-    }
-    bufferList = null;
     responseExecutor.shutdown();
   }
 
@@ -655,7 +453,6 @@ private void checkOpen() throws IOException {
     if (isClosed()) {
       throw new IOException("BlockDataStreamOutput has been closed.");
     } else if (getIoException() != null) {
-      adjustBuffersOnException();
      throw getIoException();
    }
  }
@@ -683,12 +480,11 @@ private boolean needSync(long position) {
    * @throws OzoneChecksumException if there is an error while computing
    *         checksum
    */
-  private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
-    int effectiveChunkSize = chunk.remaining();
+  private void writeChunkToContainer(ByteBuf buf)
+      throws IOException {
+    ChecksumData checksumData = checksum.computeChecksum(buf.nioBuffer());
+    int effectiveChunkSize = buf.readableBytes();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
-    final ByteString data = chunk.toByteString(
-        bufferPool.byteStringConversion());
-    ChecksumData checksumData = checksum.computeChecksum(chunk);
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
         .setOffset(offset)
@@ -703,21 +499,22 @@ private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
     CompletableFuture future =
         (needSync(offset + effectiveChunkSize) ?
-            out.writeAsync(data.asReadOnlyByteBuffer(), StandardWriteOption.SYNC) :
-            out.writeAsync(data.asReadOnlyByteBuffer()))
-        .whenCompleteAsync((r, e) -> {
-          if (e != null || !r.isSuccess()) {
-            if (e == null) {
-              e = new IOException("result is not success");
-            }
-            String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
-                " " + "into block " + blockID;
-            LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
-            CompletionException ce = new CompletionException(msg, e);
-            setIoException(ce);
-            throw ce;
-          }
-        }, responseExecutor);
+            out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) :
+            out.writeAsync(buf.nioBuffer()))
+            .whenCompleteAsync((r, e) -> {
+              if (e != null || !r.isSuccess()) {
+                if (e == null) {
+                  e = new IOException("result is not success");
+                }
+                String msg =
+                    "Failed to write chunk " + chunkInfo.getChunkName() +
+                        " " + "into block " + blockID;
+                LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+                CompletionException ce = new CompletionException(msg, e);
+                setIoException(ce);
+                throw ce;
+              }
+            }, responseExecutor);
     futures.add(future);
     containerBlockData.addChunks(chunkInfo);
@@ -754,7 +551,6 @@ private void handleInterruptedException(Exception ex,
    */
   private void handleExecutionException(Exception ex) throws IOException {
     setIoException(ex);
-    adjustBuffersOnException();
     throw getIoException();
   }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
new file mode 100644
index 000000000000..c187ffe902ba
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This class maintains the set of commit indexes to be watched for
+ * successful replication in the datanodes in a given pipeline. Unlike
+ * {@link CommitWatcher}, it does not keep references to user data buffers,
+ * so there is nothing to release back to a buffer pool once the minimum
+ * replication criteria is achieved during an ozone key write.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class executes watchForCommit on the Ratis pipeline for data written
+ * through the streaming path and tracks the commit indexes until the data
+ * is successfully replicated.
+ */
+public class StreamCommitWatcher {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamCommitWatcher.class);
+
+  private Set<Long> commitIndexSet;
+
+  // future Map to hold up all putBlock futures
+  private ConcurrentHashMap<Long, CompletableFuture<ContainerCommandResponseProto>>
+      futureMap;
+
+  private XceiverClientSpi xceiverClient;
+
+  public StreamCommitWatcher(XceiverClientSpi xceiverClient) {
+    this.xceiverClient = xceiverClient;
+    commitIndexSet = new ConcurrentSkipListSet<>();
+    futureMap = new ConcurrentHashMap<>();
+  }
+
+  public void updateCommitInfoSet(long index) {
+    commitIndexSet.add(index);
+  }
+
+  int getCommitInfoSetSize() {
+    return commitIndexSet.size();
+  }
+
+  /**
+   * Calls watch for commit for the first index in commitIndexSet with
+   * the Ratis client.
+   * @return {@link XceiverClientReply} reply from raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
+    if (!commitIndexSet.isEmpty()) {
+      // wait for the first commit index in commitIndexSet
+      // to get committed to all or majority of nodes in case timeout
+      // happens.
+      long index =
+          commitIndexSet.stream().mapToLong(v -> v).min()
+              .getAsLong();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("waiting for first index {} to catch up", index);
+      }
+      return streamWatchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Calls watch for commit for the last index in commitIndexSet with
+   * the Ratis client.
+   * @return {@link XceiverClientReply} reply from raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply streamWatchOnLastIndex()
+      throws IOException {
+    if (!commitIndexSet.isEmpty()) {
+      // wait for the last commit index in commitIndexSet
+      // to get committed to all or majority of nodes in case timeout
+      // happens.
+      long index =
+          commitIndexSet.stream().mapToLong(v -> v).max()
+              .getAsLong();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("waiting for last flush Index {} to catch up", index);
+      }
+      return streamWatchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Calls the watchForCommit API of the Ratis client. This method is for the
+   * streaming path and no longer needs to release any buffers.
+   * @param commitIndex log index to watch for
+   * @return minimum commit index replicated to all nodes
+   * @throws IOException in case the watch gets timed out
+   */
+  public XceiverClientReply streamWatchForCommit(long commitIndex)
+      throws IOException {
+    try {
+      XceiverClientReply reply =
+          xceiverClient.watchForCommit(commitIndex);
+      return reply;
+    } catch (InterruptedException e) {
+      // Re-interrupt the thread while catching InterruptedException
+      Thread.currentThread().interrupt();
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    } catch (TimeoutException | ExecutionException e) {
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    }
+  }
+
+  private IOException getIOExceptionForWatchForCommit(long commitIndex,
+      Exception e) {
+    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
+    IOException ioException = new IOException(
+        "Unexpected Storage Container Exception: " + e.toString(), e);
+    return ioException;
+  }
+
+  public ConcurrentMap<Long,
+      CompletableFuture<ContainerCommandResponseProto>> getFutureMap() {
+    return futureMap;
+  }
+
+  public void cleanup() {
+    if (commitIndexSet != null) {
+      commitIndexSet.clear();
+    }
+    if (futureMap != null) {
+      futureMap.clear();
+    }
+    commitIndexSet = null;
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index 695474260176..98907bf8af4b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -25,7 +25,6 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -52,15 +51,12 @@ public final class BlockDataStreamOutputEntry
   private long currentPosition;
   private final Token token;
 
-  private BufferPool bufferPool;
-
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   private BlockDataStreamOutputEntry(
       BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
       long length,
-      BufferPool bufferPool,
       Token token,
       OzoneClientConfig config
   ) {
@@ -73,7 +69,6 @@ private BlockDataStreamOutputEntry(
     this.token = token;
     this.length = length;
     this.currentPosition = 0;
-    this.bufferPool = bufferPool;
   }
 
   long getLength() {
@@ -98,7 +93,7 @@ private void checkStream() throws IOException {
     if (this.byteBufStreamOutput == null) {
       this.byteBufStreamOutput =
           new BlockDataStreamOutput(blockID, xceiverClientManager,
-              pipeline, bufferPool, config, token);
+              pipeline, config, token);
     }
   }
 
@@ -135,20 +130,6 @@ boolean isClosed() {
     return false;
   }
 
-  long getTotalAckDataLength() {
-    if (byteBufStreamOutput != null) {
-      BlockDataStreamOutput out =
-          (BlockDataStreamOutput) this.byteBufStreamOutput;
-      blockID = out.getBlockID();
-      return out.getTotalAckDataLength();
-    } else {
-      // For a pre allocated block for which no write has been initiated,
-      // the ByteBufStreamOutput will be null here.
-      // In such cases, the default blockCommitSequenceId will be 0
-      return 0;
-    }
-  }
-
   Collection getFailedServers() {
     if (byteBufStreamOutput != null) {
       BlockDataStreamOutput out =
@@ -198,7 +179,6 @@ public static class Builder {
     private XceiverClientFactory xceiverClientManager;
     private Pipeline pipeline;
     private long length;
-    private BufferPool bufferPool;
     private Token token;
     private OzoneClientConfig config;
 
@@ -230,12 +210,6 @@ public Builder setLength(long len) {
       return this;
     }
-
-    public Builder setBufferPool(BufferPool pool) {
-      this.bufferPool = pool;
-      return this;
-    }
-
     public Builder setConfig(OzoneClientConfig clientConfig) {
       this.config = clientConfig;
       return this;
     }
@@ -252,7 +226,6 @@ public BlockDataStreamOutputEntry build() {
           xceiverClientManager,
           pipeline,
           length,
-          bufferPool,
           token,
           config);
     }
   }
@@ -282,10 +255,6 @@ public long getCurrentPosition() {
     return currentPosition;
   }
 
-  public BufferPool getBufferPool() {
-    return bufferPool;
-  }
-
   public void setCurrentPosition(long curPosition) {
     this.currentPosition = curPosition;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 94c505f2af35..4bc55de262f1 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -22,12 +22,10 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -58,7 +56,6 @@ public class BlockDataStreamOutputEntryPool {
   private final OmKeyArgs keyArgs;
   private final XceiverClientFactory xceiverClientFactory;
   private final String requestID;
-  private final BufferPool bufferPool;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private final long openID;
   private final ExcludeList excludeList;
@@ -86,13 +83,6 @@ public BlockDataStreamOutputEntryPool(
     this.requestID = requestId;
     this.openID = openID;
     this.excludeList = new ExcludeList();
-
-    this.bufferPool =
-        new BufferPool(config.getStreamBufferSize(),
-            (int) (config.getStreamBufferMaxSize() / config
-                .getStreamBufferSize()),
-            ByteStringConversion
-                .createByteBufferConversion(unsafeByteBufferConversion));
   }
 
   /**
@@ -114,8 +104,6 @@ public BlockDataStreamOutputEntryPool(
     config.setStreamBufferFlushDelay(false);
     requestID = null;
     int chunkSize = 0;
-    bufferPool = new BufferPool(chunkSize, 1);
-
     currentStreamIndex = 0;
     openID = -1;
     excludeList = new ExcludeList();
@@ -154,7 +142,6 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
         .setPipeline(subKeyInfo.getPipeline())
         .setConfig(config)
         .setLength(subKeyInfo.getLength())
-        .setBufferPool(bufferPool)
         .setToken(subKeyInfo.getToken());
     streamEntries.add(builder.build());
   }
@@ -293,17 +280,10 @@ BlockDataStreamOutputEntry allocateBlockIfNeeded() throws IOException {
     return streamEntries.get(currentStreamIndex);
   }
 
-  long computeBufferData() {
-    return bufferPool.computeBufferData();
-  }
-
   void cleanup() {
     if (excludeList != null) {
       excludeList.clear();
     }
-    if (bufferPool != null) {
-      bufferPool.clearBufferPool();
-    }
     if (streamEntries != null) {
       streamEntries.clear();
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index a9be11667c8d..c37f9cd51d3c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -279,27 +279,7 @@ private void handleException(BlockDataStreamOutputEntry streamEntry,
     }
     Pipeline pipeline = streamEntry.getPipeline();
     PipelineID pipelineId = pipeline.getId();
-    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
-    //set the correct length for the current stream
-    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
-    long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
-    if (containerExclusionException) {
-      LOG.debug(
-          "Encountered exception {}. The last committed block length is {}, "
-              + "uncommitted data length is {} retry count {}", exception,
-          totalSuccessfulFlushedData, bufferedDataLen, retryCount);
-    } else {
-      LOG.warn(
-          "Encountered exception {} on the pipeline {}. "
-              + "The last committed block length is {}, "
-              + "uncommitted data length is {} retry count {}", exception,
-          pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
-    }
-    Preconditions.checkArgument(
-        bufferedDataLen <= config.getStreamBufferMaxSize());
-    Preconditions.checkArgument(
-        offset - blockDataStreamOutputEntryPool.getKeyLength() ==
-            bufferedDataLen);
+
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
@@ -337,13 +317,6 @@ private void handleException(BlockDataStreamOutputEntry streamEntry,
       blockDataStreamOutputEntryPool
           .discardPreallocatedBlocks(-1, pipelineId);
     }
-    if (bufferedDataLen > 0) {
-      // If the data is still cached in the underlying stream, we need to
-      // allocate new block and write this data in the datanode.
-      handleRetry(exception, bufferedDataLen);
-      // reset the retryCount after handling the exception
-      retryCount = 0;
-    }
   }
 
   private void markStreamClosed() {
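
Editor's sketch (not part of the patch): StreamCommitWatcher replaces CommitWatcher's commit-index-to-buffer map with a plain sorted set of Ratis log indexes, and watches either the smallest outstanding index or the largest one. The self-contained Java class below, with hypothetical names and a stubbed watchForCommit, illustrates that selection logic on the same ConcurrentSkipListSet.

// Self-contained sketch (hypothetical class, not Ozone code): how a sorted
// concurrent set of log indexes supports "watch on first index" (the oldest
// outstanding putBlock) and "watch on last index" (the newest), mirroring
// StreamCommitWatcher. watchForCommit() is only a stub here.
import java.util.concurrent.ConcurrentSkipListSet;

public class CommitIndexTrackerSketch {

  private final ConcurrentSkipListSet<Long> commitIndexSet =
      new ConcurrentSkipListSet<>();

  public void updateCommitInfoSet(long index) {
    commitIndexSet.add(index);
  }

  /** Watch the smallest outstanding index. */
  public void watchOnFirstIndex() {
    if (!commitIndexSet.isEmpty()) {
      watchForCommit(commitIndexSet.first()); // equivalent to the stream min()
    }
  }

  /** Watch the largest outstanding index, e.g. on flush or close. */
  public void watchOnLastIndex() {
    if (!commitIndexSet.isEmpty()) {
      watchForCommit(commitIndexSet.last()); // equivalent to the stream max()
    }
  }

  private void watchForCommit(long index) {
    System.out.println("watchForCommit(" + index + ")");
  }

  public static void main(String[] args) {
    CommitIndexTrackerSketch tracker = new CommitIndexTrackerSketch();
    tracker.updateCommitInfoSet(3);
    tracker.updateCommitInfoSet(7);
    tracker.updateCommitInfoSet(11);
    tracker.watchOnFirstIndex(); // prints watchForCommit(3)
    tracker.watchOnLastIndex();  // prints watchForCommit(11)
  }
}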
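
Editor's sketch (not part of the patch): with the BufferPool removed, flush() in BlockDataStreamOutput only waits for the CompletableFutures of the chunk writes issued so far and rethrows any failure as an IOException. A minimal stand-alone version of that pattern, using hypothetical class and method names:

// Self-contained sketch: collect one future per async chunk write and make
// flush() block until all of them complete, converting failures to IOException.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class StreamFlushSketch {

  private static final CompletableFuture<?>[] EMPTY_FUTURE_ARRAY = {};
  private final List<CompletableFuture<Void>> futures = new ArrayList<>();

  /** Pretend to write a chunk asynchronously and remember its future. */
  public void writeChunkAsync(int chunkNo) {
    futures.add(CompletableFuture.runAsync(
        () -> System.out.println("wrote chunk " + chunkNo)));
  }

  /** Wait for every outstanding chunk write, like the patched flush(). */
  public void flush() throws IOException {
    try {
      CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
    } catch (Exception e) {
      throw new IOException("Failed to write all chunks through stream", e);
    }
  }

  public static void main(String[] args) throws IOException {
    StreamFlushSketch stream = new StreamFlushSketch();
    stream.writeChunkAsync(1);
    stream.writeChunkAsync(2);
    stream.flush();
  }
}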
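
Editor's sketch (not part of the patch): writeChunkToContainer(ByteBuf) checksums the incoming buffer, takes the chunk offset from a running counter, and names the chunk localID_chunk_index. The sketch below reproduces only that bookkeeping; java.util.zip.CRC32 stands in for Ozone's Checksum class and all names are illustrative.

// Self-contained sketch of the per-chunk metadata bookkeeping: checksum the
// chunk bytes, advance a running offset, and derive a chunk name from the
// block's local ID plus a chunk index.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;

public class ChunkMetadataSketch {

  private final long localBlockId;
  private final AtomicLong chunkOffset = new AtomicLong();
  private int chunkIndex;

  public ChunkMetadataSketch(long localBlockId) {
    this.localBlockId = localBlockId;
  }

  /** Compute the fields that would go into a ChunkInfo for this buffer. */
  public String describeChunk(ByteBuffer data) {
    CRC32 crc = new CRC32();
    crc.update(data.duplicate());              // checksum of the chunk bytes
    int effectiveChunkSize = data.remaining(); // length of this chunk
    long offset = chunkOffset.getAndAdd(effectiveChunkSize);
    String chunkName = localBlockId + "_chunk_" + (++chunkIndex);
    return chunkName + " offset=" + offset
        + " len=" + effectiveChunkSize + " crc=" + crc.getValue();
  }

  public static void main(String[] args) {
    ChunkMetadataSketch sketch = new ChunkMetadataSketch(42L);
    ByteBuffer chunk =
        ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
    System.out.println(sketch.describeChunk(chunk));
    System.out.println(sketch.describeChunk(chunk.duplicate()));
  }
}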