diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java new file mode 100644 index 000000000000..35806967a5cd --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientReply; + +/** + * Client-side error injector that allows simulating errors received from the server side. + */ +@FunctionalInterface +public interface ErrorInjector { + RaftClientReply getResponse(ContainerProtos.ContainerCommandRequestProto request, ClientId id, Pipeline pipeline); +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java index cd46bc49a1cb..75ae01c10058 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java @@ -31,6 +31,12 @@ * Factory for XceiverClientSpi implementations. Client instances are not cached.
*/ public class XceiverClientCreator implements XceiverClientFactory { + private static ErrorInjector errorInjector; + + public static void enableErrorInjection(ErrorInjector injector) { + errorInjector = injector; + } + private final ConfigurationSource conf; private final boolean topologyAwareRead; private final ClientTrustManager trustManager; @@ -60,7 +66,7 @@ protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException { XceiverClientSpi client; switch (pipeline.getType()) { case RATIS: - client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager); + client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager, errorInjector); break; case STAND_ALONE: client = new XceiverClientGrpc(pipeline, conf, trustManager); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 285a47ec574a..07b704417216 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -61,6 +61,7 @@ public class XceiverClientManager extends XceiverClientCreator { private static final Logger LOG = LoggerFactory.getLogger(XceiverClientManager.class); + private final Cache clientCache; private final CacheMetrics cacheMetrics; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index eb0ed0a885cb..b0ef85cfbf7a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -79,12 +79,12 @@ public final class XceiverClientRatis extends XceiverClientSpi { public static XceiverClientRatis newXceiverClientRatis( org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, ConfigurationSource ozoneConf) { - return newXceiverClientRatis(pipeline, ozoneConf, null); + return newXceiverClientRatis(pipeline, ozoneConf, null, null); } public static XceiverClientRatis newXceiverClientRatis( org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, - ConfigurationSource ozoneConf, ClientTrustManager trustManager) { + ConfigurationSource ozoneConf, ClientTrustManager trustManager, ErrorInjector errorInjector) { final String rpcType = ozoneConf .get(ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY, ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); @@ -93,7 +93,7 @@ public static XceiverClientRatis newXceiverClientRatis( SecurityConfig(ozoneConf), trustManager); return new XceiverClientRatis(pipeline, SupportedRpcType.valueOfIgnoreCase(rpcType), - retryPolicy, tlsConfig, ozoneConf); + retryPolicy, tlsConfig, ozoneConf, errorInjector); } private final Pipeline pipeline; @@ -110,13 +110,14 @@ public static XceiverClientRatis newXceiverClientRatis( = XceiverClientManager.getXceiverClientMetrics(); private final RaftProtos.ReplicationLevel watchType; private final int majority; + private final ErrorInjector errorInjector; /** * Constructs a client. 
*/ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, - ConfigurationSource configuration) { + ConfigurationSource configuration, ErrorInjector errorInjector) { super(); this.pipeline = pipeline; this.majority = (pipeline.getReplicationConfig().getRequiredNodes() / 2) + 1; @@ -142,6 +143,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(), new Throwable("TRACE")); } + this.errorInjector = errorInjector; } private long updateCommitInfosMap(RaftClientReply reply, RaftProtos.ReplicationLevel level) { @@ -248,6 +250,12 @@ public ConcurrentMap getCommitInfoMap() { private CompletableFuture sendRequestAsync( ContainerCommandRequestProto request) { + if (errorInjector != null) { + RaftClientReply response = errorInjector.getResponse(request, getClient().getId(), pipeline); + if (response != null) { + return CompletableFuture.completedFuture(response); + } + } return TracingUtil.executeInNewSpan( "XceiverClientRatis." + request.getCmdType().name(), () -> { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 43ac69818f91..e88b097c4990 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -102,7 +102,7 @@ public class BlockOutputStream extends OutputStream { = new AtomicReference<>(); private final BlockData.Builder containerBlockData; - private XceiverClientFactory xceiverClientFactory; + private volatile XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; private OzoneClientConfig config; private StreamBufferArgs streamBufferArgs; @@ -216,7 +216,8 @@ public BlockOutputStream( this.token.encodeToUrlString(); //number of buffers used before doing a flush - refreshCurrentBuffer(); + currentBuffer = null; + currentBufferRemaining = 0; flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() / streamBufferArgs .getStreamBufferSize()); @@ -254,12 +255,6 @@ private boolean allDataNodesSupportPiggybacking() { return true; } - synchronized void refreshCurrentBuffer() { - currentBuffer = bufferPool.getCurrentBuffer(); - currentBufferRemaining = - currentBuffer != null ? currentBuffer.remaining() : 0; - } - public BlockID getBlockID() { return blockID.get(); } @@ -418,42 +413,44 @@ private void updatePutBlockLength() { * @param len length of data to write * @throws IOException if error occurred */ - - // In this case, the data is already cached in the currentBuffer. public synchronized void writeOnRetry(long len) throws IOException { if (len == 0) { return; } + + // In this case, the data from the failing (previous) block is already cached in the allocated buffers in + // the BufferPool. For each pending buffer in the BufferPool, we sequentially flush it and wait synchronously.
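// Illustration (not part of the patch): with, say, a 4 MB stream buffer size and 6 MB of
// buffered-but-not-yet-committed data, the loop below replays two cached buffers:
//   buffer[0] (4 MB): writeChunkAndPutBlock(buffer, false) -> watchForCommitAsync(...).get()
//   buffer[1] (2 MB): writeChunkAndPutBlock(buffer, false) -> watchForCommitAsync(...).get()
// i.e. each buffer is re-sent as a chunk (with the put-block piggybacked when supported, otherwise
// writeChunk followed by executePutBlock) and fully acknowledged before the next buffer is replayed.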
+ + List allocatedBuffers = bufferPool.getAllocatedBuffers(); if (LOG.isDebugEnabled()) { - LOG.debug("Retrying write length {} for blockID {}", len, blockID); + LOG.debug("{}: Retrying write length {} on target blockID {}, {} buffers", this, len, blockID, + allocatedBuffers.size()); } Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize()); int count = 0; - List allocatedBuffers = bufferPool.getAllocatedBuffers(); while (len > 0) { ChunkBuffer buffer = allocatedBuffers.get(count); long writeLen = Math.min(buffer.position(), len); - if (!buffer.hasRemaining()) { - writeChunk(buffer); - } len -= writeLen; count++; writtenDataLength += writeLen; - // we should not call isBufferFull/shouldFlush here. - // The buffer might already be full as whole data is already cached in - // the buffer. We should just validate - // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to - // call for handling full buffer/flush buffer condition. - if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 0) { - // reset the position to zero as now we will be reading the - // next buffer in the list - updateWriteChunkLength(); - updatePutBlockLength(); - CompletableFuture putBlockResultFuture = executePutBlock(false, false); - recordWatchForCommitAsync(putBlockResultFuture); + updateWriteChunkLength(); + updatePutBlockLength(); + LOG.debug("Write chunk on retry buffer = {}", buffer); + CompletableFuture putBlockFuture; + if (allowPutBlockPiggybacking) { + putBlockFuture = writeChunkAndPutBlock(buffer, false); + } else { + writeChunk(buffer); + putBlockFuture = executePutBlock(false, false); } - if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) { - handleFullBuffer(); + CompletableFuture watchForCommitAsync = watchForCommitAsync(putBlockFuture); + try { + watchForCommitAsync.get(); + } catch (InterruptedException e) { + handleInterruptedException(e, true); + } catch (ExecutionException e) { + handleExecutionException(e); } } } @@ -479,14 +476,6 @@ private void handleFullBuffer() throws IOException { void releaseBuffersOnException() { } - // It may happen that once the exception is encountered , we still might - // have successfully flushed up to a certain index. Make sure the buffers - // only contain data which have not been sufficiently replicated - private void adjustBuffersOnException() { - releaseBuffersOnException(); - refreshCurrentBuffer(); - } - /** * Watch for a specific commit index. */ @@ -633,6 +622,9 @@ private CompletableFuture writeChunkAndPutBlock(ChunkBuffer buff protected void handleFlush(boolean close) throws IOException { try { handleFlushInternal(close); + if (close) { + waitForAllPendingFlushes(); + } } catch (ExecutionException e) { handleExecutionException(e); } catch (InterruptedException ex) { @@ -675,6 +667,17 @@ private void handleFlushInternal(boolean close) } } + public void waitForAllPendingFlushes() throws IOException { + // When closing, must wait for all flush futures to complete. 
+ try { + allPendingFlushFutures.get(); + } catch (InterruptedException e) { + handleInterruptedException(e, true); + } catch (ExecutionException e) { + handleExecutionException(e); + } + } + private synchronized CompletableFuture handleFlushInternalSynchronized(boolean close) throws IOException { CompletableFuture putBlockResultFuture = null; // flush the last chunk data residing on the currentBuffer @@ -740,6 +743,7 @@ public void close() throws IOException { // Preconditions.checkArgument(buffer.position() == 0); // bufferPool.checkBufferPoolEmpty(); } else { + waitForAllPendingFlushes(); cleanup(false); } } @@ -783,7 +787,7 @@ public void setIoException(Exception e) { void cleanup() { } - public void cleanup(boolean invalidateClient) { + public synchronized void cleanup(boolean invalidateClient) { if (xceiverClientFactory != null) { xceiverClientFactory.releaseClient(xceiverClient, invalidateClient); } @@ -811,7 +815,6 @@ void checkOpen() throws IOException { if (isClosed()) { throw new IOException("BlockOutputStream has been closed."); } else if (getIoException() != null) { - adjustBuffersOnException(); throw getIoException(); } } @@ -1148,7 +1151,6 @@ void handleInterruptedException(Exception ex, */ private void handleExecutionException(Exception ex) throws IOException { setIoException(ex); - adjustBuffersOnException(); throw getIoException(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 28b9e151ff36..b3398de07ad3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -581,7 +581,10 @@ private CompletableFuture writeStateMachineData( writeChunkFuture.thenApply(r -> { if (r.getResult() != ContainerProtos.Result.SUCCESS && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN - && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) { + && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO + // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and + // that should not crash the pipeline. 
+ && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) { StorageContainerException sce = new StorageContainerException(r.getMessage(), r.getResult()); LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" + @@ -1061,7 +1064,8 @@ public CompletableFuture applyTransaction(TransactionContext trx) { // unhealthy if (r.getResult() != ContainerProtos.Result.SUCCESS && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN - && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) { + && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO + && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) { StorageContainerException sce = new StorageContainerException(r.getMessage(), r.getResult()); LOG.error( diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 5e6ecceefa1e..2ae3e4755315 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -22,6 +22,8 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; import java.util.function.Supplier; import org.apache.hadoop.fs.Syncable; @@ -41,6 +43,8 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.ratis.util.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A BlockOutputStreamEntry manages the data writes into the DataNodes. @@ -51,9 +55,9 @@ * but there can be other implementations that are using a different way. */ public class BlockOutputStreamEntry extends OutputStream { - + public static final Logger LOG = LoggerFactory.getLogger(BlockOutputStreamEntry.class); private final OzoneClientConfig config; - private OutputStream outputStream; + private BlockOutputStream outputStream; private BlockID blockID; private final String key; private final XceiverClientFactory xceiverClientManager; @@ -69,6 +73,18 @@ public class BlockOutputStreamEntry extends OutputStream { private final StreamBufferArgs streamBufferArgs; private final Supplier executorServiceSupplier; + /** + * An indicator that this BlockOutputStream is created to handoff writes from another faulty BlockOutputStream. + * Once this flag is on, this BlockOutputStream can only handle writeOnRetry. + */ + private volatile boolean isHandlingRetry; + + /** + * To record how many calls(write, flush) are being handled by this block. + */ + private AtomicInteger inflightCalls = new AtomicInteger(); + + BlockOutputStreamEntry(Builder b) { this.config = b.config; this.outputStream = null; @@ -83,6 +99,7 @@ public class BlockOutputStreamEntry extends OutputStream { this.clientMetrics = b.clientMetrics; this.streamBufferArgs = b.streamBufferArgs; this.executorServiceSupplier = b.executorServiceSupplier; + this.isHandlingRetry = b.forRetry; } @Override @@ -102,6 +119,37 @@ void checkStream() throws IOException { } } + /** Register when a call (write or flush) is received on this block. */ + void registerCallReceived() { + inflightCalls.incrementAndGet(); + } + + /** + * Register when a call (write or flush) is finished on this block. + * @return true if all the calls are done. 
+ */ + boolean registerCallFinished() { + return inflightCalls.decrementAndGet() == 0; + } + + void waitForRetryHandling(Condition retryHandlingCond) throws InterruptedException { + while (isHandlingRetry) { + LOG.info("{} : Block to wait for retry handling.", this); + retryHandlingCond.await(); + LOG.info("{} : Done waiting for retry handling.", this); + } + } + + void finishRetryHandling(Condition retryHandlingCond) { + LOG.info("{}: Exiting retry handling mode", this); + isHandlingRetry = false; + retryHandlingCond.signalAll(); + } + + void waitForAllPendingFlushes() throws IOException { + outputStream.waitForAllPendingFlushes(); + } + /** * Creates the outputStreams that are necessary to start the write. * Implementors can override this to instantiate multiple streams instead. @@ -144,6 +192,7 @@ void writeOnRetry(long len) throws IOException { BlockOutputStream out = (BlockOutputStream) getOutputStream(); out.writeOnRetry(len); incCurrentPosition(len); + LOG.info("{}: Finish retrying with len {}, currentPosition {}", this, len, currentPosition); } @Override @@ -368,6 +417,7 @@ public static class Builder { private ContainerClientMetrics clientMetrics; private StreamBufferArgs streamBufferArgs; private Supplier executorServiceSupplier; + private boolean forRetry; public Pipeline getPipeline() { return pipeline; @@ -433,6 +483,11 @@ public Builder setExecutorServiceSupplier(Supplier executorServ return this; } + public Builder setForRetry(boolean forRetry) { + this.forRetry = forRetry; + return this; + } + public BlockOutputStreamEntry build() { return new BlockOutputStreamEntry(this); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 99899c6874e7..3705a1363771 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -141,7 +141,7 @@ public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, lo // only the blocks allocated in this open session (block createVersion // equals to open session version) for (OmKeyLocationInfo subKeyInfo : version.getLocationList(openVersion)) { - addKeyLocationInfo(subKeyInfo); + addKeyLocationInfo(subKeyInfo, false); } } @@ -154,7 +154,7 @@ public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, lo * key to be written. * @return a BlockOutputStreamEntry instance that handles how data is written. 
*/ - BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { + BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo, boolean forRetry) { return new BlockOutputStreamEntry.Builder() .setBlockID(subKeyInfo.getBlockID()) @@ -168,12 +168,13 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setClientMetrics(clientMetrics) .setStreamBufferArgs(streamBufferArgs) .setExecutorServiceSupplier(executorServiceSupplier) + .setForRetry(forRetry) .build(); } - private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) { + private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo, boolean forRetry) { Preconditions.checkNotNull(subKeyInfo.getPipeline()); - streamEntries.add(createStreamEntry(subKeyInfo)); + streamEntries.add(createStreamEntry(subKeyInfo, forRetry)); } /** @@ -295,13 +296,13 @@ synchronized long getKeyLength() { * * @throws IOException */ - private void allocateNewBlock() throws IOException { + private void allocateNewBlock(boolean forRetry) throws IOException { if (!excludeList.isEmpty()) { LOG.debug("Allocating block with {}", excludeList); } OmKeyLocationInfo subKeyInfo = omClient.allocateBlock(keyArgs, openID, excludeList); - addKeyLocationInfo(subKeyInfo); + addKeyLocationInfo(subKeyInfo, forRetry); } /** @@ -379,7 +380,7 @@ BlockOutputStreamEntry getCurrentStreamEntry() { * @return the new current open stream to write to * @throws IOException if the block allocation failed. */ - synchronized BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException { + synchronized BlockOutputStreamEntry allocateBlockIfNeeded(boolean forRetry) throws IOException { BlockOutputStreamEntry streamEntry = getCurrentStreamEntry(); if (streamEntry != null && streamEntry.isClosed()) { // a stream entry gets closed either by : @@ -391,7 +392,7 @@ synchronized BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException { Preconditions.checkNotNull(omClient); // allocate a new block, if a exception happens, log an error and // throw exception to the caller directly, and the write fails. - allocateNewBlock(); + allocateNewBlock(forRetry); } // in theory, this condition should never violate due the check above // still do a sanity check. 
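The forRetry flag threaded through createStreamEntry/allocateBlockIfNeeded above works together with the per-entry call accounting (registerCallReceived/registerCallFinished) and the retry-handling condition introduced in BlockOutputStreamEntry: an entry created for retry starts in retry-handling mode, ordinary writers block on the shared condition until the faulty entry's last in-flight call finishes exception handling, and only then does the new entry accept normal writes. A minimal, self-contained sketch of that synchronization idea follows; it is not the Ozone classes themselves, the names and the single-object simplification are illustrative, and the real coordination is split between BlockOutputStreamEntry and KeyOutputStream later in this patch.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class RetryHandoffSketch {
  private final Lock writeLock = new ReentrantLock();
  private final Condition retryHandlingDone = writeLock.newCondition();
  private final AtomicInteger inflightCalls = new AtomicInteger(); // calls still running on the faulty stream
  private volatile boolean handlingRetry = true;                   // a "forRetry" entry starts in this mode

  void onCallReceived() {
    inflightCalls.incrementAndGet();              // same idea as registerCallReceived()
  }

  // Normal write/flush path: block while the replacement entry is still replaying buffered data.
  void runNormalCall(Runnable call) throws InterruptedException {
    writeLock.lock();
    try {
      while (handlingRetry) {                     // same idea as waitForRetryHandling(...)
        retryHandlingDone.await();
      }
    } finally {
      writeLock.unlock();
    }
    call.run();
  }

  // Exception path: when the faulty entry's last in-flight call is done, leave retry-handling mode.
  void onFaultyEntryCallFinished() {
    if (inflightCalls.decrementAndGet() == 0) {   // registerCallFinished() returning true
      writeLock.lock();
      try {
        handlingRetry = false;                    // finishRetryHandling(...)
        retryHandlingDone.signalAll();
      } finally {
        writeLock.unlock();
      }
    }
  }
}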
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java index 6eb9aed0d3ad..f891724270e4 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java @@ -37,7 +37,7 @@ public ECBlockOutputStreamEntryPool(ECKeyOutputStream.Builder builder) { } @Override - ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { + ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo, boolean forRetry) { final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder(); b.setBlockID(subKeyInfo.getBlockID()) .setKey(getKeyName()) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index 0de61f8485d3..ea3a3592a556 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -315,7 +315,7 @@ private void generateParityCells() throws IOException { private void writeDataCells(ECChunkBuffers stripe) throws IOException { final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool(); - blockOutputStreamEntryPool.allocateBlockIfNeeded(); + blockOutputStreamEntryPool.allocateBlockIfNeeded(false); ByteBuffer[] dataCells = stripe.getDataBuffers(); for (int i = 0; i < numDataBlks; i++) { if (dataCells[i].limit() > 0) { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 549607c59ad1..4f9e5db49a92 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -25,6 +25,9 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -58,6 +61,7 @@ import com.google.common.base.Preconditions; import org.apache.ratis.protocol.exceptions.AlreadyClosedException; import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; +import org.apache.ratis.util.function.CheckedRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,14 +113,28 @@ enum StreamAction { private boolean atomicKeyCreation; private ContainerClientMetrics clientMetrics; private OzoneManagerVersion ozoneManagerVersion; + private final Lock writeLock = new ReentrantLock(); + private final Condition retryHandlingCondition = writeLock.newCondition(); private final int maxConcurrentWritePerKey; private final KeyOutputStreamSemaphore keyOutputStreamSemaphore; + @VisibleForTesting KeyOutputStreamSemaphore getRequestSemaphore() { return keyOutputStreamSemaphore; } + /** Required to spy the object in testing. 
*/ + @VisibleForTesting + @SuppressWarnings("unused") + KeyOutputStream() { + maxConcurrentWritePerKey = 0; + keyOutputStreamSemaphore = null; + blockOutputStreamEntryPool = null; + retryPolicyMap = null; + replication = null; + } + public KeyOutputStream(ReplicationConfig replicationConfig, BlockOutputStreamEntryPool blockOutputStreamEntryPool) { this.replication = replicationConfig; closed = false; @@ -187,7 +205,7 @@ public KeyOutputStream(Builder b) { * @param version the set of blocks that are pre-allocated. * @param openVersion the version corresponding to the pre-allocation. */ - public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) { + public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) { blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion); } @@ -227,22 +245,31 @@ public void write(byte[] b, int off, int len) return; } - synchronized (this) { + doInWriteLock(() -> { handleWrite(b, off, len, false); writeOffset += len; - } + }); } finally { getRequestSemaphore().release(); } } + private void doInWriteLock(CheckedRunnable block) throws E { + writeLock.lock(); + try { + block.run(); + } finally { + writeLock.unlock(); + } + } + @VisibleForTesting void handleWrite(byte[] b, int off, long len, boolean retry) throws IOException { while (len > 0) { try { BlockOutputStreamEntry current = - blockOutputStreamEntryPool.allocateBlockIfNeeded(); + blockOutputStreamEntryPool.allocateBlockIfNeeded(retry); // length(len) will be in int range if the call is happening through // write API of blockOutputStream. Length can be in long range if it // comes via Exception path. @@ -272,12 +299,18 @@ private int writeToOutputStream(BlockOutputStreamEntry current, boolean retry, long len, byte[] b, int writeLen, int off, long currentPos) throws IOException { try { + current.registerCallReceived(); if (retry) { current.writeOnRetry(len); } else { + waitForRetryHandling(current); current.write(b, off, writeLen); offset += writeLen; } + current.registerCallFinished(); + } catch (InterruptedException e) { + current.registerCallFinished(); + throw new InterruptedIOException(); } catch (IOException ioe) { // for the current iteration, totalDataWritten - currentPos gives the // amount of data already written to the buffer @@ -295,11 +328,24 @@ private int writeToOutputStream(BlockOutputStreamEntry current, offset += writeLen; } LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, ioe); + handleException(current, ioe, retry); } return writeLen; } + private void handleException(BlockOutputStreamEntry entry, IOException exception, boolean fromRetry) + throws IOException { + doInWriteLock(() -> { + handleExceptionInternal(entry, exception); + BlockOutputStreamEntry current = blockOutputStreamEntryPool.getCurrentStreamEntry(); + if (!fromRetry && entry.registerCallFinished()) { + // When the faulty block finishes handling all its pending call, the current block can exit retry + // handling mode and unblock normal calls. + current.finishRetryHandling(retryHandlingCondition); + } + }); + } + /** * It performs following actions : * a. 
Updates the committed length at datanode for the current stream in @@ -310,8 +356,15 @@ private int writeToOutputStream(BlockOutputStreamEntry current, * @param exception actual exception that occurred * @throws IOException Throws IOException if Write fails */ - private synchronized void handleException(BlockOutputStreamEntry streamEntry, - IOException exception) throws IOException { + private void handleExceptionInternal(BlockOutputStreamEntry streamEntry, IOException exception) throws IOException { + try { + // Wait for all pending flushes in the faulty stream. It's possible that a prior write is pending completion + // successfully. Errors are ignored here and will be handled by the individual flush call. We just want to ensure + // all the pending are complete before handling exception. + streamEntry.waitForAllPendingFlushes(); + } catch (IOException ignored) { + } + Throwable t = HddsClientUtils.checkForException(exception); Preconditions.checkNotNull(t); boolean retryFailure = checkForRetryFailure(t); @@ -338,8 +391,6 @@ private synchronized void handleException(BlockOutputStreamEntry streamEntry, } Preconditions.checkArgument( bufferedDataLen <= streamBufferArgs.getStreamBufferMaxSize()); - Preconditions.checkArgument( - offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); Collection failedServers = streamEntry.getFailedServers(); Preconditions.checkNotNull(failedServers); @@ -498,12 +549,12 @@ public void hsync() throws IOException { final long hsyncPos = writeOffset; handleFlushOrClose(StreamAction.HSYNC); - synchronized (this) { + doInWriteLock(() -> { Preconditions.checkState(offset >= hsyncPos, "offset = %s < hsyncPos = %s", offset, hsyncPos); MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency, () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos)); - } + }); } finally { getRequestSemaphore().release(); } @@ -532,14 +583,23 @@ private void handleFlushOrClose(StreamAction op) throws IOException { BlockOutputStreamEntry entry = blockOutputStreamEntryPool.getCurrentStreamEntry(); if (entry != null) { + // If the current block is to handle retries, wait until all the retries are done. + waitForRetryHandling(entry); + entry.registerCallReceived(); try { handleStreamAction(entry, op); + entry.registerCallFinished(); } catch (IOException ioe) { - handleException(entry, ioe); + handleException(entry, ioe, false); continue; + } catch (Exception e) { + entry.registerCallFinished(); + throw e; } } return; + } catch (InterruptedException e) { + throw new InterruptedIOException(); } catch (Exception e) { markStreamClosed(); throw e; @@ -548,6 +608,10 @@ private void handleFlushOrClose(StreamAction op) throws IOException { } } + private void waitForRetryHandling(BlockOutputStreamEntry currentEntry) throws InterruptedException { + doInWriteLock(() -> currentEntry.waitForRetryHandling(retryHandlingCondition)); + } + private void handleStreamAction(BlockOutputStreamEntry entry, StreamAction op) throws IOException { Collection failedServers = entry.getFailedServers(); @@ -583,7 +647,11 @@ private void handleStreamAction(BlockOutputStreamEntry entry, * @throws IOException */ @Override - public synchronized void close() throws IOException { + public void close() throws IOException { + doInWriteLock(this::closeInternal); + } + + private void closeInternal() throws IOException { if (closed) { return; } @@ -781,7 +849,7 @@ public KeyOutputStream build() { * the last state of the volatile {@link #closed} field. 
* @throws IOException if the connection is closed. */ - private synchronized void checkNotClosed() throws IOException { + private void checkNotClosed() throws IOException { if (closed) { throw new IOException( ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java index 6b6abceff36a..d90a335321b1 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java @@ -34,7 +34,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -54,7 +54,7 @@ static void init() { void testConcurrentWriteLimitOne() throws Exception { // Verify the semaphore is working to limit the number of concurrent writes allowed. KeyOutputStreamSemaphore sema1 = new KeyOutputStreamSemaphore(1); - KeyOutputStream keyOutputStream = mock(KeyOutputStream.class); + KeyOutputStream keyOutputStream = spy(KeyOutputStream.class); when(keyOutputStream.getRequestSemaphore()).thenReturn(sema1); final AtomicInteger countWrite = new AtomicInteger(0); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index f27ebaa97274..49b515d53c57 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -45,6 +45,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.scm.ErrorInjector; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.hdds.utils.IOUtils; @@ -95,6 +98,11 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeerId; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; @@ -518,6 +526,7 @@ private List getOpenKeyInfo(BucketLayout bucketLayout) { @Test public void testUncommittedBlocks() throws Exception { + waitForEmptyDeletedTable(); // Set the fs.defaultFS final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); @@ -698,6 +707,99 @@ static void runTestHSync(FileSystem fs, Path file, }, 500, 3000); } + + public static Stream concurrentExceptionHandling() { + return Stream.of( + Arguments.of(4, 1), + Arguments.of(4, 4), + Arguments.of(8, 4) + ); + } + + @ParameterizedTest + 
@MethodSource("concurrentExceptionHandling") + public void testConcurrentExceptionHandling(int syncerThreads, int errors) throws Exception { + final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + ErrorInjectorImpl errorInjector = new ErrorInjectorImpl(); + XceiverClientManager.enableErrorInjection(errorInjector); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + bucket.getName(); + + try (FileSystem fs = FileSystem.get(CONF)) { + final Path file = new Path(dir, "exceptionhandling"); + byte[] data = new byte[8]; + ThreadLocalRandom.current().nextBytes(data); + int writes; + try (FSDataOutputStream out = fs.create(file, true)) { + writes = runConcurrentWriteHSyncWithException(file, out, data, syncerThreads, errors, errorInjector); + } + validateWrittenFile(file, fs, data, writes); + fs.delete(file, false); + } + } + + private int runConcurrentWriteHSyncWithException(Path file, + final FSDataOutputStream out, byte[] data, int syncThreadsCount, int errors, + ErrorInjectorImpl errorInjector) throws Exception { + + AtomicReference writerException = new AtomicReference<>(); + AtomicReference syncerException = new AtomicReference<>(); + + LOG.info("runConcurrentWriteHSyncWithException {} with size {}", file, data.length); + AtomicInteger writes = new AtomicInteger(); + final long start = Time.monotonicNow(); + + Runnable syncer = () -> { + while ((Time.monotonicNow() - start < 10000)) { + try { + out.write(data); + writes.incrementAndGet(); + out.hsync(); + } catch (Exception e) { + LOG.error("Error calling hsync", e); + syncerException.compareAndSet(null, e); + throw new RuntimeException(e); + } + } + }; + + Thread[] syncThreads = new Thread[syncThreadsCount]; + for (int i = 0; i < syncThreadsCount; i++) { + syncThreads[i] = new Thread(syncer); + syncThreads[i].setName("Syncer-" + i); + syncThreads[i].start(); + } + + // Inject error at 3rd second. + Runnable startErrorInjector = () -> { + while ((Time.monotonicNow() - start <= 3000)) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + errorInjector.start(errors); + LOG.info("Enabled error injection in XceiverClientRatis"); + }; + + new Thread(startErrorInjector).start(); + + for (Thread sync : syncThreads) { + sync.join(); + } + + if (syncerException.get() != null) { + throw syncerException.get(); + } + if (writerException.get() != null) { + throw writerException.get(); + } + return writes.get(); + } + private int runConcurrentWriteHSync(Path file, final FSDataOutputStream out, byte[] data, int syncThreadsCount) throws Exception { @@ -1320,4 +1422,33 @@ private Map getAllDeletedKeys(Table= 0) { + ContainerProtos.ContainerCommandResponseProto proto = ContainerProtos.ContainerCommandResponseProto.newBuilder() + .setResult(ContainerProtos.Result.CLOSED_CONTAINER_IO) + .setMessage("Simulated error #" + errorNum) + .setCmdType(request.getCmdType()) + .build(); + RaftClientReply reply = RaftClientReply.newBuilder() + .setSuccess(true) + .setMessage(Message.valueOf(proto.toByteString())) + .setClientId(clientId) + .setServerId(RaftPeerId.getRaftPeerId(pipeline.getLeaderId().toString())) + .setGroupId(RaftGroupId.randomId()) + .build(); + return reply; + } + + return null; + } + } }
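As the sendRequestAsync() change above shows, the injector is consulted before every Ratis call: a non-null RaftClientReply short-circuits the request with that reply, while returning null lets the request proceed to the real pipeline. Because ErrorInjector is a @FunctionalInterface, a test can install an injector with a lambda. A minimal sketch, assuming it runs before any XceiverClientRatis is created:

import org.apache.hadoop.hdds.scm.ErrorInjector;
import org.apache.hadoop.hdds.scm.XceiverClientManager;

final class ErrorInjectionExample {
  static void installPassThrough() {
    // Pass-through injector: never injects, so every request still goes to the real Ratis pipeline.
    ErrorInjector passThrough = (request, clientId, pipeline) -> null;
    XceiverClientManager.enableErrorInjection(passThrough);
  }
}

The errorInjector field is static on XceiverClientCreator, so injection applies to every Ratis client created afterwards in the process; ErrorInjectorImpl in the test above is the conditional variant, staying dormant until start(n) arms it and then answering a bounded number of requests with CLOSED_CONTAINER_IO replies.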