diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 14a8aacbc015..ca3ef5149635 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -143,6 +143,13 @@ public enum ChecksumCombineMode {
       tags = ConfigTag.CLIENT)
   private int maxECStripeWriteRetries = 10;
 
+  @Config(key = "ec.stripe.queue.size",
+      defaultValue = "2",
+      description = "The maximum number of EC stripes that can be buffered "
+          + "in the client before being flushed to the datanodes.",
+      tags = ConfigTag.CLIENT)
+  private int ecStripeQueueSize = 2;
+
   @Config(key = "exclude.nodes.expiry.time",
       defaultValue = "600000",
       description = "Time after which an excluded node is reconsidered for" +
@@ -288,6 +295,10 @@ public int getMaxECStripeWriteRetries() {
     return this.maxECStripeWriteRetries;
   }
 
+  public int getEcStripeQueueSize() {
+    return this.ecStripeQueueSize;
+  }
+
   public long getExcludeNodesExpiryTime() {
     return excludeNodesExpiryTime;
   }
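A note on usage: OzoneClientConfig lives in the ozone.client config group, so the resolved key for the new setting should be ozone.client.ec.stripe.queue.size (key name assumed from that prefix). A deeper queue lets the writer run further ahead of the flush thread at the cost of memory, since every buffered stripe pins (data + parity) x chunk-size bytes. A minimal sketch of tuning it:

    OzoneConfiguration conf = new OzoneConfiguration();
    // Allow up to 4 stripes in flight between the writer and the flush thread.
    conf.setInt("ozone.client.ec.stripe.queue.size", 4);
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    assert clientConfig.getEcStripeQueueSize() == 4;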
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 12440b44cf8f..7d5a06fc5291 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -23,6 +23,14 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -56,11 +64,16 @@ public final class ECKeyOutputStream extends KeyOutputStream {
   private OzoneClientConfig config;
   private ECChunkBuffers ecChunkBufferCache;
+  private final BlockingQueue<ECChunkBuffers> ecStripeQueue;
+  private int chunkIndex;
   private int ecChunkSize;
   private final int numDataBlks;
   private final int numParityBlks;
   private final ByteBufferPool bufferPool;
   private final RawErasureEncoder encoder;
+  private final ExecutorService flushExecutor;
+  private final Future<Boolean> flushFuture;
+  private final AtomicLong flushCheckpoint;
 
   private enum StripeWriteStatus {
     SUCCESS,
@@ -92,6 +105,16 @@ public List<OmKeyLocationInfo> getLocationInfoList() {
     return blockOutputStreamEntryPool.getLocationInfoList();
   }
 
+  @VisibleForTesting
+  public void insertFlushCheckpoint(long version) throws IOException {
+    addStripeToQueue(new CheckpointDummyStripe(version));
+  }
+
+  @VisibleForTesting
+  public long getFlushCheckpoint() {
+    return flushCheckpoint.get();
+  }
+
   private ECKeyOutputStream(Builder builder) {
     super(builder.getClientMetrics());
     this.config = builder.getClientConfig();
@@ -105,6 +128,8 @@ private ECKeyOutputStream(Builder builder) {
     this.numParityBlks = builder.getReplicationConfig().getParity();
     ecChunkBufferCache = new ECChunkBuffers(
         ecChunkSize, numDataBlks, numParityBlks, bufferPool);
+    chunkIndex = 0;
+    ecStripeQueue = new ArrayBlockingQueue<>(config.getEcStripeQueueSize());
 
     OmKeyInfo info = builder.getOpenHandler().getKeyInfo();
     blockOutputStreamEntryPool = new ECBlockOutputStreamEntryPool(config,
@@ -119,6 +144,9 @@ private ECKeyOutputStream(Builder builder) {
     this.writeOffset = 0;
     this.encoder = CodecUtil.createRawEncoderWithFallback(
         builder.getReplicationConfig());
+    this.flushExecutor = Executors.newSingleThreadExecutor();
+    this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
+    this.flushCheckpoint = new AtomicLong(0);
   }
 
   /**
@@ -165,14 +193,14 @@ public void write(byte[] b, int off, int len) throws IOException {
       }
     } catch (Exception e) {
       markStreamClosed();
-      throw new IOException(e.getMessage());
+      throw e;
     }
     writeOffset += len;
   }
 
-  private StripeWriteStatus rewriteStripeToNewBlockGroup() throws IOException {
+  private void rollbackAndReset(ECChunkBuffers stripe) throws IOException {
     // Rollback the length/offset updated as part of this failed stripe write.
-    final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
+    final ByteBuffer[] dataBuffers = stripe.getDataBuffers();
     offset -= Arrays.stream(dataBuffers).mapToInt(Buffer::limit).sum();
 
     final ECBlockOutputStreamEntry failedStreamEntry =
@@ -185,28 +213,6 @@
         failedStreamEntry.getPipeline().getId());
     // Let's close the current entry.
     failedStreamEntry.close();
-
-    // Let's rewrite the last stripe, so that it will be written to new block
-    // group.
-    // TODO: we can improve to write partial stripe failures. In that case,
-    // we just need to write only available buffers.
-    blockOutputStreamEntryPool.allocateBlockIfNeeded();
-    final ECBlockOutputStreamEntry currentStreamEntry =
-        blockOutputStreamEntryPool.getCurrentStreamEntry();
-    for (int i = 0; i < numDataBlks; i++) {
-      if (dataBuffers[i].limit() > 0) {
-        handleOutputStreamWrite(i, dataBuffers[i].limit(), false);
-      }
-      currentStreamEntry.useNextBlockStream();
-    }
-    return handleParityWrites();
-  }
-
-  private void encodeAndWriteParityCells() throws IOException {
-    generateParityCells();
-    if (handleParityWrites() == StripeWriteStatus.FAILED) {
-      retryStripeWrite(config.getMaxECStripeWriteRetries());
-    }
   }
 
   private void logStreamError(List<ECBlockOutputStream> failedStreams,
@@ -228,8 +234,8 @@ private void logStreamError(List<ECBlockOutputStream> failedStreams,
     }
   }
 
-  private StripeWriteStatus handleParityWrites() throws IOException {
-    writeParityCells();
+  private StripeWriteStatus commitStripeWrite(ECChunkBuffers stripe)
+      throws IOException {
 
     ECBlockOutputStreamEntry streamEntry =
         blockOutputStreamEntryPool.getCurrentStreamEntry();
@@ -246,7 +252,7 @@ private StripeWriteStatus handleParityWrites() throws IOException {
     // By this time, we should have finished full stripe. So, lets call
     // executePutBlock for all.
     final boolean isLastStripe = streamEntry.getRemaining() <= 0 ||
-        ecChunkBufferCache.getLastDataCell().limit() < ecChunkSize;
+        stripe.getLastDataCell().limit() < ecChunkSize;
     ByteString checksum = streamEntry.calculateChecksum();
     streamEntry.executePutBlock(isLastStripe,
         streamEntry.getCurrentPosition(), checksum);
@@ -261,7 +267,7 @@ private StripeWriteStatus handleParityWrites() throws IOException {
     }
     streamEntry.updateBlockGroupToAckedPosition(
         streamEntry.getCurrentPosition());
-    ecChunkBufferCache.clear();
+    stripe.clear();
 
     if (streamEntry.getRemaining() <= 0) {
       streamEntry.close();
@@ -340,63 +346,66 @@ private void generateParityCells() throws IOException {
     }
   }
 
-  private void writeParityCells() {
+  private void writeDataCells(ECChunkBuffers stripe) throws IOException {
+    blockOutputStreamEntryPool.allocateBlockIfNeeded();
+    ByteBuffer[] dataCells = stripe.getDataBuffers();
+    for (int i = 0; i < numDataBlks; i++) {
+      if (dataCells[i].limit() > 0) {
+        handleOutputStreamWrite(dataCells[i], false);
+      }
+      blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
+    }
+  }
+
+  private void writeParityCells(ECChunkBuffers stripe) {
     // Move the stream entry cursor to parity block index
     blockOutputStreamEntryPool
         .getCurrentStreamEntry().forceToFirstParityBlock();
-    ByteBuffer[] parityCells = ecChunkBufferCache.getParityBuffers();
+    ByteBuffer[] parityCells = stripe.getParityBuffers();
     for (int i = 0; i < numParityBlks; i++) {
-      handleOutputStreamWrite(numDataBlks + i, parityCells[i].limit(), true);
+      handleOutputStreamWrite(parityCells[i], true);
       blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
     }
   }
 
   private int handleWrite(byte[] b, int off, int len) throws IOException {
-
-    blockOutputStreamEntryPool.allocateBlockIfNeeded();
-
-    int currIdx = blockOutputStreamEntryPool
-        .getCurrentStreamEntry().getCurrentStreamIdx();
-    int bufferRem = ecChunkBufferCache.dataBuffers[currIdx].remaining();
+    int bufferRem = ecChunkBufferCache.dataBuffers[chunkIndex].remaining();
     final int writeLen = Math.min(len, Math.min(bufferRem, ecChunkSize));
-    int pos = ecChunkBufferCache.addToDataBuffer(currIdx, b, off, writeLen);
+    int pos = ecChunkBufferCache.addToDataBuffer(chunkIndex, b, off, writeLen);
 
-    // if this cell is full, send data to the OutputStream
+    // if this cell is full, use next buffer
     if (pos == ecChunkSize) {
-      handleOutputStreamWrite(currIdx, pos, false);
-      blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
+      chunkIndex++;
       // if this is last data cell in the stripe,
-      // compute and write the parity cells
-      if (currIdx == numDataBlks - 1) {
-        encodeAndWriteParityCells();
+      // compute parity cells and write data
+      if (chunkIndex == numDataBlks) {
+        generateParityCells();
+        addStripeToQueue(ecChunkBufferCache);
+        ecChunkBufferCache = new ECChunkBuffers(ecChunkSize,
+            numDataBlks, numParityBlks, bufferPool);
+        chunkIndex = 0;
      }
     }
     return writeLen;
   }
 
-  private void handleOutputStreamWrite(int currIdx, int len, boolean isParity) {
-    ByteBuffer bytesToWrite = isParity ?
-        ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
-        ecChunkBufferCache.getDataBuffers()[currIdx];
+  private void handleOutputStreamWrite(ByteBuffer buffer, boolean isParity) {
     try {
       // Since it's a full cell, let's write all content from buffer.
       // At a time we write max cell size in EC. So, it should safe to cast
       // the len to int to use the super class defined write API.
       // The len cannot be bigger than cell buffer size.
-      assert len <= ecChunkSize : " The len: " + len + ". EC chunk size: "
-          + ecChunkSize;
-      assert len <= bytesToWrite
-          .limit() : " The len: " + len + ". Chunk buffer limit: "
-          + bytesToWrite.limit();
+      assert buffer.limit() <= ecChunkSize : "The buffer size: "
+          + buffer.limit() + " should not exceed EC chunk size: " + ecChunkSize;
       writeToOutputStream(blockOutputStreamEntryPool.getCurrentStreamEntry(),
-          bytesToWrite.array(), len, 0, isParity);
+          buffer.array(), buffer.limit(), 0, isParity);
     } catch (Exception e) {
       markStreamAsFailed(e);
     }
   }
 
-  private long writeToOutputStream(ECBlockOutputStreamEntry current,
+  private void writeToOutputStream(ECBlockOutputStreamEntry current,
       byte[] b, int writeLen, int off, boolean isParity)
       throws IOException {
     try {
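With this buffering, handleWrite never touches the network; it only fills local cells and hands off full stripes. A worked example for RS(3, 2) with 1 KiB cells: a 4 KiB write fills data cells 0-2, which triggers generateParityCells() and queues one 5-cell stripe, and leaves 1 KiB behind in a fresh ecChunkBufferCache with chunkIndex back at 0 (the snippet below is just that arithmetic):

    int ecChunkSize = 1024, numDataBlks = 3, written = 4096;
    int stripeBytes = numDataBlks * ecChunkSize;   // 3072 data bytes per stripe
    int queuedStripes = written / stripeBytes;     // 1 full stripe queued
    int leftoverBytes = written % stripeBytes;     // 1024 bytes still buffered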
@@ -414,7 +423,6 @@ private long writeToOutputStream(ECBlockOutputStreamEntry current,
           .getCurrentStreamIdx(), ioe);
       handleException(current, ioe);
     }
-    return writeLen;
   }
 
   private void handleException(BlockOutputStreamEntry streamEntry,
@@ -481,37 +489,95 @@ public void close() throws IOException {
     try {
       // If stripe buffer is not empty, encode and flush the stripe.
       if (ecChunkBufferCache.getFirstDataCell().position() > 0) {
-        final int index = blockOutputStreamEntryPool.getCurrentStreamEntry()
-            .getCurrentStreamIdx();
-        ByteBuffer lastCell = ecChunkBufferCache.getDataBuffers()[index];
-
-        // Finish writing the current partial cached chunk
-        if (lastCell.position() % ecChunkSize != 0) {
-          handleOutputStreamWrite(index, lastCell.position(), false);
-        }
-
-        encodeAndWriteParityCells();
+        generateParityCells();
+        addStripeToQueue(ecChunkBufferCache);
       }
+      // Send EOF mark to flush thread.
+      addStripeToQueue(new EOFDummyStripe());
+
+      // Wait for all the stripes to be written.
+      flushFuture.get();
+      flushExecutor.shutdownNow();
 
       closeCurrentStreamEntry();
       Preconditions.checkArgument(writeOffset == offset,
           "Expected writeOffset= " + writeOffset
              + " Expected offset=" + offset);
       blockOutputStreamEntryPool.commitKey(offset);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      } else {
+        throw new IOException(cause);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Flushing thread was interrupted", e);
     } finally {
       blockOutputStreamEntryPool.cleanup();
     }
-    ecChunkBufferCache.release();
   }
 
-  private void retryStripeWrite(int times) throws IOException {
-    for (int i = 0; i < times; i++) {
-      if (rewriteStripeToNewBlockGroup() == StripeWriteStatus.SUCCESS) {
+  private void addStripeToQueue(ECChunkBuffers stripe) throws IOException {
+    try {
+      do {
+        // If flushFuture is done, it means that the flush thread has
+        // encountered an exception. Call get() to throw that exception here.
+        if (flushFuture.isDone()) {
+          flushFuture.get();
+          // We should never reach here.
+          throw new IOException("Flush thread has ended before stream close");
+        }
+      } while (!ecStripeQueue.offer(stripe, 1, TimeUnit.SECONDS));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while adding stripe to queue", e);
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      } else {
+        throw new IOException(e.getCause());
+      }
+    }
+  }
+
+  private boolean flushStripeFromQueue() throws IOException {
+    try {
+      ECChunkBuffers stripe = ecStripeQueue.take();
+      while (!(stripe instanceof EOFDummyStripe)) {
+        if (stripe instanceof CheckpointDummyStripe) {
+          flushCheckpoint.set(((CheckpointDummyStripe) stripe).version);
+        } else {
+          flushStripeToDatanodes(stripe);
+          stripe.release();
+        }
+        stripe = ecStripeQueue.take();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while polling stripe from queue", e);
+    }
+    return true;
+  }
+
+  private void flushStripeToDatanodes(ECChunkBuffers stripe)
+      throws IOException {
+    int maxRetry = config.getMaxECStripeWriteRetries();
+    for (int i = 0; i <= maxRetry; i++) {
+      writeDataCells(stripe);
+      writeParityCells(stripe);
+      if (commitStripeWrite(stripe) == StripeWriteStatus.SUCCESS) {
         return;
       }
+      // In case of failure, cleanup before retry
+      rollbackAndReset(stripe);
     }
     throw new IOException("Completed max allowed retries " +
-        times + " on stripe failures.");
+        maxRetry + " on stripe failures.");
   }
 
   public static void padBufferToLimit(ByteBuffer buf, int limit) {
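addStripeToQueue is the producer half of the hand-off and deliberately avoids BlockingQueue.put(): if the flush thread has already died while the queue is full, put() would block forever, so the loop re-checks the future once per second and surfaces the real failure instead. The same pattern in a self-contained form (class and method names here are illustrative, not part of the Ozone API):

    import java.io.IOException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    final class BoundedHandoff<T> {
      private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(2);

      /** Offer work, failing fast if the consumer task has already exited. */
      void handOff(T item, Future<?> consumer) throws IOException {
        try {
          do {
            if (consumer.isDone()) {
              consumer.get();   // rethrows whatever killed the consumer
              throw new IOException("Consumer exited before end of input");
            }
          } while (!queue.offer(item, 1, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted during hand-off", e);
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          throw cause instanceof IOException
              ? (IOException) cause : new IOException(cause);
        }
      }
    }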
+ throw new IOException("Flush thread has ended before stream close"); + } + } while (!ecStripeQueue.offer(stripe, 1, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while adding stripe to queue", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new IOException(e.getCause()); + } + } + } + + private boolean flushStripeFromQueue() throws IOException { + try { + ECChunkBuffers stripe = ecStripeQueue.take(); + while (!(stripe instanceof EOFDummyStripe)) { + if (stripe instanceof CheckpointDummyStripe) { + flushCheckpoint.set(((CheckpointDummyStripe) stripe).version); + } else { + flushStripeToDatanodes(stripe); + stripe.release(); + } + stripe = ecStripeQueue.take(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while polling stripe from queue", e); + } + return true; + } + + private void flushStripeToDatanodes(ECChunkBuffers stripe) + throws IOException { + int maxRetry = config.getMaxECStripeWriteRetries(); + for (int i = 0; i <= maxRetry; i++) { + writeDataCells(stripe); + writeParityCells(stripe); + if (commitStripeWrite(stripe) == StripeWriteStatus.SUCCESS) { return; } + // In case of failure, cleanup before retry + rollbackAndReset(stripe); } throw new IOException("Completed max allowed retries " + - times + " on stripe failures."); + maxRetry + " on stripe failures."); } public static void padBufferToLimit(ByteBuffer buf, int limit) { @@ -580,12 +646,30 @@ private void checkNotClosed() throws IOException { } } + private static class EOFDummyStripe extends ECChunkBuffers { + EOFDummyStripe() { + } + } + + private static class CheckpointDummyStripe extends ECChunkBuffers { + private final long version; + CheckpointDummyStripe(long version) { + super(); + this.version = version; + } + } + private static class ECChunkBuffers { private final ByteBuffer[] dataBuffers; private final ByteBuffer[] parityBuffers; private int cellSize; private ByteBufferPool byteBufferPool; + ECChunkBuffers() { + dataBuffers = null; + parityBuffers = null; + } + ECChunkBuffers(int cellSize, int numData, int numParity, ByteBufferPool byteBufferPool) { this.cellSize = cellSize; diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java index 208ad82d76f2..237ed34e5bca 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java @@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.ozone.erasurecode.rawcoder.RSRawErasureCoderFactory; import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder; +import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.After; import org.junit.Assert; @@ -610,29 +611,29 @@ public void test10D4PConfigWithPartialStripe() @Test public void testWriteShouldFailIfMoreThanParityNodesFail() - throws IOException { + throws Exception { testNodeFailuresWhileWriting(new int[] {0, 1, 2}, 3, 2); } @Test public void testWriteShouldSuccessIfLessThanParityNodesFail() - throws 
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 208ad82d76f2..237ed34e5bca 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.ozone.erasurecode.rawcoder.RSRawErasureCoderFactory;
 import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.After;
 import org.junit.Assert;
@@ -610,29 +611,29 @@ public void test10D4PConfigWithPartialStripe()
 
   @Test
   public void testWriteShouldFailIfMoreThanParityNodesFail()
-      throws IOException {
+      throws Exception {
     testNodeFailuresWhileWriting(new int[] {0, 1, 2}, 3, 2);
   }
 
   @Test
   public void testWriteShouldSuccessIfLessThanParityNodesFail()
-      throws IOException {
+      throws Exception {
     testNodeFailuresWhileWriting(new int[] {0}, 2, 2);
   }
 
   @Test
-  public void testWriteShouldSuccessIf4NodesFailed() throws IOException {
+  public void testWriteShouldSuccessIf4NodesFailed() throws Exception {
     testNodeFailuresWhileWriting(new int[] {0, 1, 2, 3}, 1, 2);
   }
 
   @Test
   public void testWriteShouldSuccessWithAdditional1BlockGroupAfterFailure()
-      throws IOException {
+      throws Exception {
     testNodeFailuresWhileWriting(new int[] {0, 1, 2, 3}, 10, 3);
   }
 
   @Test
-  public void testStripeWriteRetriesOn2Failures() throws IOException {
+  public void testStripeWriteRetriesOn2Failures() throws Exception {
     OzoneConfiguration con = new OzoneConfiguration();
     con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2, StorageUnit.KB);
     // Cluster has 15 nodes. So, first we will create 3 block groups with
@@ -655,7 +656,7 @@ public void testStripeWriteRetriesOn2Failures() throws IOException {
   }
 
   @Test
-  public void testStripeWriteRetriesOn3Failures() throws IOException {
+  public void testStripeWriteRetriesOn3Failures() throws Exception {
     OzoneConfiguration con = new OzoneConfiguration();
     con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2,
         StorageUnit.KB);
@@ -677,9 +678,9 @@ public void testStripeWriteRetriesOn3Failures() throws IOException {
   }
 
   // The mocked impl throws IllegalStateException when there are not enough
-  // nodes in allocateBlock request. But write() converts it to IOException.
-  @Test(expected = IOException.class)
-  public void testStripeWriteRetriesOnAllNodeFailures() throws IOException {
+  // nodes in allocateBlock request.
+  @Test(expected = IllegalStateException.class)
+  public void testStripeWriteRetriesOnAllNodeFailures() throws Exception {
     OzoneConfiguration con = new OzoneConfiguration();
     con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2,
         StorageUnit.KB);
@@ -697,7 +698,7 @@ public void testStripeWriteRetriesOnAllNodeFailures() throws IOException {
 
   @Test
   public void testStripeWriteRetriesOn4FailuresWith3RetriesAllowed()
-      throws IOException {
+      throws Exception {
     OzoneConfiguration con = new OzoneConfiguration();
     con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2, StorageUnit.KB);
     con.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES, 3);
@@ -726,7 +727,7 @@ public void testStripeWriteRetriesOn4FailuresWith3RetriesAllowed()
   }
 
   public void testStripeWriteRetriesOnFailures(OzoneConfiguration con,
-      int clusterSize, int[] nodesIndexesToMarkFailure) throws IOException {
+      int clusterSize, int[] nodesIndexesToMarkFailure) throws Exception {
     close();
     MultiNodePipelineBlockAllocator blkAllocator =
         new MultiNodePipelineBlockAllocator(con, dataBlocks + parityBlocks,
@@ -744,6 +745,7 @@ public void testStripeWriteRetriesOnFailures(OzoneConfiguration con,
     for (int i = 0; i < dataBlocks; i++) {
       out.write(inputChunks[i]);
     }
+    waitForFlushingThreadToFinish((ECKeyOutputStream) out.getOutputStream());
     Assert.assertTrue(
         ((MockXceiverClientFactory) factoryStub).getStorages().size() == 5);
     List<DatanodeDetails> failedDNs = new ArrayList<>();
@@ -787,7 +789,7 @@ public void testStripeWriteRetriesOnFailures(OzoneConfiguration con,
 
   public void testNodeFailuresWhileWriting(int[] nodesIndexesToMarkFailure,
       int numChunksToWriteAfterFailure, int numExpectedBlockGrps)
-      throws IOException {
+      throws Exception {
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
@@ -800,6 +802,7 @@ public void testNodeFailuresWhileWriting(int[] nodesIndexesToMarkFailure,
     for (int i = 0; i < dataBlocks; i++) {
       out.write(inputChunks[i]);
     }
+    waitForFlushingThreadToFinish((ECKeyOutputStream) out.getOutputStream());
 
     List<DatanodeDetails> failedDNs = new ArrayList<>();
     List<HddsProtos.DatanodeDetailsProto> dns = allocator.getClusterDns();
@@ -841,22 +844,22 @@ public void testNodeFailuresWhileWriting(int[] nodesIndexesToMarkFailure,
   }
 
   @Test
-  public void testExcludeOnDNFailure() throws IOException {
+  public void testExcludeOnDNFailure() throws Exception {
     testExcludeFailedDN(IntStream.range(0, 5), IntStream.empty());
   }
 
   @Test
-  public void testExcludeOnDNClosed() throws IOException {
+  public void testExcludeOnDNClosed() throws Exception {
     testExcludeFailedDN(IntStream.empty(), IntStream.range(0, 5));
   }
 
   @Test
-  public void testExcludeOnDNMixed() throws IOException {
+  public void testExcludeOnDNMixed() throws Exception {
     testExcludeFailedDN(IntStream.range(0, 3), IntStream.range(3, 5));
   }
 
   private void testExcludeFailedDN(IntStream failedDNIndex,
-      IntStream closedDNIndex) throws IOException {
+      IntStream closedDNIndex) throws Exception {
     close();
     OzoneConfiguration con = new OzoneConfiguration();
     MultiNodePipelineBlockAllocator blkAllocator =
@@ -882,6 +885,7 @@ private void testExcludeFailedDN(IntStream failedDNIndex,
     for (int i = 0; i < dataBlocks; i++) {
       out.write(inputChunks[i % dataBlocks]);
     }
+    waitForFlushingThreadToFinish((ECKeyOutputStream) out.getOutputStream());
 
     // Then let's mark datanodes with closed container
     List<DatanodeDetails> closedDNs = closedDNIndex
@@ -899,6 +903,7 @@ private void testExcludeFailedDN(IntStream failedDNIndex,
     for (int i = 0; i < dataBlocks; i++) {
       out.write(inputChunks[i % dataBlocks]);
     }
+    waitForFlushingThreadToFinish((ECKeyOutputStream) out.getOutputStream());
 
     // Assert excludeList only includes failedDNs
     Assert.assertArrayEquals(failedDNs.toArray(new DatanodeDetails[0]),
@@ -909,7 +914,7 @@ private void testExcludeFailedDN(IntStream failedDNIndex,
 
   @Test
   public void testLargeWriteOfMultipleStripesWithStripeFailure()
-      throws IOException {
+      throws Exception {
     close();
     OzoneConfiguration con = new OzoneConfiguration();
     // block size of 3KB could hold 3 full stripes
@@ -943,6 +948,7 @@ public void testLargeWriteOfMultipleStripesWithStripeFailure()
         out.write(inputChunks[i]);
       }
     }
+    waitForFlushingThreadToFinish((ECKeyOutputStream) out.getOutputStream());
 
     List<DatanodeDetails> failedDNs = new ArrayList<>();
     List<HddsProtos.DatanodeDetailsProto> dns = allocator.getClusterDns();
@@ -1055,7 +1061,7 @@ public void testPartialStripeWithPartialChunkRetry()
 
   @Test
   public void testDiscardPreAllocatedBlocksPreventRetryExceeds()
-      throws IOException {
+      throws Exception {
     close();
     OzoneConfiguration con = new OzoneConfiguration();
     int maxRetries = 3;
@@ -1114,6 +1120,7 @@ public void testDiscardPreAllocatedBlocksPreventRetryExceeds()
         out.write(inputChunks[i]);
       }
     }
+    waitForFlushingThreadToFinish((ECKeyOutputStream) out.getOutputStream());
 
     // Make the writes fail to trigger retry
     List<DatanodeDetails> failedDNs = new ArrayList<>();
@@ -1221,4 +1228,12 @@ private List<OmKeyLocationInfo> getAllLocationInfoList(
     }
     return locationInfoList;
   }
+
+  private static void waitForFlushingThreadToFinish(
+      ECKeyOutputStream ecOut) throws Exception {
+    final long checkpoint = System.currentTimeMillis();
+    ecOut.insertFlushCheckpoint(checkpoint);
+    GenericTestUtils.waitFor(() -> ecOut.getFlushCheckpoint() == checkpoint,
+        100, 10000);
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index e53f943bef79..80131fdb67ef 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -401,19 +401,25 @@ public void testWriteShouldSucceedWhenDNKilled() throws Exception {
     try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
         new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
             chunkSize), new HashMap<>())) {
+      ECKeyOutputStream ecOut = (ECKeyOutputStream) out.getOutputStream();
       out.write(inputData);
       // Kill a node from first pipeline
-      nodeToKill =
-          ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
-              .get(0).getPipeline().getFirstNode();
+      nodeToKill = ecOut.getStreamEntries()
+          .get(0).getPipeline().getFirstNode();
       cluster.shutdownHddsDatanode(nodeToKill);
       out.write(inputData);
-      // Check the second blockGroup pipeline to make sure that the failed not
-      // is not selected.
-      Assert.assertFalse(
-          ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
-              .get(1).getPipeline().getNodes().contains(nodeToKill));
+
+      // Wait for flushing thread to finish its work.
+      final long checkpoint = System.currentTimeMillis();
+      ecOut.insertFlushCheckpoint(checkpoint);
+      GenericTestUtils.waitFor(() -> ecOut.getFlushCheckpoint() == checkpoint,
+          100, 10000);
+
+      // Check the second blockGroup pipeline to make sure that the failed
+      // node is not selected.
+      Assert.assertFalse(ecOut.getStreamEntries()
+          .get(1).getPipeline().getNodes().contains(nodeToKill));
     }
 
     try (OzoneInputStream is = bucket.readKey(keyName)) {
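Taken together, the patch changes when errors surface for callers: write() can now return after merely queueing a stripe, and close() becomes the point that drains the queue, joins flushFuture, and rethrows the first datanode failure. A hedged usage sketch (assumes an OzoneBucket named bucket from an existing client; imports and setup elided, createKey arguments mirror the tests above):

    byte[] data = new byte[3 * 1024];
    try (OzoneOutputStream out = bucket.createKey("key1", data.length,
        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, 1024),
        new HashMap<>())) {
      out.write(data);  // may only buffer or queue the stripe
    }                   // close() waits for the flush thread; IO errors land here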