diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml index 4e75e42d9847..482c06732c1e 100644 --- a/hadoop-hdds/client/pom.xml +++ b/hadoop-hdds/client/pom.xml @@ -66,6 +66,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> ${spotbugs.version} provided + + io.netty + netty-buffer + diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 6982d41fbce5..f37cd1c94ae8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -53,6 +53,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.api.DataStreamApi; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.exceptions.GroupMismatchException; @@ -359,4 +360,8 @@ public XceiverClientReply sendCommandAsync( throw new UnsupportedOperationException( "Operation Not supported for ratis client"); } + + public DataStreamApi getDataStreamApi() { + return this.getClient().getDataStreamApi(); + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java new file mode 100644 index 000000000000..f658df1af9c2 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -0,0 +1,760 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+
+/**
+ * A {@link ByteBufStreamOutput} used by the REST service in combination
+ * with the SCMClient to write the value of a key to a sequence
+ * of container chunks. Writes are buffered locally and periodically written to
+ * the container as a new chunk. In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream has
+ * an internal unique identifier. This unique identifier and a monotonically
+ * increasing chunk index form a composite key that is used as the chunk name.
+ * After all data is written, a putKey call creates or updates the
+ * corresponding container key, and this call includes the full list of chunks
+ * that make up the key data. The list of chunks is updated all at once.
+ * Therefore, a concurrent reader can never see an intermediate state in which
+ * chunks of data from different versions of the key are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
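+ * <p>
+ * Unlike the chunk RPC based write path, this stream pushes data to the
+ * datanodes through the Ratis streaming API ({@code DataStreamOutput});
+ * a putBlock call is still used to commit the block metadata.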
+ */ +public class BlockDataStreamOutput implements ByteBufStreamOutput { + public static final Logger LOG = + LoggerFactory.getLogger(BlockDataStreamOutput.class); + public static final String EXCEPTION_MSG = + "Unexpected Storage Container Exception: "; + private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = {}; + + private AtomicReference blockID; + + private final BlockData.Builder containerBlockData; + private XceiverClientFactory xceiverClientFactory; + private XceiverClientRatis xceiverClient; + private OzoneClientConfig config; + + private int chunkIndex; + private final AtomicLong chunkOffset = new AtomicLong(); + private final BufferPool bufferPool; + // The IOException will be set by response handling thread in case there is an + // exception received in the response. If the exception is set, the next + // request will fail upfront. + private final AtomicReference ioException; + private final ExecutorService responseExecutor; + + // the effective length of data flushed so far + private long totalDataFlushedLength; + + // effective data write attempted so far for the block + private long writtenDataLength; + + // List containing buffers for which the putBlock call will + // update the length in the datanodes. This list will just maintain + // references to the buffers in the BufferPool which will be cleared + // when the watchForCommit acknowledges a putBlock logIndex has been + // committed on all datanodes. This list will be a place holder for buffers + // which got written between successive putBlock calls. + private List bufferList; + + // This object will maintain the commitIndexes and byteBufferList in order + // Also, corresponding to the logIndex, the corresponding list of buffers will + // be released from the buffer pool. + private final CommitWatcher commitWatcher; + + private final List failedServers; + private final Checksum checksum; + + //number of buffers used before doing a flush/putBlock. + private int flushPeriod; + //bytes remaining to write in the current buffer. + private int currentBufferRemaining; + //current buffer allocated to write + private ChunkBuffer currentBuffer; + private final Token token; + private final DataStreamOutput out; + private CompletableFuture dataStreamCloseReply; + private List> futures = new ArrayList<>(); + private final long syncSize = 0; // TODO: disk sync is disabled for now + private long syncPosition = 0; + + /** + * Creates a new BlockDataStreamOutput. + * + * @param blockID block ID + * @param xceiverClientManager client manager that controls client + * @param pipeline pipeline where block will be written + * @param bufferPool pool of buffers + */ + public BlockDataStreamOutput( + BlockID blockID, + XceiverClientFactory xceiverClientManager, + Pipeline pipeline, + BufferPool bufferPool, + OzoneClientConfig config, + Token token + ) throws IOException { + this.xceiverClientFactory = xceiverClientManager; + this.config = config; + this.blockID = new AtomicReference<>(blockID); + KeyValue keyValue = + KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); + this.containerBlockData = + BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .addMetadata(keyValue); + this.xceiverClient = + (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline); + // Alternatively, stream setup can be delayed till the first chunk write. 
+ this.out = setupStream(); + this.bufferPool = bufferPool; + this.token = token; + + //number of buffers used before doing a flush + refreshCurrentBuffer(bufferPool); + flushPeriod = (int) (config.getStreamBufferFlushSize() / config + .getStreamBufferSize()); + + Preconditions + .checkArgument( + (long) flushPeriod * config.getStreamBufferSize() == config + .getStreamBufferFlushSize()); + + // A single thread executor handle the responses of async requests + responseExecutor = Executors.newSingleThreadExecutor(); + commitWatcher = new CommitWatcher(bufferPool, xceiverClient); + bufferList = null; + totalDataFlushedLength = 0; + writtenDataLength = 0; + failedServers = new ArrayList<>(0); + ioException = new AtomicReference<>(null); + checksum = new Checksum(config.getChecksumType(), + config.getBytesPerChecksum()); + } + + private DataStreamOutput setupStream() throws IOException { + // Execute a dummy WriteChunk request to get the path of the target file, + // but does NOT write any data to it. + ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest = + ContainerProtos.WriteChunkRequestProto.newBuilder() + .setBlockID(blockID.get().getDatanodeBlockIDProtobuf()); + + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + ContainerProtos.ContainerCommandRequestProto.Builder builder = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.StreamInit) + .setContainerID(blockID.get().getContainerID()) + .setDatanodeUuid(id).setWriteChunk(writeChunkRequest); + + ContainerCommandRequestMessage message = + ContainerCommandRequestMessage.toMessage(builder.build(), null); + + return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) + .stream(message.getContent().asReadOnlyByteBuffer()); + } + + private void refreshCurrentBuffer(BufferPool pool) { + currentBuffer = pool.getCurrentBuffer(); + currentBufferRemaining = + currentBuffer != null ? 
currentBuffer.remaining() : 0; + } + + public BlockID getBlockID() { + return blockID.get(); + } + + public long getTotalAckDataLength() { + return commitWatcher.getTotalAckDataLength(); + } + + public long getWrittenDataLength() { + return writtenDataLength; + } + + public List getFailedServers() { + return failedServers; + } + + @VisibleForTesting + public XceiverClientRatis getXceiverClient() { + return xceiverClient; + } + + @VisibleForTesting + public long getTotalDataFlushedLength() { + return totalDataFlushedLength; + } + + @VisibleForTesting + public BufferPool getBufferPool() { + return bufferPool; + } + + public IOException getIoException() { + return ioException.get(); + } + + @VisibleForTesting + public Map> getCommitIndex2flushedDataMap() { + return commitWatcher.getCommitIndex2flushedDataMap(); + } + + @Override + public void write(ByteBuf b) throws IOException { + checkOpen(); + if (b == null) { + throw new NullPointerException(); + } + int off = b.readerIndex(); + int len = b.readableBytes(); + + while (len > 0) { + allocateNewBufferIfNeeded(); + final int writeLen = Math.min(currentBufferRemaining, len); + // TODO: avoid buffer copy here + currentBuffer.put(b.nioBuffer(off, writeLen)); + currentBufferRemaining -= writeLen; + writeChunkIfNeeded(); + off += writeLen; + len -= writeLen; + writtenDataLength += writeLen; + doFlushOrWatchIfNeeded(); + } + } + + private void writeChunkIfNeeded() throws IOException { + if (currentBufferRemaining == 0) { + writeChunk(currentBuffer); + } + } + + private void doFlushOrWatchIfNeeded() throws IOException { + if (currentBufferRemaining == 0) { + if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) { + updateFlushLength(); + executePutBlock(false, false); + } + // Data in the bufferPool can not exceed streamBufferMaxSize + if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) { + handleFullBuffer(); + } + } + } + + private void allocateNewBufferIfNeeded() { + if (currentBufferRemaining == 0) { + currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement()); + currentBufferRemaining = currentBuffer.remaining(); + } + } + + private void updateFlushLength() { + totalDataFlushedLength = writtenDataLength; + } + + private boolean isBufferPoolFull() { + return bufferPool.computeBufferData() == config.getStreamBufferMaxSize(); + } + + /** + * Will be called on the retryPath in case closedContainerException/ + * TimeoutException. + * @param len length of data to write + * @throws IOException if error occurred + */ + + // In this case, the data is already cached in the currentBuffer. + public void writeOnRetry(long len) throws IOException { + if (len == 0) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Retrying write length {} for blockID {}", len, blockID); + } + Preconditions.checkArgument(len <= config.getStreamBufferMaxSize()); + int count = 0; + while (len > 0) { + ChunkBuffer buffer = bufferPool.getBuffer(count); + long writeLen = Math.min(buffer.position(), len); + if (!buffer.hasRemaining()) { + writeChunk(buffer); + } + len -= writeLen; + count++; + writtenDataLength += writeLen; + // we should not call isBufferFull/shouldFlush here. + // The buffer might already be full as whole data is already cached in + // the buffer. We should just validate + // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to + // call for handling full buffer/flush buffer condition. 
+      if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
+        // reset the position to zero as now we will be reading the
+        // next buffer in the list
+        updateFlushLength();
+        executePutBlock(false, false);
+      }
+      if (writtenDataLength == config.getStreamBufferMaxSize()) {
+        handleFullBuffer();
+      }
+    }
+  }
+
+  /**
+   * This is a blocking call. It waits until the commit index at the head of
+   * the commitIndex2flushedDataMap has been replicated to all or a majority
+   * of the datanodes.
+   * @throws IOException
+   */
+  private void handleFullBuffer() throws IOException {
+    try {
+      checkOpen();
+      if (!commitWatcher.getFutureMap().isEmpty()) {
+        waitOnFlushFutures();
+      }
+    } catch (ExecutionException e) {
+      handleExecutionException(e);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      handleInterruptedException(ex, true);
+    }
+    watchForCommit(true);
+  }
+
+
+  // Even after an exception is encountered, writes may already have been
+  // successfully flushed up to a certain index. Make sure the buffers only
+  // contain data which has not been sufficiently replicated.
+  private void adjustBuffersOnException() {
+    commitWatcher.releaseBuffersOnException();
+    refreshCurrentBuffer(bufferPool);
+  }
+
+  /**
+   * Calls the watchForCommit API of the Ratis client. For a standalone
+   * client, it is a no-op.
+   * @param bufferFull flag indicating whether the bufferFull condition was
+   *                   hit or the call is made as part of flush/close
+   * @throws IOException IOException in case the watch times out
+   */
+  private void watchForCommit(boolean bufferFull) throws IOException {
+    checkOpen();
+    try {
+      XceiverClientReply reply = bufferFull ?
+          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+      if (reply != null) {
+        List<DatanodeDetails> dnList = reply.getDatanodes();
+        if (!dnList.isEmpty()) {
+          Pipeline pipe = xceiverClient.getPipeline();
+
+          LOG.warn("Failed to commit BlockId {} on {}. 
Failed nodes: {}", + blockID, pipe, dnList); + failedServers.addAll(dnList); + } + } + } catch (IOException ioe) { + setIoException(ioe); + throw getIoException(); + } + refreshCurrentBuffer(bufferPool); + + } + + /** + * @param close whether putBlock is happening as part of closing the stream + * @param force true if no data was written since most recent putBlock and + * stream is being closed + */ + private CompletableFuture executePutBlock(boolean close, + boolean force) throws IOException { + checkOpen(); + long flushPos = totalDataFlushedLength; + final List byteBufferList; + if (!force) { + Preconditions.checkNotNull(bufferList); + byteBufferList = bufferList; + bufferList = null; + Preconditions.checkNotNull(byteBufferList); + } else { + byteBufferList = null; + } + + try { + CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get(); + } catch (Exception e) { + LOG.warn("Failed to write all chunks through stream: " + e); + throw new IOException(e); + } + if (close) { + dataStreamCloseReply = out.closeAsync(); + } + + CompletableFuture flushFuture = null; + try { + BlockData blockData = containerBlockData.build(); + XceiverClientReply asyncReply = + putBlockAsync(xceiverClient, blockData, close, token); + CompletableFuture future = + asyncReply.getResponse(); + flushFuture = future.thenApplyAsync(e -> { + try { + validateResponse(e); + } catch (IOException sce) { + throw new CompletionException(sce); + } + // if the ioException is not set, putBlock is successful + if (getIoException() == null && !force) { + BlockID responseBlockID = BlockID.getFromProtobuf( + e.getPutBlock().getCommittedBlockLength().getBlockID()); + Preconditions.checkState(blockID.get().getContainerBlockID() + .equals(responseBlockID.getContainerBlockID())); + // updates the bcsId of the block + blockID.set(responseBlockID); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Adding index " + asyncReply.getLogIndex() + " commitMap size " + + commitWatcher.getCommitInfoMapSize() + " flushLength " + + flushPos + " numBuffers " + byteBufferList.size() + + " blockID " + blockID + " bufferPool size" + bufferPool + .getSize() + " currentBufferIndex " + bufferPool + .getCurrentBufferIndex()); + } + // for standalone protocol, logIndex will always be 0. 
+ commitWatcher + .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList); + } + return e; + }, responseExecutor).exceptionally(e -> { + if (LOG.isDebugEnabled()) { + LOG.debug("putBlock failed for blockID {} with exception {}", + blockID, e.getLocalizedMessage()); + } + CompletionException ce = new CompletionException(e); + setIoException(ce); + throw ce; + }); + } catch (IOException | ExecutionException e) { + throw new IOException(EXCEPTION_MSG + e.toString(), e); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + handleInterruptedException(ex, false); + } + commitWatcher.getFutureMap().put(flushPos, flushFuture); + return flushFuture; + } + + @Override + public void flush() throws IOException { + if (xceiverClientFactory != null && xceiverClient != null + && bufferPool != null && bufferPool.getSize() > 0 + && (!config.isStreamBufferFlushDelay() || + writtenDataLength - totalDataFlushedLength + >= config.getStreamBufferSize())) { + try { + handleFlush(false); + } catch (ExecutionException e) { + // just set the exception here as well in order to maintain sanctity of + // ioException field + handleExecutionException(e); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + handleInterruptedException(ex, true); + } + } + } + + private void writeChunk(ChunkBuffer buffer) + throws IOException { + // This data in the buffer will be pushed to datanode and a reference will + // be added to the bufferList. Once putBlock gets executed, this list will + // be marked null. Hence, during first writeChunk call after every putBlock + // call or during the first call to writeChunk here, the list will be null. + + if (bufferList == null) { + bufferList = new ArrayList<>(); + } + bufferList.add(buffer); + writeChunkToContainer(buffer.duplicate(0, buffer.position())); + } + + /** + * @param close whether the flush is happening as part of closing the stream + */ + private void handleFlush(boolean close) + throws IOException, InterruptedException, ExecutionException { + checkOpen(); + // flush the last chunk data residing on the currentBuffer + if (totalDataFlushedLength < writtenDataLength) { + refreshCurrentBuffer(bufferPool); + Preconditions.checkArgument(currentBuffer.position() > 0); + if (currentBuffer.hasRemaining()) { + writeChunk(currentBuffer); + } + // This can be a partially filled chunk. Since we are flushing the buffer + // here, we just limit this buffer to the current position. 
So that next + // write will happen in new buffer + updateFlushLength(); + executePutBlock(close, false); + } else if (close) { + // forcing an "empty" putBlock if stream is being closed without new + // data since latest flush - we need to send the "EOF" flag + executePutBlock(true, true); + } + waitOnFlushFutures(); + watchForCommit(false); + // just check again if the exception is hit while waiting for the + // futures to ensure flush has indeed succeeded + + // irrespective of whether the commitIndex2flushedDataMap is empty + // or not, ensure there is no exception set + checkOpen(); + } + + @Override + public void close() throws IOException { + if (xceiverClientFactory != null && xceiverClient != null + && bufferPool != null && bufferPool.getSize() > 0) { + try { + handleFlush(true); + dataStreamCloseReply.get(); + } catch (ExecutionException e) { + handleExecutionException(e); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + handleInterruptedException(ex, true); + } finally { + cleanup(false); + } + // TODO: Turn the below buffer empty check on when Standalone pipeline + // is removed in the write path in tests + // Preconditions.checkArgument(buffer.position() == 0); + // bufferPool.checkBufferPoolEmpty(); + + } + } + + private void waitOnFlushFutures() + throws InterruptedException, ExecutionException { + CompletableFuture combinedFuture = CompletableFuture.allOf( + commitWatcher.getFutureMap().values().toArray( + new CompletableFuture[commitWatcher.getFutureMap().size()])); + // wait for all the transactions to complete + combinedFuture.get(); + } + + private void validateResponse( + ContainerProtos.ContainerCommandResponseProto responseProto) + throws IOException { + try { + // if the ioException is already set, it means a prev request has failed + // just throw the exception. The current operation will fail with the + // original error + IOException exception = getIoException(); + if (exception != null) { + throw exception; + } + ContainerProtocolCalls.validateContainerResponse(responseProto); + } catch (StorageContainerException sce) { + setIoException(sce); + throw sce; + } + } + + + private void setIoException(Exception e) { + IOException ioe = getIoException(); + if (ioe == null) { + IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e); + ioException.compareAndSet(null, exception); + } else { + LOG.debug("Previous request had already failed with " + ioe.toString() + + " so subsequent request also encounters" + + " Storage Container Exception ", e); + } + } + + public void cleanup(boolean invalidateClient) { + if (xceiverClientFactory != null) { + xceiverClientFactory.releaseClient(xceiverClient, invalidateClient); + } + xceiverClientFactory = null; + xceiverClient = null; + commitWatcher.cleanup(); + if (bufferList != null) { + bufferList.clear(); + } + bufferList = null; + responseExecutor.shutdown(); + } + + /** + * Checks if the stream is open or exception has occurred. + * If not, throws an exception. 
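+   * If a previous request has failed, the buffers are first rewound via
+   * adjustBuffersOnException() before the stored exception is rethrown.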
+ * + * @throws IOException if stream is closed + */ + private void checkOpen() throws IOException { + if (isClosed()) { + throw new IOException("BlockDataStreamOutput has been closed."); + } else if (getIoException() != null) { + adjustBuffersOnException(); + throw getIoException(); + } + } + + public boolean isClosed() { + return xceiverClient == null; + } + + private boolean needSync(long position) { + if (syncSize > 0) { + // TODO: or position >= fileLength + if (position - syncPosition >= syncSize) { + syncPosition = position; + return true; + } + } + return false; + } + + /** + * Writes buffered data as a new chunk to the container and saves chunk + * information to be used later in putKey call. + * + * @throws IOException if there is an I/O error while performing the call + * @throws OzoneChecksumException if there is an error while computing + * checksum + */ + private void writeChunkToContainer(ChunkBuffer chunk) throws IOException { + int effectiveChunkSize = chunk.remaining(); + final long offset = chunkOffset.getAndAdd(effectiveChunkSize); + final ByteString data = chunk.toByteString( + bufferPool.byteStringConversion()); + ChecksumData checksumData = checksum.computeChecksum(chunk); + ChunkInfo chunkInfo = ChunkInfo.newBuilder() + .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex) + .setOffset(offset) + .setLen(effectiveChunkSize) + .setChecksumData(checksumData.getProtoBufMessage()) + .build(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Writing chunk {} length {} at offset {}", + chunkInfo.getChunkName(), effectiveChunkSize, offset); + } + + CompletableFuture future = + (needSync(offset + effectiveChunkSize) ? + out.writeAsync(data.asReadOnlyByteBuffer(), StandardWriteOption.SYNC) : + out.writeAsync(data.asReadOnlyByteBuffer())) + .whenCompleteAsync((r, e) -> { + if (e != null || !r.isSuccess()) { + if (e == null) { + e = new IOException("result is not success"); + } + String msg = "Failed to write chunk " + chunkInfo.getChunkName() + + " " + "into block " + blockID; + LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage()); + CompletionException ce = new CompletionException(msg, e); + setIoException(ce); + throw ce; + } + }, responseExecutor); + + futures.add(future); + containerBlockData.addChunks(chunkInfo); + } + + @VisibleForTesting + public void setXceiverClient(XceiverClientRatis xceiverClient) { + this.xceiverClient = xceiverClient; + } + + /** + * Handles InterruptedExecution. + * + * @param ex + * @param processExecutionException is optional, if passed as TRUE, then + * handle ExecutionException else skip it. + * @throws IOException + */ + private void handleInterruptedException(Exception ex, + boolean processExecutionException) + throws IOException { + LOG.error("Command execution was interrupted."); + if(processExecutionException) { + handleExecutionException(ex); + } else { + throw new IOException(EXCEPTION_MSG + ex.toString(), ex); + } + } + + /** + * Handles ExecutionException by adjusting buffers. 
+   * @param ex
+   * @throws IOException
+   */
+  private void handleExecutionException(Exception ex) throws IOException {
+    setIoException(ex);
+    adjustBuffersOnException();
+    throw getIoException();
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
new file mode 100644
index 000000000000..7f40737b709f
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+* This interface is for writing an output stream of ByteBufs.
+* A ByteBufStreamOutput accepts Netty ByteBufs and sends them to some sink.
+*/
+public interface ByteBufStreamOutput extends Closeable {
+  /**
+   * Try to write all the bytes in ByteBuf b to DataStream.
+   *
+   * @param b the data.
+   * @exception IOException if an I/O error occurs.
+   */
+  void write(ByteBuf b) throws IOException;
+
+  /**
+   * Try to write the [off:off + len) slice in ByteBuf b to DataStream.
+   *
+   * @param b the data.
+   * @param off the start offset in the data.
+   * @param len the number of bytes to write.
+   * @exception IOException if an I/O error occurs.
+   */
+  default void write(ByteBuf b, int off, int len) throws IOException {
+    write(b.slice(off, len));
+  }
+
+  /**
+   * Flushes this DataStream output and forces any buffered output bytes
+   * to be written out.
+   *
+   * @exception IOException if an I/O error occurs.
+   */
+  void flush() throws IOException;
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 8a8cde9d1490..61cbb69081ce 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -528,6 +529,24 @@ public OzoneOutputStream createKey(String key, long size,
         .createKey(volumeName, name, key, size, replicationConfig,
             keyMetadata);
   }
 
+  /**
+   * Creates a new key in the bucket.
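+   * <p>
+   * A minimal usage sketch, assuming the returned stream exposes the
+   * ByteBufStreamOutput write/close contract (replicationConfig and
+   * metadata below are placeholders):
+   * <pre>{@code
+   *   ByteBuf data = ...; // Netty buffer holding the key contents
+   *   try (OzoneDataStreamOutput out = bucket.createStreamKey(
+   *       "key1", data.readableBytes(), replicationConfig, metadata)) {
+   *     out.write(data);
+   *   }
+   * }</pre>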
+ * + * @param key Name of the key to be created. + * @param size Size of the data the key will point to. + * @param replicationConfig Replication configuration. + * @return OzoneDataStreamOutput to which the data has to be written. + * @throws IOException + */ + public OzoneDataStreamOutput createStreamKey(String key, long size, + ReplicationConfig replicationConfig, + Map keyMetadata) + throws IOException { + return proxy + .createStreamKey(volumeName, name, key, size, replicationConfig, + keyMetadata); + } + /** * Reads an existing key from the bucket. * diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java new file mode 100644 index 000000000000..695474260176 --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import com.google.common.annotations.VisibleForTesting; +import io.netty.buffer.ByteBuf; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; +import org.apache.hadoop.hdds.scm.storage.BufferPool; +import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +/** + * Helper class used inside {@link BlockDataStreamOutput}. 
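+ * The underlying {@link BlockDataStreamOutput} is created lazily on the
+ * first write, so a pre-allocated block does not acquire an xceiver client
+ * until data is actually written to it.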
+ * */ +public final class BlockDataStreamOutputEntry + implements ByteBufStreamOutput { + + private final OzoneClientConfig config; + private ByteBufStreamOutput byteBufStreamOutput; + private BlockID blockID; + private final String key; + private final XceiverClientFactory xceiverClientManager; + private final Pipeline pipeline; + // total number of bytes that should be written to this stream + private final long length; + // the current position of this stream 0 <= currentPosition < length + private long currentPosition; + private final Token token; + + private BufferPool bufferPool; + + @SuppressWarnings({"parameternumber", "squid:S00107"}) + private BlockDataStreamOutputEntry( + BlockID blockID, String key, + XceiverClientFactory xceiverClientManager, + Pipeline pipeline, + long length, + BufferPool bufferPool, + Token token, + OzoneClientConfig config + ) { + this.config = config; + this.byteBufStreamOutput = null; + this.blockID = blockID; + this.key = key; + this.xceiverClientManager = xceiverClientManager; + this.pipeline = pipeline; + this.token = token; + this.length = length; + this.currentPosition = 0; + this.bufferPool = bufferPool; + } + + long getLength() { + return length; + } + + Token getToken() { + return token; + } + + long getRemaining() { + return length - currentPosition; + } + + /** + * BlockDataStreamOutput is initialized in this function. This makes sure that + * xceiverClient initialization is not done during preallocation and only + * done when data is written. + * @throws IOException if xceiverClient initialization fails + */ + private void checkStream() throws IOException { + if (this.byteBufStreamOutput == null) { + this.byteBufStreamOutput = + new BlockDataStreamOutput(blockID, xceiverClientManager, + pipeline, bufferPool, config, token); + } + } + + @Override + public void write(ByteBuf b) throws IOException { + checkStream(); + final int len = b.readableBytes(); + byteBufStreamOutput.write(b); + this.currentPosition += len; + } + + @Override + public void flush() throws IOException { + if (this.byteBufStreamOutput != null) { + this.byteBufStreamOutput.flush(); + } + } + + @Override + public void close() throws IOException { + if (this.byteBufStreamOutput != null) { + this.byteBufStreamOutput.close(); + // after closing the chunkOutPutStream, blockId would have been + // reconstructed with updated bcsId + this.blockID = + ((BlockDataStreamOutput) byteBufStreamOutput).getBlockID(); + } + } + + boolean isClosed() { + if (byteBufStreamOutput != null) { + return ((BlockDataStreamOutput) byteBufStreamOutput).isClosed(); + } + return false; + } + + long getTotalAckDataLength() { + if (byteBufStreamOutput != null) { + BlockDataStreamOutput out = + (BlockDataStreamOutput) this.byteBufStreamOutput; + blockID = out.getBlockID(); + return out.getTotalAckDataLength(); + } else { + // For a pre allocated block for which no write has been initiated, + // the ByteBufStreamOutput will be null here. 
+ // In such cases, the default blockCommitSequenceId will be 0 + return 0; + } + } + + Collection getFailedServers() { + if (byteBufStreamOutput != null) { + BlockDataStreamOutput out = + (BlockDataStreamOutput) this.byteBufStreamOutput; + return out.getFailedServers(); + } + return Collections.emptyList(); + } + + long getWrittenDataLength() { + if (byteBufStreamOutput != null) { + BlockDataStreamOutput out = + (BlockDataStreamOutput) this.byteBufStreamOutput; + return out.getWrittenDataLength(); + } else { + // For a pre allocated block for which no write has been initiated, + // the ByteBufStreamOutput will be null here. + // In such cases, the default blockCommitSequenceId will be 0 + return 0; + } + } + + void cleanup(boolean invalidateClient) throws IOException { + checkStream(); + BlockDataStreamOutput out = + (BlockDataStreamOutput) this.byteBufStreamOutput; + out.cleanup(invalidateClient); + + } + + void writeOnRetry(long len) throws IOException { + checkStream(); + BlockDataStreamOutput out = + (BlockDataStreamOutput) this.byteBufStreamOutput; + out.writeOnRetry(len); + this.currentPosition += len; + + } + + /** + * Builder class for BlockDataStreamOutputEntry. + * */ + public static class Builder { + + private BlockID blockID; + private String key; + private XceiverClientFactory xceiverClientManager; + private Pipeline pipeline; + private long length; + private BufferPool bufferPool; + private Token token; + private OzoneClientConfig config; + + public Builder setBlockID(BlockID bID) { + this.blockID = bID; + return this; + } + + public Builder setKey(String keys) { + this.key = keys; + return this; + } + + public Builder setXceiverClientManager( + XceiverClientFactory + xClientManager) { + this.xceiverClientManager = xClientManager; + return this; + } + + public Builder setPipeline(Pipeline ppln) { + this.pipeline = ppln; + return this; + } + + + public Builder setLength(long len) { + this.length = len; + return this; + } + + + public Builder setBufferPool(BufferPool pool) { + this.bufferPool = pool; + return this; + } + + public Builder setConfig(OzoneClientConfig clientConfig) { + this.config = clientConfig; + return this; + } + + public Builder setToken(Token bToken) { + this.token = bToken; + return this; + } + + public BlockDataStreamOutputEntry build() { + return new BlockDataStreamOutputEntry(blockID, + key, + xceiverClientManager, + pipeline, + length, + bufferPool, + token, config); + } + } + + @VisibleForTesting + public ByteBufStreamOutput getByteBufStreamOutput() { + return byteBufStreamOutput; + } + + public BlockID getBlockID() { + return blockID; + } + + public String getKey() { + return key; + } + + public XceiverClientFactory getXceiverClientManager() { + return xceiverClientManager; + } + + public Pipeline getPipeline() { + return pipeline; + } + + public long getCurrentPosition() { + return currentPosition; + } + + public BufferPool getBufferPool() { + return bufferPool; + } + + public void setCurrentPosition(long curPosition) { + this.currentPosition = curPosition; + } +} + + diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java new file mode 100644 index 000000000000..94c505f2af35 --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -0,0 +1,324 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ByteStringConversion; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.BufferPool; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +/** + * This class manages the stream entries list and handles block allocation + * from OzoneManager. 
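+ * All entries share a single {@link BufferPool}, sized to hold
+ * streamBufferMaxSize / streamBufferSize buffers.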
+ */ +public class BlockDataStreamOutputEntryPool { + + public static final Logger LOG = + LoggerFactory.getLogger(BlockDataStreamOutputEntryPool.class); + + private final List streamEntries; + private final OzoneClientConfig config; + private int currentStreamIndex; + private final OzoneManagerProtocol omClient; + private final OmKeyArgs keyArgs; + private final XceiverClientFactory xceiverClientFactory; + private final String requestID; + private final BufferPool bufferPool; + private OmMultipartCommitUploadPartInfo commitUploadPartInfo; + private final long openID; + private final ExcludeList excludeList; + + @SuppressWarnings({"parameternumber", "squid:S00107"}) + public BlockDataStreamOutputEntryPool( + OzoneClientConfig config, + OzoneManagerProtocol omClient, + String requestId, ReplicationConfig replicationConfig, + String uploadID, int partNumber, + boolean isMultipart, OmKeyInfo info, + boolean unsafeByteBufferConversion, + XceiverClientFactory xceiverClientFactory, long openID + ) { + this.config = config; + this.xceiverClientFactory = xceiverClientFactory; + streamEntries = new ArrayList<>(); + currentStreamIndex = 0; + this.omClient = omClient; + this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) + .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) + .setReplicationConfig(replicationConfig).setDataSize(info.getDataSize()) + .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID) + .setMultipartUploadPartNumber(partNumber).build(); + this.requestID = requestId; + this.openID = openID; + this.excludeList = new ExcludeList(); + + this.bufferPool = + new BufferPool(config.getStreamBufferSize(), + (int) (config.getStreamBufferMaxSize() / config + .getStreamBufferSize()), + ByteStringConversion + .createByteBufferConversion(unsafeByteBufferConversion)); + } + + /** + * A constructor for testing purpose only. + * + * @see KeyDataStreamOutput#KeyDataStreamOutput() + */ + @VisibleForTesting + BlockDataStreamOutputEntryPool() { + streamEntries = new ArrayList<>(); + omClient = null; + keyArgs = null; + xceiverClientFactory = null; + config = + new OzoneConfiguration().getObject(OzoneClientConfig.class); + config.setStreamBufferSize(0); + config.setStreamBufferMaxSize(0); + config.setStreamBufferFlushSize(0); + config.setStreamBufferFlushDelay(false); + requestID = null; + int chunkSize = 0; + bufferPool = new BufferPool(chunkSize, 1); + + currentStreamIndex = 0; + openID = -1; + excludeList = new ExcludeList(); + } + + /** + * When a key is opened, it is possible that there are some blocks already + * allocated to it for this open session. In this case, to make use of these + * blocks, we need to add these blocks to stream entries. But, a key's version + * also includes blocks from previous versions, we need to avoid adding these + * old blocks to stream entries, because these old blocks should not be picked + * for write. To do this, the following method checks that, only those + * blocks created in this particular open version are added to stream entries. + * + * @param version the set of blocks that are pre-allocated. + * @param openVersion the version corresponding to the pre-allocation. 
+ * @throws IOException + */ + public void addPreallocateBlocks(OmKeyLocationInfoGroup version, + long openVersion) throws IOException { + // server may return any number of blocks, (0 to any) + // only the blocks allocated in this open session (block createVersion + // equals to open session version) + for (OmKeyLocationInfo subKeyInfo : version.getLocationList(openVersion)) { + addKeyLocationInfo(subKeyInfo); + } + } + + private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) { + Preconditions.checkNotNull(subKeyInfo.getPipeline()); + BlockDataStreamOutputEntry.Builder builder = + new BlockDataStreamOutputEntry.Builder() + .setBlockID(subKeyInfo.getBlockID()) + .setKey(keyArgs.getKeyName()) + .setXceiverClientManager(xceiverClientFactory) + .setPipeline(subKeyInfo.getPipeline()) + .setConfig(config) + .setLength(subKeyInfo.getLength()) + .setBufferPool(bufferPool) + .setToken(subKeyInfo.getToken()); + streamEntries.add(builder.build()); + } + + public List getLocationInfoList() { + List locationInfoList = new ArrayList<>(); + for (BlockDataStreamOutputEntry streamEntry : streamEntries) { + long length = streamEntry.getCurrentPosition(); + + // Commit only those blocks to OzoneManager which are not empty + if (length != 0) { + OmKeyLocationInfo info = + new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) + .setLength(streamEntry.getCurrentPosition()).setOffset(0) + .setToken(streamEntry.getToken()) + .setPipeline(streamEntry.getPipeline()).build(); + locationInfoList.add(info); + } + if (LOG.isDebugEnabled()) { + LOG.debug( + "block written " + streamEntry.getBlockID() + ", length " + length + + " bcsID " + streamEntry.getBlockID() + .getBlockCommitSequenceId()); + } + } + return locationInfoList; + } + + /** + * Discards the subsequent pre allocated blocks and removes the streamEntries + * from the streamEntries list for the container which is closed. + * @param containerID id of the closed container + * @param pipelineId id of the associated pipeline + */ + void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) { + // currentStreamIndex < streamEntries.size() signifies that, there are still + // pre allocated blocks available. + + // This will be called only to discard the next subsequent unused blocks + // in the streamEntryList. + if (currentStreamIndex + 1 < streamEntries.size()) { + ListIterator streamEntryIterator = + streamEntries.listIterator(currentStreamIndex + 1); + while (streamEntryIterator.hasNext()) { + BlockDataStreamOutputEntry streamEntry = streamEntryIterator.next(); + Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0); + if ((streamEntry.getPipeline().getId().equals(pipelineId)) || + (containerID != -1 && + streamEntry.getBlockID().getContainerID() == containerID)) { + streamEntryIterator.remove(); + } + } + } + } + + List getStreamEntries() { + return streamEntries; + } + + XceiverClientFactory getXceiverClientFactory() { + return xceiverClientFactory; + } + + String getKeyName() { + return keyArgs.getKeyName(); + } + + long getKeyLength() { + return streamEntries.stream().mapToLong( + BlockDataStreamOutputEntry::getCurrentPosition).sum(); + } + /** + * Contact OM to get a new block. Set the new block with the index (e.g. + * first block has index = 0, second has index = 1 etc.) + * + * The returned block is made to new BlockDataStreamOutputEntry to write. 
+ * + * @throws IOException + */ + private void allocateNewBlock() throws IOException { + if (!excludeList.isEmpty()) { + LOG.debug("Allocating block with {}", excludeList); + } + OmKeyLocationInfo subKeyInfo = + omClient.allocateBlock(keyArgs, openID, excludeList); + addKeyLocationInfo(subKeyInfo); + } + + + void commitKey(long offset) throws IOException { + if (keyArgs != null) { + // in test, this could be null + long length = getKeyLength(); + Preconditions.checkArgument(offset == length); + keyArgs.setDataSize(length); + keyArgs.setLocationInfoList(getLocationInfoList()); + // When the key is multipart upload part file upload, we should not + // commit the key, as this is not an actual key, this is a just a + // partial key of a large file. + if (keyArgs.getIsMultipartKey()) { + commitUploadPartInfo = + omClient.commitMultipartUploadPart(keyArgs, openID); + } else { + omClient.commitKey(keyArgs, openID); + } + } else { + LOG.warn("Closing KeyDataStreamOutput, but key args is null"); + } + } + + public BlockDataStreamOutputEntry getCurrentStreamEntry() { + if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) { + return null; + } else { + return streamEntries.get(currentStreamIndex); + } + } + + BlockDataStreamOutputEntry allocateBlockIfNeeded() throws IOException { + BlockDataStreamOutputEntry streamEntry = getCurrentStreamEntry(); + if (streamEntry != null && streamEntry.isClosed()) { + // a stream entry gets closed either by : + // a. If the stream gets full + // b. it has encountered an exception + currentStreamIndex++; + } + if (streamEntries.size() <= currentStreamIndex) { + Preconditions.checkNotNull(omClient); + // allocate a new block, if a exception happens, log an error and + // throw exception to the caller directly, and the write fails. + allocateNewBlock(); + } + // in theory, this condition should never violate due the check above + // still do a sanity check. + Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); + return streamEntries.get(currentStreamIndex); + } + + long computeBufferData() { + return bufferPool.computeBufferData(); + } + + void cleanup() { + if (excludeList != null) { + excludeList.clear(); + } + if (bufferPool != null) { + bufferPool.clearBufferPool(); + } + + if (streamEntries != null) { + streamEntries.clear(); + } + } + + public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { + return commitUploadPartInfo; + } + + public ExcludeList getExcludeList() { + return excludeList; + } + + boolean isEmpty() { + return streamEntries.isEmpty(); + } +} diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java new file mode 100644 index 000000000000..a9be11667c8d --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -0,0 +1,629 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Maintains a list of {@link BlockDataStreamOutputEntry} and writes based
+ * on offset.
+ *
+ * Note that this may write to multiple containers in one write call. In case
+ * the first container succeeded but later ones failed, the succeeded writes
+ * are not rolled back.
+ *
+ * TODO : currently does not support multi-thread access.
+ */
+public class KeyDataStreamOutput implements ByteBufStreamOutput {
+
+  private OzoneClientConfig config;
+
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyDataStreamOutput.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much data has actually been written to the underlying stream so far
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception was encountered while writing, so that the whole
+  // write could not succeed
+  private boolean isException;
+  private final BlockDataStreamOutputEntryPool blockDataStreamOutputEntryPool;
+
+  private long clientID;
+
+  /**
+   * A constructor for testing purposes only.
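+   * @see BlockDataStreamOutputEntryPool#BlockDataStreamOutputEntryPool()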
+ */ + @VisibleForTesting + public KeyDataStreamOutput() { + closed = false; + this.retryPolicyMap = HddsClientUtils.getExceptionList() + .stream() + .collect(Collectors.toMap(Function.identity(), + e -> RetryPolicies.TRY_ONCE_THEN_FAIL)); + retryCount = 0; + offset = 0; + blockDataStreamOutputEntryPool = new BlockDataStreamOutputEntryPool(); + } + + @VisibleForTesting + public List getStreamEntries() { + return blockDataStreamOutputEntryPool.getStreamEntries(); + } + + @VisibleForTesting + public XceiverClientFactory getXceiverClientFactory() { + return blockDataStreamOutputEntryPool.getXceiverClientFactory(); + } + + @VisibleForTesting + public List getLocationInfoList() { + return blockDataStreamOutputEntryPool.getLocationInfoList(); + } + + @VisibleForTesting + public int getRetryCount() { + return retryCount; + } + + @VisibleForTesting + public long getClientID() { + return clientID; + } + + @SuppressWarnings({"parameternumber", "squid:S00107"}) + public KeyDataStreamOutput( + OzoneClientConfig config, + OpenKeySession handler, + XceiverClientFactory xceiverClientManager, + OzoneManagerProtocol omClient, int chunkSize, + String requestId, ReplicationConfig replicationConfig, + String uploadID, int partNumber, boolean isMultipart, + boolean unsafeByteBufferConversion + ) { + this.config = config; + OmKeyInfo info = handler.getKeyInfo(); + blockDataStreamOutputEntryPool = + new BlockDataStreamOutputEntryPool( + config, + omClient, + requestId, replicationConfig, + uploadID, partNumber, + isMultipart, info, + unsafeByteBufferConversion, + xceiverClientManager, + handler.getId()); + + // Retrieve the file encryption key info, null if file is not in + // encrypted bucket. + this.feInfo = info.getFileEncryptionInfo(); + this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException( + config.getMaxRetryCount(), config.getRetryInterval()); + this.retryCount = 0; + this.isException = false; + this.writeOffset = 0; + this.clientID = handler.getId(); + } + + /** + * When a key is opened, it is possible that there are some blocks already + * allocated to it for this open session. In this case, to make use of these + * blocks, we need to add these blocks to stream entries. But, a key's version + * also includes blocks from previous versions, we need to avoid adding these + * old blocks to stream entries, because these old blocks should not be picked + * for write. To do this, the following method checks that, only those + * blocks created in this particular open version are added to stream entries. + * + * @param version the set of blocks that are pre-allocated. + * @param openVersion the version corresponding to the pre-allocation. + * @throws IOException + */ + public void addPreallocateBlocks(OmKeyLocationInfoGroup version, + long openVersion) throws IOException { + blockDataStreamOutputEntryPool.addPreallocateBlocks(version, openVersion); + } + + @Override + public void write(ByteBuf b) throws IOException { + checkNotClosed(); + if (b == null) { + throw new NullPointerException(); + } + final int len = b.readableBytes(); + handleWrite(b, b.readerIndex(), len, false); + writeOffset += len; + } + + private void handleWrite(ByteBuf b, int off, long len, boolean retry) + throws IOException { + while (len > 0) { + try { + BlockDataStreamOutputEntry current = + blockDataStreamOutputEntryPool.allocateBlockIfNeeded(); + // length(len) will be in int range if the call is happening through + // write API of blockDataStreamOutput. Length can be in long range + // if it comes via Exception path. 
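+        // Hence the (int) casts below are safe on the non-retry path.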
+        int expectedWriteLen = Math.min((int) len,
+            (int) current.getRemaining());
+        long currentPos = current.getWrittenDataLength();
+        // writeLen is set to the amount actually written: the full length
+        // when the write succeeds, or the amount that was acknowledged
+        // before an exception was seen.
+        int writtenLength =
+            writeToDataStreamOutput(current, retry, len, b,
+                expectedWriteLen, off, currentPos);
+        if (current.getRemaining() <= 0) {
+          // the current block is fully written; close the stream.
+          handleFlushOrClose(StreamAction.FULL);
+        }
+        len -= writtenLength;
+        off += writtenLength;
+      } catch (Exception e) {
+        markStreamClosed();
+        throw new IOException(e);
+      }
+    }
+  }
+
+  private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
+      boolean retry, long len, ByteBuf b, int writeLen, int off,
+      long currentPos) throws IOException {
+    try {
+      if (retry) {
+        current.writeOnRetry(len);
+      } else {
+        current.write(b, off, writeLen);
+        offset += writeLen;
+      }
+    } catch (IOException ioe) {
+      // for the current iteration, totalDataWritten - currentPos gives the
+      // amount of data already written to the buffer
+
+      // In the retry path, the total data to be written is always at most
+      // the maximum length of the allocated buffer; the len given here is
+      // the combined sum of the data lengths of the buffers.
+      Preconditions.checkState(!retry || len <= config
+          .getStreamBufferMaxSize());
+      int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+      writeLen = retry ? (int) len : dataWritten;
+      // In the retry path, the data written is already accounted in offset.
+      if (!retry) {
+        offset += writeLen;
+      }
+      LOG.debug("writeLen {}, total len {}", writeLen, len);
+      handleException(current, ioe);
+    }
+    return writeLen;
+  }
+
+  /**
+   * Performs the following actions:
+   * a. Updates the committed length of the current stream at the datanode.
+   * b. Reads the data from the underlying buffer and writes it to the next
+   *    stream.
+   *
+   * @param streamEntry StreamEntry
+   * @param exception actual exception that occurred
+   * @throws IOException if the write fails
+   */
+  private void handleException(BlockDataStreamOutputEntry streamEntry,
+      IOException exception) throws IOException {
+    Throwable t = HddsClientUtils.checkForException(exception);
+    Preconditions.checkNotNull(t);
+    boolean retryFailure = checkForRetryFailure(t);
+    boolean containerExclusionException = false;
+    if (!retryFailure) {
+      containerExclusionException = checkIfContainerToExclude(t);
+    }
+    Pipeline pipeline = streamEntry.getPipeline();
+    PipelineID pipelineId = pipeline.getId();
+    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
+    // set the correct length for the current stream
+    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
+    long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
+    if (containerExclusionException) {
+      LOG.debug(
+          "Encountered exception {}. The last committed block length is {}, "
+              + "uncommitted data length is {} retry count {}", exception,
+          totalSuccessfulFlushedData, bufferedDataLen, retryCount);
+    } else {
+      LOG.warn(
" + + "The last committed block length is {}, " + + "uncommitted data length is {} retry count {}", exception, + pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount); + } + Preconditions.checkArgument( + bufferedDataLen <= config.getStreamBufferMaxSize()); + Preconditions.checkArgument( + offset - blockDataStreamOutputEntryPool.getKeyLength() == + bufferedDataLen); + long containerId = streamEntry.getBlockID().getContainerID(); + Collection failedServers = streamEntry.getFailedServers(); + Preconditions.checkNotNull(failedServers); + ExcludeList excludeList = blockDataStreamOutputEntryPool.getExcludeList(); + if (!failedServers.isEmpty()) { + excludeList.addDatanodes(failedServers); + } + + // if the container needs to be excluded , add the container to the + // exclusion list , otherwise add the pipeline to the exclusion list + if (containerExclusionException) { + excludeList.addConatinerId(ContainerID.valueOf(containerId)); + } else { + excludeList.addPipeline(pipelineId); + } + // just clean up the current stream. + streamEntry.cleanup(retryFailure); + + // discard all subsequent blocks the containers and pipelines which + // are in the exclude list so that, the very next retry should never + // write data on the closed container/pipeline + if (containerExclusionException) { + // discard subsequent pre allocated blocks from the streamEntries list + // from the closed container + blockDataStreamOutputEntryPool + .discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), + null); + } else { + // In case there is timeoutException or Watch for commit happening over + // majority or the client connection failure to the leader in the + // pipeline, just discard all the pre allocated blocks on this pipeline. + // Next block allocation will happen with excluding this specific pipeline + // This will ensure if 2 way commit happens , it cannot span over multiple + // blocks + blockDataStreamOutputEntryPool + .discardPreallocatedBlocks(-1, pipelineId); + } + if (bufferedDataLen > 0) { + // If the data is still cached in the underlying stream, we need to + // allocate new block and write this data in the datanode. + handleRetry(exception, bufferedDataLen); + // reset the retryCount after handling the exception + retryCount = 0; + } + } + + private void markStreamClosed() { + blockDataStreamOutputEntryPool.cleanup(); + closed = true; + } + + private void handleRetry(IOException exception, long len) throws IOException { + RetryPolicy retryPolicy = retryPolicyMap + .get(HddsClientUtils.checkForException(exception).getClass()); + if (retryPolicy == null) { + retryPolicy = retryPolicyMap.get(Exception.class); + } + RetryPolicy.RetryAction action = null; + try { + action = retryPolicy.shouldRetry(exception, retryCount, 0, true); + } catch (Exception e) { + setExceptionAndThrow(new IOException(e)); + } + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + String msg = ""; + if (action.reason != null) { + msg = "Retry request failed. 
" + action.reason; + LOG.error(msg, exception); + } + setExceptionAndThrow(new IOException(msg, exception)); + } + + // Throw the exception if the thread is interrupted + if (Thread.currentThread().isInterrupted()) { + LOG.warn("Interrupted while trying for retry"); + setExceptionAndThrow(exception); + } + Preconditions.checkArgument( + action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); + if (action.delayMillis > 0) { + try { + Thread.sleep(action.delayMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + IOException ioe = (IOException) new InterruptedIOException( + "Interrupted: action=" + action + ", retry policy=" + retryPolicy) + .initCause(e); + setExceptionAndThrow(ioe); + } + } + retryCount++; + if (LOG.isTraceEnabled()) { + LOG.trace("Retrying Write request. Already tried {} time(s); " + + "retry policy is {} ", retryCount, retryPolicy); + } + handleWrite(null, 0, len, true); + } + + private void setExceptionAndThrow(IOException ioe) throws IOException { + isException = true; + throw ioe; + } + + /** + * Checks if the provided exception signifies retry failure in ratis client. + * In case of retry failure, ratis client throws RaftRetryFailureException + * and all succeeding operations are failed with AlreadyClosedException. + */ + private boolean checkForRetryFailure(Throwable t) { + return t instanceof RaftRetryFailureException + || t instanceof AlreadyClosedException; + } + + // Every container specific exception from datatnode will be seen as + // StorageContainerException + private boolean checkIfContainerToExclude(Throwable t) { + return t instanceof StorageContainerException; + } + + @Override + public void flush() throws IOException { + checkNotClosed(); + handleFlushOrClose(StreamAction.FLUSH); + } + + /** + * Close or Flush the latest outputStream depending upon the action. + * This function gets called when while write is going on, the current stream + * gets full or explicit flush or close request is made by client. when the + * stream gets full and we try to close the stream , we might end up hitting + * an exception in the exception handling path, we write the data residing in + * in the buffer pool to a new Block. In cases, as such, when the data gets + * written to new stream , it will be at max half full. In such cases, we + * should just write the data and not close the stream as the block won't be + * completely full. + * + * @param op Flag which decides whether to call close or flush on the + * outputStream. + * @throws IOException In case, flush or close fails with exception. 
+ */ + @SuppressWarnings("squid:S1141") + private void handleFlushOrClose(StreamAction op) throws IOException { + if (!blockDataStreamOutputEntryPool.isEmpty()) { + while (true) { + try { + BlockDataStreamOutputEntry entry = + blockDataStreamOutputEntryPool.getCurrentStreamEntry(); + if (entry != null) { + try { + handleStreamAction(entry, op); + } catch (IOException ioe) { + handleException(entry, ioe); + continue; + } + } + return; + } catch (Exception e) { + markStreamClosed(); + throw e; + } + } + } + } + + private void handleStreamAction(BlockDataStreamOutputEntry entry, + StreamAction op) throws IOException { + Collection failedServers = entry.getFailedServers(); + // failed servers can be null in case there is no data written in + // the stream + if (!failedServers.isEmpty()) { + blockDataStreamOutputEntryPool.getExcludeList().addDatanodes( + failedServers); + } + switch (op) { + case CLOSE: + entry.close(); + break; + case FULL: + if (entry.getRemaining() == 0) { + entry.close(); + } + break; + case FLUSH: + entry.flush(); + break; + default: + throw new IOException("Invalid Operation"); + } + } + + /** + * Commit the key to OM, this will add the blocks as the new key blocks. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + if (closed) { + return; + } + closed = true; + try { + handleFlushOrClose(StreamAction.CLOSE); + if (!isException) { + Preconditions.checkArgument(writeOffset == offset); + } + blockDataStreamOutputEntryPool.commitKey(offset); + } finally { + blockDataStreamOutputEntryPool.cleanup(); + } + } + + public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { + return blockDataStreamOutputEntryPool.getCommitUploadPartInfo(); + } + + public FileEncryptionInfo getFileEncryptionInfo() { + return feInfo; + } + + @VisibleForTesting + public ExcludeList getExcludeList() { + return blockDataStreamOutputEntryPool.getExcludeList(); + } + + /** + * Builder class of KeyDataStreamOutput. 
+ */ + public static class Builder { + private OpenKeySession openHandler; + private XceiverClientFactory xceiverManager; + private OzoneManagerProtocol omClient; + private int chunkSize; + private String requestID; + private String multipartUploadID; + private int multipartNumber; + private boolean isMultipartKey; + private boolean unsafeByteBufferConversion; + private OzoneClientConfig clientConfig; + private ReplicationConfig replicationConfig; + + public Builder setMultipartUploadID(String uploadID) { + this.multipartUploadID = uploadID; + return this; + } + + public Builder setMultipartNumber(int partNumber) { + this.multipartNumber = partNumber; + return this; + } + + public Builder setHandler(OpenKeySession handler) { + this.openHandler = handler; + return this; + } + + public Builder setXceiverClientManager(XceiverClientFactory manager) { + this.xceiverManager = manager; + return this; + } + + public Builder setOmClient(OzoneManagerProtocol client) { + this.omClient = client; + return this; + } + + public Builder setChunkSize(int size) { + this.chunkSize = size; + return this; + } + + public Builder setRequestID(String id) { + this.requestID = id; + return this; + } + + public Builder setIsMultipartKey(boolean isMultipart) { + this.isMultipartKey = isMultipart; + return this; + } + + public Builder setConfig(OzoneClientConfig config) { + this.clientConfig = config; + return this; + } + + public Builder enableUnsafeByteBufferConversion(boolean enabled) { + this.unsafeByteBufferConversion = enabled; + return this; + } + + + public Builder setReplicationConfig(ReplicationConfig replConfig) { + this.replicationConfig = replConfig; + return this; + } + + public KeyDataStreamOutput build() { + return new KeyDataStreamOutput( + clientConfig, + openHandler, + xceiverManager, + omClient, + chunkSize, + requestID, + replicationConfig, + multipartUploadID, + multipartNumber, + isMultipartKey, + unsafeByteBufferConversion); + } + + } + + /** + * Verify that the output stream is open. Non blocking; this gives + * the last state of the volatile {@link #closed} field. + * @throws IOException if the connection is closed. + */ + private void checkNotClosed() throws IOException { + if (closed) { + throw new IOException( + ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + + blockDataStreamOutputEntryPool.getKeyName()); + } + } +} diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java new file mode 100644 index 000000000000..378b86872e43 --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+
+import java.io.IOException;
+
+/**
+ * OzoneDataStreamOutput is used to write data into Ozone. It wraps a
+ * {@link ByteBufStreamOutput}, typically a {@link KeyDataStreamOutput},
+ * which does the actual writing.
+ */
+public class OzoneDataStreamOutput implements ByteBufStreamOutput {
+
+  private final ByteBufStreamOutput byteBufStreamOutput;
+
+  /**
+   * Constructs an OzoneDataStreamOutput that delegates to the given stream.
+   *
+   * @param byteBufStreamOutput the underlying output stream, usually a
+   *                            KeyDataStreamOutput
+   */
+  public OzoneDataStreamOutput(ByteBufStreamOutput byteBufStreamOutput) {
+    this.byteBufStreamOutput = byteBufStreamOutput;
+  }
+
+  @Override
+  public void write(ByteBuf b) throws IOException {
+    byteBufStreamOutput.write(b);
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    byteBufStreamOutput.flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    // commitKey could be done here, if needed.
+    byteBufStreamOutput.close();
+  }
+
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    if (byteBufStreamOutput instanceof KeyDataStreamOutput) {
+      return ((KeyDataStreamOutput)
+          byteBufStreamOutput).getCommitUploadPartInfo();
+    }
+    // Otherwise return null.
+    return null;
+  }
+
+  public ByteBufStreamOutput getByteBufStreamOutput() {
+    return byteBufStreamOutput;
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index c4f98e8a633a..0b4301106a51 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -291,6 +292,20 @@
   OzoneOutputStream createKey(String volumeName, String bucketName,
       Map<String, String> metadata)
       throws IOException;
+  /**
+   * Writes a key in an existing bucket.
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Name of the Key
+   * @param size Size of the data
+   * @param replicationConfig Replication configuration of the key
+   * @param metadata custom key value metadata
+   * @return {@link OzoneDataStreamOutput}
+   */
+  OzoneDataStreamOutput createStreamKey(String volumeName, String bucketName,
+      String keyName, long size, ReplicationConfig replicationConfig,
+      Map<String, String> metadata)
+      throws IOException;
 
 /**
  * Reads a key from an existing bucket.
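For reference between the interface above and its RpcClient implementation below, here is a minimal, hypothetical sketch of the new write path from an application's point of view. It assumes the OzoneBucket#createStreamKey convenience overload that this patch exercises in TestHelper and PutKeyHandler; the configuration, volume, bucket, and key names are illustrative, and the volume and bucket are assumed to exist already:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;

    import io.netty.buffer.Unpooled;
    import org.apache.hadoop.hdds.client.ReplicationConfig;
    import org.apache.hadoop.hdds.client.ReplicationFactor;
    import org.apache.hadoop.hdds.client.ReplicationType;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.client.OzoneBucket;
    import org.apache.hadoop.ozone.client.OzoneClient;
    import org.apache.hadoop.ozone.client.OzoneClientFactory;
    import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;

    public final class StreamKeyExample {
      public static void main(String[] args) throws Exception {
        OzoneConfiguration conf = new OzoneConfiguration();
        try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
          OzoneBucket bucket = client.getObjectStore()
              .getVolume("vol1").getBucket("bucket1");
          byte[] payload = "hello, streaming".getBytes(StandardCharsets.UTF_8);
          try (OzoneDataStreamOutput out = bucket.createStreamKey(
              "key1", payload.length,
              ReplicationConfig.fromTypeAndFactor(
                  ReplicationType.RATIS, ReplicationFactor.THREE),
              new HashMap<>())) {
            // each ByteBuf handed to write() is dispatched to block streams
            out.write(Unpooled.wrappedBuffer(payload));
          } // close() commits the key and its block list to the OM
        }
      }
    }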
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index bdb02dddc2a6..665a6a5adcf7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -72,11 +72,13 @@
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.MultipartCryptoKeyInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneCryptoInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -801,6 +803,48 @@ public OzoneOutputStream createKey(
     return createOutputStream(openKey, requestId, replicationConfig);
   }
 
+  @Override
+  public OzoneDataStreamOutput createStreamKey(
+      String volumeName, String bucketName, String keyName, long size,
+      ReplicationConfig replicationConfig,
+      Map<String, String> metadata)
+      throws IOException {
+    verifyVolumeName(volumeName);
+    verifyBucketName(bucketName);
+    if (checkKeyNameEnabled) {
+      HddsClientUtils.verifyKeyName(keyName);
+    }
+    HddsClientUtils.checkNotNull(keyName, replicationConfig);
+    String requestId = UUID.randomUUID().toString();
+
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .setReplicationConfig(replicationConfig)
+        .addAllMetadata(metadata)
+        .setAcls(getAclList());
+
+    if (Boolean.parseBoolean(metadata.get(OzoneConsts.GDPR_FLAG))) {
+      try {
+        GDPRSymmetricKey gKey = new GDPRSymmetricKey(new SecureRandom());
+        builder.addAllMetadata(gKey.getKeyDetails());
+      } catch (Exception e) {
+        if (e instanceof InvalidKeyException &&
+            e.getMessage().contains("Illegal key size or default parameters")) {
+          LOG.error("Missing Unlimited Strength Policy jars. 
Please install " + + "Java Cryptography Extension (JCE) Unlimited Strength " + + "Jurisdiction Policy Files"); + } + throw new IOException(e); + } + } + + OpenKeySession openKey = ozoneManagerClient.openKey(builder.build()); + return createDataStreamOutput(openKey, requestId, replicationConfig); + } + private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo) throws IOException { // check crypto protocol version @@ -1382,6 +1426,24 @@ private OzoneInputStream createInputStream( cryptoInputStreams); } } + private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey, + String requestId, ReplicationConfig replicationConfig) + throws IOException { + KeyDataStreamOutput keyOutputStream = + new KeyDataStreamOutput.Builder() + .setHandler(openKey) + .setXceiverClientManager(xceiverClientManager) + .setOmClient(ozoneManagerClient) + .setRequestID(requestId) + .setReplicationConfig(replicationConfig) + .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) + .setConfig(clientConfig) + .build(); + keyOutputStream + .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(), + openKey.getOpenVersion()); + return new OzoneDataStreamOutput(keyOutputStream); + } private OzoneOutputStream createOutputStream(OpenKeySession openKey, String requestId, ReplicationConfig replicationConfig) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java new file mode 100644 index 000000000000..4d52d8949046 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import io.netty.buffer.Unpooled;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests BlockDataStreamOutput class.
+ */
+public class TestBlockDataStreamOutput {
+
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
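+   * Buffer geometry used below: chunk = 100 B, flush = 2 chunks (200 B),
+   * max flush = 2 flushes (400 B), block = 2 max flushes (800 B).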
+   *
+   * @throws Exception if the cluster cannot be started
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setChecksumType(ChecksumType.NONE);
+    clientConfig.setStreamBufferFlushDelay(false);
+    conf.setFromObject(clientConfig);
+
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    // the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Shut down the MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testMultiChunkWrite() throws Exception {
+    // write less than one chunk of data using streaming.
+    String keyName1 = getKeyName();
+    OzoneDataStreamOutput key1 = createKey(
+        keyName1, ReplicationType.RATIS, 0);
+    int dataLength1 = chunkSize / 2;
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength1)
+            .getBytes(UTF_8);
+    key1.write(Unpooled.copiedBuffer(data1));
+    // now close the stream; it will update the key length.
+    key1.close();
+    validateData(keyName1, data1);
+
+    // write more than one chunk of data using streaming.
+    String keyName2 = getKeyName();
+    OzoneDataStreamOutput key2 = createKey(
+        keyName2, ReplicationType.RATIS, 0);
+    int dataLength2 = chunkSize + 50;
+    byte[] data2 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength2)
+            .getBytes(UTF_8);
+    key2.write(Unpooled.copiedBuffer(data2));
+    // now close the stream; it will update the key length.
+    key2.close();
+    validateData(keyName2, data2);
+
+    // write more than one block of data using streaming.
+    String keyName3 = getKeyName();
+    OzoneDataStreamOutput key3 = createKey(
+        keyName3, ReplicationType.RATIS, 0);
+    int dataLength3 = blockSize + 50;
+    byte[] data3 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength3)
+            .getBytes(UTF_8);
+    key3.write(Unpooled.copiedBuffer(data3));
+    // now close the stream; it will update the key length.
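+    // dataLength3 exceeds the 800-byte block size, so this key spans two
+    // blocks and exercises the multi-block allocation path in handleWrite.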
+ key3.close(); + validateData(keyName3, data3); + } + + private OzoneDataStreamOutput createKey(String keyName, ReplicationType type, + long size) throws Exception { + return TestHelper.createStreamKey( + keyName, type, size, objectStore, volumeName, bucketName); + } + private void validateData(String keyName, byte[] data) throws Exception { + TestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } + +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java index aa502346b78f..a269a3cbcf9e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java @@ -22,6 +22,8 @@ import java.security.MessageDigest; import java.util.*; import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -40,6 +42,7 @@ import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry; import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.common.impl.ContainerData; @@ -123,8 +126,23 @@ public static OzoneOutputStream createKey(String keyName, type == ReplicationType.STAND_ALONE ? org.apache.hadoop.hdds.client.ReplicationFactor.ONE : org.apache.hadoop.hdds.client.ReplicationFactor.THREE; + ReplicationConfig config = + ReplicationConfig.fromTypeAndFactor(type, factor); + return objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey(keyName, size, config, new HashMap<>()); + } + + public static OzoneDataStreamOutput createStreamKey(String keyName, + ReplicationType type, long size, ObjectStore objectStore, + String volumeName, String bucketName) throws Exception { + org.apache.hadoop.hdds.client.ReplicationFactor factor = + type == ReplicationType.STAND_ALONE ? 
+ org.apache.hadoop.hdds.client.ReplicationFactor.ONE : + org.apache.hadoop.hdds.client.ReplicationFactor.THREE; + ReplicationConfig config = + ReplicationConfig.fromTypeAndFactor(type, factor); return objectStore.getVolume(volumeName).getBucket(bucketName) - .createKey(keyName, size, type, factor, new HashMap<>()); + .createStreamKey(keyName, size, config, new HashMap<>()); } public static OzoneOutputStream createKey(String keyName, @@ -132,8 +150,10 @@ public static OzoneOutputStream createKey(String keyName, org.apache.hadoop.hdds.client.ReplicationFactor factor, long size, ObjectStore objectStore, String volumeName, String bucketName) throws Exception { + ReplicationConfig config = + ReplicationConfig.fromTypeAndFactor(type, factor); return objectStore.getVolume(volumeName).getBucket(bucketName) - .createKey(keyName, size, type, factor, new HashMap<>()); + .createKey(keyName, size, config, new HashMap<>()); } public static void validateData(String keyName, byte[] data, diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java index c575b6e8e507..56bc834511a7 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java @@ -23,9 +23,14 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Map; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfigValidator; @@ -36,6 +41,7 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientException; import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.shell.OzoneAddress; import org.apache.commons.codec.digest.DigestUtils; @@ -113,10 +119,36 @@ protected void execute(OzoneClient client, OzoneAddress address) int chunkSize = (int) getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES); - try (InputStream input = new FileInputStream(dataFile); - OutputStream output = bucket.createKey(keyName, dataFile.length(), - replicationConfig, keyMetadata)) { - IOUtils.copyBytes(input, output, chunkSize); + + if (dataFile.length() <= chunkSize) { + if (isVerbose()) { + out().println("API: async"); + } + try (InputStream input = new FileInputStream(dataFile); + OutputStream output = bucket.createKey(keyName, dataFile.length(), + replicationConfig, keyMetadata)) { + IOUtils.copyBytes(input, output, chunkSize); + } + } else { + if (isVerbose()) { + out().println("API: streaming"); + } + try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r"); + OzoneDataStreamOutput out = bucket.createStreamKey(keyName, + dataFile.length(), replicationConfig, keyMetadata)) { + FileChannel ch = raf.getChannel(); + long len = raf.length(); + long off = 0; + while (len > 0) { + long writeLen = Math.min(len, chunkSize); + ByteBuffer segment = + ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen); + ByteBuf buf = Unpooled.wrappedBuffer(segment); + out.write(buf); + off += writeLen; + len -= writeLen; 
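+          // Unpooled.wrappedBuffer() shares the mapped region rather than
+          // copying it, so each chunk is handed to the stream without an
+          // extra heap copy.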
+ } + } } }
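The handler above picks the buffered path for payloads of at most one chunk and the streaming path for anything larger, and reports which path was taken ("API: async" or "API: streaming") when verbose output is enabled. The same dispatch can be condensed into a small helper; the following is a hypothetical sketch that mirrors the shell code above rather than defining an API, with the method name and parameters chosen for illustration:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import java.util.HashMap;

    import io.netty.buffer.Unpooled;
    import org.apache.hadoop.hdds.client.ReplicationConfig;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.ozone.client.OzoneBucket;
    import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;

    final class PutKeySketch {
      static void putKey(OzoneBucket bucket, String keyName, File file,
          ReplicationConfig replication, int chunkSize) throws IOException {
        if (file.length() <= chunkSize) {
          // small payloads: buffered write through the async path
          try (InputStream input = new FileInputStream(file);
               OutputStream output = bucket.createKey(keyName, file.length(),
                   replication, new HashMap<>())) {
            IOUtils.copyBytes(input, output, chunkSize);
          }
        } else {
          // large payloads: map one chunk at a time and stream it over Ratis
          try (RandomAccessFile raf = new RandomAccessFile(file, "r");
               OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
                   file.length(), replication, new HashMap<>())) {
            FileChannel ch = raf.getChannel();
            for (long off = 0, len = raf.length(); len > 0; ) {
              long writeLen = Math.min(len, chunkSize);
              out.write(Unpooled.wrappedBuffer(
                  ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen)));
              off += writeLen;
              len -= writeLen;
            }
          }
        }
      }
    }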