diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 482c06732c1e..4e75e42d9847 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -66,10 +66,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <version>${spotbugs.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-buffer</artifactId>
-    </dependency>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 39ec2f921961..d0419fa0c3cc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -20,7 +20,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -46,6 +45,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -59,7 +59,7 @@
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
 
 /**
- * An {@link ByteBufStreamOutput} used by the REST service in combination
+ * An {@link ByteBufferStreamOutput} used by the REST service in combination
  * with the SCMClient to write the value of a key to a sequence
 * of container chunks. Writes are buffered locally and periodically written to
 * the container as a new chunk. In order to preserve the semantics that
@@ -74,7 +74,7 @@
 * This class encapsulates all state management for buffering and writing
 * through to the container.
 */
-public class BlockDataStreamOutput implements ByteBufStreamOutput {
+public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   public static final Logger LOG =
       LoggerFactory.getLogger(BlockDataStreamOutput.class);
   public static final String EXCEPTION_MSG =
@@ -209,16 +209,16 @@ public IOException getIoException() {
   }
 
   @Override
-  public void write(ByteBuf buf) throws IOException {
+  public void write(ByteBuffer b, int off, int len) throws IOException {
     checkOpen();
-    if (buf == null) {
+    if (b == null) {
       throw new NullPointerException();
     }
-    final int len = buf.readableBytes();
     if (len == 0) {
       return;
     }
-    writeChunkToContainer(buf);
+    writeChunkToContainer(
+        (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len));
     writtenDataLength += len;
   }
@@ -476,15 +476,17 @@ private boolean needSync(long position) {
   /**
    * Writes buffered data as a new chunk to the container and saves chunk
    * information to be used later in putKey call.
    *
+   * @param buf chunk data to write, from position to limit
    * @throws IOException if there is an I/O error while performing the call
    * @throws OzoneChecksumException if there is an error while computing
    * checksum
    */
-  private void writeChunkToContainer(ByteBuf buf)
+  private void writeChunkToContainer(ByteBuffer buf)
       throws IOException {
-    ChecksumData checksumData = checksum.computeChecksum(buf.nioBuffer());
-    int effectiveChunkSize = buf.readableBytes();
+    final int effectiveChunkSize = buf.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
+    ChecksumData checksumData =
+        checksum.computeChecksum(buf.asReadOnlyBuffer());
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
         .setOffset(offset)
@@ -499,8 +501,8 @@ private void writeChunkToContainer(ByteBuf buf)
     CompletableFuture future =
         (needSync(offset + effectiveChunkSize) ?
-        out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) :
-        out.writeAsync(buf.nioBuffer()))
+        out.writeAsync(buf, StandardWriteOption.SYNC) :
+        out.writeAsync(buf))
         .whenCompleteAsync((r, e) -> {
           if (e != null || !r.isSuccess()) {
             if (e == null) {
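The new write(ByteBuffer, int, int) above never copies the caller's data: it builds a read-only view of the requested [off, off + len) range and hands that view to writeChunkToContainer, so the checksum computation and the async write both consume the view without disturbing the caller's buffer. A minimal sketch of the same NIO idiom, outside the patch (the sliceView helper and the sample payload are illustrative only):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class ReadOnlySliceExample {
  // Produces a read-only view of b covering [off, off + len) without copying
  // and without touching the caller's position/limit, mirroring the idiom
  // used by the new write(ByteBuffer, int, int) above.
  static ByteBuffer sliceView(ByteBuffer b, int off, int len) {
    return (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len);
  }

  public static void main(String[] args) {
    ByteBuffer data =
        ByteBuffer.wrap("hello streaming".getBytes(StandardCharsets.UTF_8));
    ByteBuffer view = sliceView(data, 6, 9);   // covers "streaming"
    System.out.println(view.remaining());      // 9
    System.out.println(data.position());       // still 0 - caller unaffected
  }
}
```

The cast to ByteBuffer is needed because, on Java 8, position(int) and limit(int) are declared on Buffer; the patch uses the same cast for the same reason.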
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
similarity index 82%
rename from hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
index 7f40737b709f..0650a685b634 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
@@ -18,23 +18,24 @@
 package org.apache.hadoop.hdds.scm.storage;
 
-import io.netty.buffer.ByteBuf;
-
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
 * This interface is for writing an output stream of ByteBuffers.
-* An ByteBufStreamOutput accepts Netty ByteBuf and sends them to some sink.
+* An ByteBufferStreamOutput accepts nio ByteBuffer and sends them to some sink.
 */
-public interface ByteBufStreamOutput extends Closeable {
+public interface ByteBufferStreamOutput extends Closeable {
   /**
    * Try to write all the bytes in ByteBuf b to DataStream.
    *
    * @param b the data.
    * @exception IOException if an I/O error occurs.
    */
-  void write(ByteBuf b) throws IOException;
+  default void write(ByteBuffer b) throws IOException {
+    write(b, b.position(), b.remaining());
+  }
 
   /**
    * Try to write the [off:off + len) slice in ByteBuf b to DataStream.
    *
@@ -44,9 +45,7 @@ public interface ByteBufStreamOutput extends Closeable {
    * @param b the data.
    * @param off the start offset in the data.
    * @param len the number of bytes to write.
    * @exception IOException if an I/O error occurs.
    */
-  default void write(ByteBuf b, int off, int len) throws IOException {
-    write(b.slice(off, len));
-  }
+  void write(ByteBuffer b, int off, int len) throws IOException;
 
   /**
    * Flushes this DataStream output and forces any buffered output bytes
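With the rename, the delegation direction flips: the full-buffer write becomes a default method and only the ranged write(ByteBuffer, int, int) is left for implementors. A rough illustration of what an implementation has to provide, assuming the ByteBufferStreamOutput interface from this patch is on the classpath (CountingStreamOutput is a made-up sink used only to show the contract):

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

// Hypothetical sink that only counts the bytes it is asked to write.
class CountingStreamOutput implements ByteBufferStreamOutput {
  private long written;

  @Override
  public void write(ByteBuffer b, int off, int len) throws IOException {
    // A real implementation reads b[off, off + len); it must honour the
    // given offset rather than assume the buffer's position is 0.
    written += len;
  }

  @Override
  public void flush() {
    // nothing buffered in this sketch
  }

  @Override
  public void close() {
    // nothing to release in this sketch
  }

  long getWritten() {
    return written;
  }
}
```

Calling the inherited default write(ByteBuffer) on such a sink forwards the buffer's current position and remaining length, so callers can pass partially consumed buffers unchanged.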
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index 98907bf8af4b..f0c3a43e891e 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -18,18 +18,18 @@
 package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -37,10 +37,10 @@
 * Helper class used inside {@link BlockDataStreamOutput}.
 * */
 public final class BlockDataStreamOutputEntry
-    implements ByteBufStreamOutput {
+    implements ByteBufferStreamOutput {
 
   private final OzoneClientConfig config;
-  private ByteBufStreamOutput byteBufStreamOutput;
+  private ByteBufferStreamOutput byteBufferStreamOutput;
   private BlockID blockID;
   private final String key;
   private final XceiverClientFactory xceiverClientManager;
@@ -61,7 +61,7 @@ private BlockDataStreamOutputEntry(
       OzoneClientConfig config
   ) {
     this.config = config;
-    this.byteBufStreamOutput = null;
+    this.byteBufferStreamOutput = null;
     this.blockID = blockID;
     this.key = key;
     this.xceiverClientManager = xceiverClientManager;
@@ -90,63 +90,62 @@ long getRemaining() {
    * @throws IOException if xceiverClient initialization fails
    */
   private void checkStream() throws IOException {
-    if (this.byteBufStreamOutput == null) {
-      this.byteBufStreamOutput =
+    if (this.byteBufferStreamOutput == null) {
+      this.byteBufferStreamOutput =
           new BlockDataStreamOutput(blockID, xceiverClientManager,
               pipeline, config, token);
     }
   }
 
   @Override
-  public void write(ByteBuf b) throws IOException {
+  public void write(ByteBuffer b, int off, int len) throws IOException {
     checkStream();
-    final int len = b.readableBytes();
-    byteBufStreamOutput.write(b);
+    byteBufferStreamOutput.write(b, off, len);
     this.currentPosition += len;
   }
 
   @Override
   public void flush() throws IOException {
-    if (this.byteBufStreamOutput != null) {
-      this.byteBufStreamOutput.flush();
+    if (this.byteBufferStreamOutput != null) {
+      this.byteBufferStreamOutput.flush();
     }
   }
 
   @Override
   public void close() throws IOException {
-    if (this.byteBufStreamOutput != null) {
-      this.byteBufStreamOutput.close();
+    if (this.byteBufferStreamOutput != null) {
+      this.byteBufferStreamOutput.close();
       // after closing the chunkOutPutStream, blockId would have been
       // reconstructed with updated bcsId
       this.blockID =
-          ((BlockDataStreamOutput)
-              byteBufStreamOutput).getBlockID();
+          ((BlockDataStreamOutput) byteBufferStreamOutput).getBlockID();
     }
   }
 
   boolean isClosed() {
-    if (byteBufStreamOutput != null) {
-      return ((BlockDataStreamOutput) byteBufStreamOutput).isClosed();
+    if (byteBufferStreamOutput != null) {
+      return ((BlockDataStreamOutput) byteBufferStreamOutput).isClosed();
     }
     return false;
   }
 
   Collection getFailedServers() {
-    if (byteBufStreamOutput != null) {
+    if (byteBufferStreamOutput != null) {
       BlockDataStreamOutput out =
-          (BlockDataStreamOutput) this.byteBufStreamOutput;
+          (BlockDataStreamOutput) this.byteBufferStreamOutput;
       return out.getFailedServers();
     }
     return Collections.emptyList();
   }
 
   long getWrittenDataLength() {
-    if (byteBufStreamOutput != null) {
+    if (byteBufferStreamOutput != null) {
       BlockDataStreamOutput out =
-          (BlockDataStreamOutput) this.byteBufStreamOutput;
+          (BlockDataStreamOutput) this.byteBufferStreamOutput;
       return out.getWrittenDataLength();
     } else {
       // For a pre allocated block for which no write has been initiated,
-      // the ByteBufStreamOutput will be null here.
+      // the ByteBufferStreamOutput will be null here.
      // In such cases, the default blockCommitSequenceId will be 0
       return 0;
     }
@@ -155,7 +154,7 @@ long getWrittenDataLength() {
   void cleanup(boolean invalidateClient) throws IOException {
     checkStream();
     BlockDataStreamOutput out =
-        (BlockDataStreamOutput) this.byteBufStreamOutput;
+        (BlockDataStreamOutput) this.byteBufferStreamOutput;
     out.cleanup(invalidateClient);
   }
 
@@ -163,7 +162,7 @@ void cleanup(boolean invalidateClient) throws IOException {
   void writeOnRetry(long len) throws IOException {
     checkStream();
     BlockDataStreamOutput out =
-        (BlockDataStreamOutput) this.byteBufStreamOutput;
+        (BlockDataStreamOutput) this.byteBufferStreamOutput;
     out.writeOnRetry(len);
     this.currentPosition += len;
 
@@ -231,8 +230,8 @@ public BlockDataStreamOutputEntry build() {
   }
 
   @VisibleForTesting
-  public ByteBufStreamOutput getByteBufStreamOutput() {
-    return byteBufStreamOutput;
+  public ByteBufferStreamOutput getByteBufStreamOutput() {
+    return byteBufferStreamOutput;
   }
 
   public BlockID getBlockID() {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index c37f9cd51d3c..9bba89d0a8a0 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -19,7 +19,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -32,7 +31,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -48,6 +47,7 @@
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -63,7 +63,7 @@
 *
 * TODO : currently not support multi-thread access.
 */
-public class KeyDataStreamOutput implements ByteBufStreamOutput {
+public class KeyDataStreamOutput implements ByteBufferStreamOutput {
 
   private OzoneClientConfig config;
 
@@ -185,17 +185,16 @@ public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
   }
 
   @Override
-  public void write(ByteBuf b) throws IOException {
+  public void write(ByteBuffer b, int off, int len) throws IOException {
     checkNotClosed();
     if (b == null) {
       throw new NullPointerException();
     }
-    final int len = b.readableBytes();
-    handleWrite(b, b.readerIndex(), len, false);
+    handleWrite(b, off, len, false);
     writeOffset += len;
   }
 
-  private void handleWrite(ByteBuf b, int off, long len, boolean retry)
+  private void handleWrite(ByteBuffer b, int off, long len, boolean retry)
       throws IOException {
     while (len > 0) {
       try {
@@ -227,7 +226,7 @@ private void handleWrite(ByteBuf b, int off, long len, boolean retry)
   }
 
   private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
-      boolean retry, long len, ByteBuf b, int writeLen, int off,
+      boolean retry, long len, ByteBuffer b, int writeLen, int off,
       long currentPos) throws IOException {
     try {
       if (retry) {
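handleWrite keeps handing the same ByteBuffer down to the current block entry and only advances the off/len bookkeeping when a write has to be split across entries, so no intermediate copies are made. A simplified sketch of that loop shape, assuming the renamed interface is available; MAX_PER_CALL stands in for the space left in the current entry and is not a real constant in the code:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

final class ChunkedWriteSketch {
  // Illustrative cap per write call; in KeyDataStreamOutput the cap comes
  // from the remaining capacity of the current BlockDataStreamOutputEntry.
  private static final int MAX_PER_CALL = 4096;

  static void writeInChunks(ByteBufferStreamOutput out, ByteBuffer b,
      int off, int len) throws IOException {
    while (len > 0) {
      // Write at most MAX_PER_CALL bytes, then slide the window forward.
      int writeLen = Math.min(len, MAX_PER_CALL);
      out.write(b, off, writeLen);
      off += writeLen;
      len -= writeLen;
    }
  }
}
```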
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index 378b86872e43..d40ac2b332ef 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -16,55 +16,55 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
-import io.netty.buffer.ByteBuf;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
 * OzoneDataStreamOutput is used to write data into Ozone.
 * It uses SCM's {@link KeyDataStreamOutput} for writing the data.
 */
-public class OzoneDataStreamOutput implements ByteBufStreamOutput {
+public class OzoneDataStreamOutput implements ByteBufferStreamOutput {
 
-  private final ByteBufStreamOutput byteBufStreamOutput;
+  private final ByteBufferStreamOutput byteBufferStreamOutput;
 
   /**
    * Constructs OzoneDataStreamOutput with KeyDataStreamOutput.
    *
-   * @param byteBufStreamOutput
+   * @param byteBufferStreamOutput the underlying ByteBufferStreamOutput
    */
-  public OzoneDataStreamOutput(ByteBufStreamOutput byteBufStreamOutput) {
-    this.byteBufStreamOutput = byteBufStreamOutput;
+  public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput) {
+    this.byteBufferStreamOutput = byteBufferStreamOutput;
   }
 
   @Override
-  public void write(ByteBuf b) throws IOException {
-    byteBufStreamOutput.write(b);
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    byteBufferStreamOutput.write(b, off, len);
   }
 
   @Override
   public synchronized void flush() throws IOException {
-    byteBufStreamOutput.flush();
+    byteBufferStreamOutput.flush();
   }
 
   @Override
   public synchronized void close() throws IOException {
     //commitKey can be done here, if needed.
-    byteBufStreamOutput.close();
+    byteBufferStreamOutput.close();
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    if (byteBufStreamOutput instanceof KeyDataStreamOutput) {
+    if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
       return ((KeyDataStreamOutput)
-          byteBufStreamOutput).getCommitUploadPartInfo();
+          byteBufferStreamOutput).getCommitUploadPartInfo();
     }
     // Otherwise return null.
     return null;
   }
 
-  public ByteBufStreamOutput getByteBufStreamOutput() {
-    return byteBufStreamOutput;
+  public ByteBufferStreamOutput getByteBufStreamOutput() {
+    return byteBufferStreamOutput;
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 4d52d8949046..6d5401d651d5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -17,7 +17,6 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
-import io.netty.buffer.Unpooled;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -38,6 +37,7 @@
 import org.junit.rules.Timeout;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -126,48 +126,38 @@ public static void shutdown() {
     }
   }
 
+  @Test
+  public void testHalfChunkWrite() throws Exception {
+    testWrite(chunkSize / 2);
+  }
+
+  @Test
+  public void testSingleChunkWrite() throws Exception {
+    testWrite(chunkSize);
+  }
+
   @Test
   public void testMultiChunkWrite() throws Exception {
-    // write data less than 1 chunk size use streaming.
-    String keyName1 = getKeyName();
-    OzoneDataStreamOutput key1 = createKey(
-        keyName1, ReplicationType.RATIS, 0);
-    int dataLength1 = chunkSize/2;
-    byte[] data1 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength1)
-            .getBytes(UTF_8);
-    key1.write(Unpooled.copiedBuffer(data1));
-    // now close the stream, It will update the key length.
-    key1.close();
-    validateData(keyName1, data1);
-
-    // write data more than 1 chunk size use streaming.
-    String keyName2 = getKeyName();
-    OzoneDataStreamOutput key2 = createKey(
-        keyName2, ReplicationType.RATIS, 0);
-    int dataLength2 = chunkSize + 50;
-    byte[] data2 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength2)
-            .getBytes(UTF_8);
-    key2.write(Unpooled.copiedBuffer(data2));
-    // now close the stream, It will update the key length.
-    key2.close();
-    validateData(keyName2, data2);
-
-    // write data more than 1 block size use streaming.
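The rewritten test (continued below) wraps the generated bytes with ByteBuffer.wrap instead of copying them into a Netty buffer with Unpooled.copiedBuffer. A small standalone example of the difference, using sample data only:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class WrapVsCopy {
  public static void main(String[] args) {
    byte[] data = "sample key payload".getBytes(StandardCharsets.UTF_8);

    // ByteBuffer.wrap shares the backing array instead of copying it,
    // unlike the Unpooled.copiedBuffer(data) call the old test used.
    ByteBuffer wrapped = ByteBuffer.wrap(data);
    data[0] = (byte) 'S';

    System.out.println((char) wrapped.get(0)); // 'S' - same underlying array
  }
}
```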
-    String keyName3 = getKeyName();
-    OzoneDataStreamOutput key3 = createKey(
-        keyName3, ReplicationType.RATIS, 0);
-    int dataLength3 = blockSize + 50;
-    byte[] data3 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength3)
+    testWrite(chunkSize + 50);
+  }
+
+  @Test
+  public void testMultiBlockWrite() throws Exception {
+    testWrite(blockSize + 50);
+  }
+
+  private void testWrite(int dataLength) throws Exception {
+    String keyName = getKeyName();
+    OzoneDataStreamOutput key = createKey(
+        keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
-    key3.write(Unpooled.copiedBuffer(data3));
+    key.write(ByteBuffer.wrap(data));
     // now close the stream, It will update the key length.
-    key3.close();
-    validateData(keyName3, data3);
+    key.close();
+    validateData(keyName, data);
   }
 
-
   private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
       long size) throws Exception {
     return TestHelper.createStreamKey(
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index 56bc834511a7..af6a461c9f06 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -29,8 +29,6 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfigValidator;
@@ -141,10 +139,8 @@ protected void execute(OzoneClient client, OzoneAddress address)
         long off = 0;
         while (len > 0) {
           long writeLen = Math.min(len, chunkSize);
-          ByteBuffer segment =
-              ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
-          ByteBuf buf = Unpooled.wrappedBuffer(segment);
-          out.write(buf);
+          ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
+          out.write(bb);
           off += writeLen;
           len -= writeLen;
         }
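PutKeyHandler now maps each file segment read-only and passes the MappedByteBuffer straight to the stream, dropping the intermediate Netty wrapper. A self-contained sketch of the same loop against the renamed interface, assuming it is on the classpath; the path and chunkSize arguments are caller-supplied placeholders rather than values from the patch:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

final class MappedStreamWrite {
  // Streams a whole file through a ByteBufferStreamOutput by mapping it
  // read-only in chunkSize segments - the same loop shape PutKeyHandler uses.
  static void stream(String path, ByteBufferStreamOutput out, long chunkSize)
      throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(path, "r");
         FileChannel ch = raf.getChannel()) {
      long len = raf.length();
      long off = 0;
      while (len > 0) {
        long writeLen = Math.min(len, chunkSize);
        // MappedByteBuffer is a ByteBuffer, so it can be handed to the
        // interface's default write(ByteBuffer), which forwards the
        // buffer's position and remaining length to the ranged write.
        ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
        out.write(bb);
        off += writeLen;
        len -= writeLen;
      }
    }
  }
}
```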