diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 2ae0ba75252e..aada48e2f596 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -92,6 +92,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   private int chunkIndex;
   private final AtomicLong chunkOffset = new AtomicLong();
+
+  // Similar to 'BufferPool' but this list maintains only references
+  // to the ByteBuffers.
+  private List<StreamBuffer> bufferList;
+
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
@@ -133,7 +138,8 @@ public BlockDataStreamOutput(
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
       OzoneClientConfig config,
-      Token<OzoneBlockTokenIdentifier> token
+      Token<OzoneBlockTokenIdentifier> token,
+      List<StreamBuffer> bufferList
   ) throws IOException {
     this.xceiverClientFactory = xceiverClientManager;
     this.config = config;
@@ -148,7 +154,7 @@ public BlockDataStreamOutput(
     // Alternatively, stream setup can be delayed till the first chunk write.
     this.out = setupStream(pipeline);
     this.token = token;
-
+    this.bufferList = bufferList;
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
         .getStreamBufferSize());
@@ -159,7 +165,7 @@ public BlockDataStreamOutput(
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new StreamCommitWatcher(xceiverClient);
+    commitWatcher = new StreamCommitWatcher(xceiverClient, bufferList);
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = new ArrayList<>(0);
@@ -251,8 +257,11 @@ public void write(ByteBuffer b, int off, int len) throws IOException {
     if (len == 0) {
       return;
     }
-    writeChunkToContainer(
-        (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len));
+
+    final StreamBuffer buf = new StreamBuffer(b, off, len);
+    bufferList.add(buf);
+
+    writeChunkToContainer(buf.duplicate());
     writtenDataLength += len;
   }
@@ -261,6 +270,10 @@ private void updateFlushLength() {
     totalDataFlushedLength = writtenDataLength;
   }
+  @VisibleForTesting
+  public long getTotalDataFlushedLength() {
+    return totalDataFlushedLength;
+  }
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
@@ -268,8 +281,27 @@ private void updateFlushLength() {
    * @throws IOException if error occurred
    */
-  // TODO: We need add new retry policy without depend on bufferPool.
   public void writeOnRetry(long len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrying write length {} for blockID {}", len, blockID);
+    }
+    int count = 0;
+    while (len > 0) {
+      final StreamBuffer buf = bufferList.get(count);
+      final long writeLen = Math.min(buf.length(), len);
+      final ByteBuffer duplicated = buf.duplicate();
+      if (writeLen != buf.length()) {
+        duplicated.limit(Math.toIntExact(len));
+      }
+      writeChunkToContainer(duplicated);
+      len -= writeLen;
+      count++;
+      writtenDataLength += writeLen;
+    }
+  }
@@ -314,6 +346,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
       boolean force) throws IOException {
     checkOpen();
     long flushPos = totalDataFlushedLength;
+    final List<StreamBuffer> byteBufferList;
+    if (!force) {
+      Preconditions.checkNotNull(bufferList);
+      byteBufferList = bufferList;
+      Preconditions.checkNotNull(byteBufferList);
+    } else {
+      byteBufferList = null;
+    }
     flush();
     if (close) {
       dataStreamCloseReply = out.closeAsync();
@@ -344,12 +384,12 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                    + commitWatcher.getCommitInfoSetSize() + " flushLength "
+                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
                     + flushPos + " blockID " + blockID);
           }
           // for standalone protocol, logIndex will always be 0.
-          commitWatcher.updateCommitInfoSet(
-              asyncReply.getLogIndex());
+          commitWatcher
+              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -589,4 +629,8 @@ private void handleExecutionException(Exception ex) throws IOException {
     setIoException(ex);
     throw getIoException();
   }
+
+  public long getTotalAckDataLength() {
+    return commitWatcher.getTotalAckDataLength();
+  }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
new file mode 100644
index 000000000000..f36019e2aeb8
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Used for streaming write.
+ */
+public class StreamBuffer {
+  private final ByteBuffer buffer;
+
+  public StreamBuffer(ByteBuffer buffer) {
+    this.buffer = buffer.asReadOnlyBuffer();
+  }
+
+  public StreamBuffer(ByteBuffer buffer, int offset, int length) {
+    this((ByteBuffer) buffer.asReadOnlyBuffer().position(offset)
+        .limit(offset + length));
+  }
+
+  public ByteBuffer duplicate() {
+    return buffer.duplicate();
+  }
+
+  public int length() {
+    return buffer.limit() - buffer.position();
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index c187ffe902ba..3a59d0757105 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -24,6 +24,7 @@
  */
 package org.apache.hadoop.hdds.scm.storage;

+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -31,13 +32,16 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.util.Set;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;

 /**
  * This class executes watchForCommit on ratis pipeline and releases
@@ -48,7 +52,12 @@ public class StreamCommitWatcher {
   private static final Logger LOG =
       LoggerFactory.getLogger(StreamCommitWatcher.class);

-  private Set<Long> commitIndexSet;
+  private Map<Long, List<StreamBuffer>> commitIndexMap;
+  private List<StreamBuffer> bufferList;
+
+  // total data which has been successfully flushed and acknowledged
+  // by all servers
+  private long totalAckDataLength;

   // future Map to hold up all putBlock futures
   private ConcurrentHashMap<Long,
       CompletableFuture<ContainerCommandResponseProto>> futureMap;

-  public StreamCommitWatcher(XceiverClientSpi xceiverClient) {
+  public StreamCommitWatcher(XceiverClientSpi xceiverClient,
+      List<StreamBuffer> bufferList) {
     this.xceiverClient = xceiverClient;
-    commitIndexSet = new ConcurrentSkipListSet();
+    commitIndexMap = new ConcurrentSkipListMap<>();
     futureMap = new ConcurrentHashMap<>();
+    this.bufferList = bufferList;
+    totalAckDataLength = 0;
   }

-  public void updateCommitInfoSet(long index) {
-    commitIndexSet.add(index);
+  public void updateCommitInfoMap(long index, List<StreamBuffer> buffers) {
+    commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
+        .addAll(buffers);
   }

-  int getCommitInfoSetSize() {
-    return commitIndexSet.size();
+  int getCommitInfoMapSize() {
+    return commitIndexMap.size();
   }

   /**
@@ -78,12 +91,12 @@ int getCommitInfoSetSize() {
    * @throws IOException in case watchForCommit fails
    */
   public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
-    if (!commitIndexSet.isEmpty()) {
+    if (!commitIndexMap.isEmpty()) {
       // wait for the first commit index in the commitIndex2flushedDataMap
       // to get committed to all or majority of nodes in case timeout
       // happens.
       long index =
-          commitIndexSet.stream().mapToLong(v -> v).min()
+          commitIndexMap.keySet().stream().mapToLong(v -> v).min()
              .getAsLong();
       if (LOG.isDebugEnabled()) {
         LOG.debug("waiting for first index {} to catch up", index);
       }
@@ -102,12 +115,12 @@ public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
    */
   public XceiverClientReply streamWatchOnLastIndex()
       throws IOException {
-    if (!commitIndexSet.isEmpty()) {
+    if (!commitIndexMap.isEmpty()) {
       // wait for the commit index in the commitIndex2flushedDataMap
       // to get committed to all or majority of nodes in case timeout
       // happens.
       long index =
-          commitIndexSet.stream().mapToLong(v -> v).max()
+          commitIndexMap.keySet().stream().mapToLong(v -> v).max()
              .getAsLong();
       if (LOG.isDebugEnabled()) {
         LOG.debug("waiting for last flush Index {} to catch up", index);
       }
@@ -127,9 +140,16 @@ public XceiverClientReply streamWatchOnLastIndex()
    */
   public XceiverClientReply streamWatchForCommit(long commitIndex)
       throws IOException {
+    final long index;
     try {
       XceiverClientReply reply =
           xceiverClient.watchForCommit(commitIndex);
+      if (reply == null) {
+        index = 0;
+      } else {
+        index = reply.getLogIndex();
+      }
+      adjustBuffers(index);
       return reply;
     } catch (InterruptedException e) {
       // Re-interrupt the thread while catching InterruptedException
@@ -140,11 +160,52 @@ public XceiverClientReply streamWatchForCommit(long commitIndex)
     }
   }

+  void releaseBuffersOnException() {
+    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+  }
+
+  private void adjustBuffers(long commitIndex) {
+    List<Long> keyList = commitIndexMap.keySet().stream()
+        .filter(p -> p <= commitIndex).collect(Collectors.toList());
+    if (!keyList.isEmpty()) {
+      releaseBuffers(keyList);
+    }
+  }
+
+  private long releaseBuffers(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndexMap.isEmpty());
+    for (long index : indexes) {
+      Preconditions.checkState(commitIndexMap.containsKey(index));
+      final List<StreamBuffer> buffers = commitIndexMap.remove(index);
+      final long length =
+          buffers.stream().mapToLong(StreamBuffer::length).sum();
+      totalAckDataLength += length;
+      // clear the future object from the future Map
+      final CompletableFuture<ContainerCommandResponseProto> remove =
+          futureMap.remove(totalAckDataLength);
+      if (remove == null) {
+        LOG.error("Couldn't find required future for " + totalAckDataLength);
+        for (Long key : futureMap.keySet()) {
+          LOG.error("Existing acknowledged data: " + key);
+        }
+      }
+      for (StreamBuffer byteBuffer : buffers) {
+        bufferList.remove(byteBuffer);
+      }
+    }
+    return totalAckDataLength;
+  }
+
+  public long getTotalAckDataLength() {
+    return totalAckDataLength;
+  }
+
   private IOException getIOExceptionForWatchForCommit(long commitIndex,
                                                        Exception e) {
     LOG.warn("watchForCommit failed for index {}", commitIndex, e);
     IOException ioException = new IOException(
         "Unexpected Storage Container Exception: " + e.toString(), e);
+    releaseBuffersOnException();
     return ioException;
   }

@@ -155,12 +216,12 @@ ContainerCommandResponseProto>> getFutureMap() {
   }

   public void cleanup() {
-    if (commitIndexSet != null) {
-      commitIndexSet.clear();
+    if (commitIndexMap != null) {
+      commitIndexMap.clear();
     }
     if (futureMap != null) {
       futureMap.clear();
     }
-    commitIndexSet = null;
+    commitIndexMap = null;
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index f0c3a43e891e..2cd5630549c3 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
 import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.StreamBuffer;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -32,6 +33,7 @@
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;

 /**
  * Helper class used inside {@link BlockDataStreamOutput}.
@@ -50,6 +52,7 @@ public final class BlockDataStreamOutputEntry
   // the current position of this stream 0 <= currentPosition < length
   private long currentPosition;
   private final Token<OzoneBlockTokenIdentifier> token;
+  private List<StreamBuffer> bufferList;

   @SuppressWarnings({"parameternumber", "squid:S00107"})
   private BlockDataStreamOutputEntry(
@@ -58,7 +61,8 @@ private BlockDataStreamOutputEntry(
       Pipeline pipeline,
       long length,
       Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config
+      OzoneClientConfig config,
+      List<StreamBuffer> bufferList
   ) {
     this.config = config;
     this.byteBufferStreamOutput = null;
@@ -69,6 +73,7 @@ private BlockDataStreamOutputEntry(
     this.token = token;
     this.length = length;
     this.currentPosition = 0;
+    this.bufferList = bufferList;
   }

   long getLength() {
@@ -92,8 +97,8 @@ long getRemaining() {
   private void checkStream() throws IOException {
     if (this.byteBufferStreamOutput == null) {
       this.byteBufferStreamOutput =
-          new BlockDataStreamOutput(blockID, xceiverClientManager,
-              pipeline, config, token);
+          new BlockDataStreamOutput(blockID, xceiverClientManager, pipeline,
+              config, token, bufferList);
     }
   }

@@ -151,6 +156,20 @@ long getWrittenDataLength() {
     }
   }
+  long getTotalAckDataLength() {
+    if (byteBufferStreamOutput != null) {
+      BlockDataStreamOutput out =
+          (BlockDataStreamOutput) this.byteBufferStreamOutput;
+      blockID = out.getBlockID();
+      return out.getTotalAckDataLength();
+    } else {
+      // For a pre allocated block for which no write has been initiated,
+      // the OutputStream will be null here.
+      // In such cases, the default blockCommitSequenceId will be 0
+      return 0;
+    }
+  }
+
   void cleanup(boolean invalidateClient) throws IOException {
     checkStream();
     BlockDataStreamOutput out =
@@ -180,6 +199,7 @@ public static class Builder {
     private long length;
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
+    private List<StreamBuffer> bufferList;

     public Builder setBlockID(BlockID bID) {
       this.blockID = bID;
@@ -219,13 +239,18 @@ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
       return this;
     }

+    public Builder setBufferList(List<StreamBuffer> bList) {
+      this.bufferList = bList;
+      return this;
+    }
+
     public BlockDataStreamOutputEntry build() {
       return new BlockDataStreamOutputEntry(blockID,
           key,
           xceiverClientManager,
           pipeline,
           length,
-          token, config);
+          token, config, bufferList);
     }
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 4bc55de262f1..e49b0b79adf6 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.StreamBuffer;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -59,6 +60,7 @@ public class BlockDataStreamOutputEntryPool {
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private final long openID;
   private final ExcludeList excludeList;
+  private List<StreamBuffer> bufferList;

   @SuppressWarnings({"parameternumber", "squid:S00107"})
   public BlockDataStreamOutputEntryPool(
@@ -83,6 +85,7 @@ public BlockDataStreamOutputEntryPool(
     this.requestID = requestId;
     this.openID = openID;
     this.excludeList = new ExcludeList();
+    this.bufferList = new ArrayList<>();
   }

   /**
@@ -142,7 +145,8 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
             .setPipeline(subKeyInfo.getPipeline())
             .setConfig(config)
             .setLength(subKeyInfo.getLength())
-            .setToken(subKeyInfo.getToken());
+            .setToken(subKeyInfo.getToken())
+            .setBufferList(bufferList);
     streamEntries.add(builder.build());
   }

@@ -301,4 +305,12 @@ public ExcludeList getExcludeList() {
   boolean isEmpty() {
     return streamEntries.isEmpty();
   }
+
+  long computeBufferData() {
+    long totalDataLen =0;
+    for (StreamBuffer b : bufferList){
+      totalDataLen += b.length();
+    }
+    return totalDataLen;
+  }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 9bba89d0a8a0..2540e42e24ae 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -278,11 +278,14 @@ private void handleException(BlockDataStreamOutputEntry streamEntry,
     }
     Pipeline pipeline = streamEntry.getPipeline();
     PipelineID pipelineId = pipeline.getId();
-
+    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
+    //set the correct length for the current stream
+    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
     ExcludeList excludeList = blockDataStreamOutputEntryPool.getExcludeList();
+    long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
     if (!failedServers.isEmpty()) {
       excludeList.addDatanodes(failedServers);
     }
@@ -316,6 +319,13 @@ private void handleException(BlockDataStreamOutputEntry streamEntry,
       blockDataStreamOutputEntryPool
           .discardPreallocatedBlocks(-1, pipelineId);
     }
+    if (bufferedDataLen > 0) {
+      // If the data is still cached in the underlying stream, we need to
+      // allocate new block and write this data in the datanode.
+      handleRetry(exception, bufferedDataLen);
+      // reset the retryCount after handling the exception
+      retryCount = 0;
+    }
   }

   private void markStreamClosed() {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index d3b2d22577e9..05a101951b80 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -21,15 +21,19 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -127,21 +131,25 @@ public static void shutdown() {
   @Test
   public void testHalfChunkWrite() throws Exception {
     testWrite(chunkSize / 2);
+    testWriteWithFailure(chunkSize/2);
   }

   @Test
   public void testSingleChunkWrite() throws Exception {
     testWrite(chunkSize);
+    testWriteWithFailure(chunkSize);
   }

   @Test
   public void testMultiChunkWrite() throws Exception {
     testWrite(chunkSize + 50);
+    testWriteWithFailure(chunkSize + 50);
   }

   @Test
   public void testMultiBlockWrite() throws Exception {
     testWrite(blockSize + 50);
+    testWriteWithFailure(blockSize + 50);
   }

   private void testWrite(int dataLength) throws Exception {
@@ -156,6 +164,28 @@ private void testWrite(int dataLength) throws Exception {
     key.close();
     validateData(keyName, data);
   }
+
+  private void testWriteWithFailure(int dataLength) throws Exception {
+    String keyName = getKeyName();
+    OzoneDataStreamOutput key = createKey(
+        keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    ByteBuffer b = ByteBuffer.wrap(data);
+    key.write(b);
+    KeyDataStreamOutput keyDataStreamOutput =
+        (KeyDataStreamOutput) key.getByteBufStreamOutput();
+    ByteBufferStreamOutput stream =
+        keyDataStreamOutput.getStreamEntries().get(0).getByteBufStreamOutput();
+    Assert.assertTrue(stream instanceof BlockDataStreamOutput);
+    TestHelper.waitForContainerClose(key, cluster);
+    key.write(b);
+    key.close();
+    String dataString = new String(data, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+  }
+
   private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
       long size) throws Exception {
     return TestHelper.createStreamKey(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 0e48dd9d2d67..82fff089cea3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -40,7 +40,9 @@
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.io.BlockDataStreamOutputEntry;
 import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -189,6 +191,24 @@ public static void waitForContainerClose(OzoneOutputStream outputStream,
     waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
   }
+
+  public static void waitForContainerClose(OzoneDataStreamOutput outputStream,
+      MiniOzoneCluster cluster) throws Exception {
+    KeyDataStreamOutput keyOutputStream =
+        (KeyDataStreamOutput) outputStream.getByteBufStreamOutput();
+    List<BlockDataStreamOutputEntry> streamEntryList =
+        keyOutputStream.getStreamEntries();
+    List<Long> containerIdList = new ArrayList<>();
+    for (BlockDataStreamOutputEntry entry : streamEntryList) {
+      long id = entry.getBlockID().getContainerID();
+      if (!containerIdList.contains(id)) {
+        containerIdList.add(id);
+      }
+    }
+    Assert.assertTrue(!containerIdList.isEmpty());
+    waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
+  }
+
   public static void waitForPipelineClose(OzoneOutputStream outputStream,
       MiniOzoneCluster cluster, boolean waitForContainerCreation)
       throws Exception {