From 2388d620511404c4de8f99ce8a37e0c411294c8d Mon Sep 17 00:00:00 2001 From: runzhiwang Date: Mon, 20 Apr 2020 09:32:41 +0800 Subject: [PATCH 1/2] HDDS-3223. Improve s3g read 1GB object efficiency by 100 times --- .../ozone/s3/endpoint/ObjectEndpoint.java | 3 +- .../ozone/s3/io/S3WrapperInputStream.java | 86 +++++++++++++++++++ 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 455e73436ee1..22d9bdf91f8c 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -259,8 +259,7 @@ public Response get( try (S3WrapperInputStream s3WrapperInputStream = new S3WrapperInputStream( key.getInputStream())) { - IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset, - copyLength); + s3WrapperInputStream.copyLarge(dest, startOffset, copyLength); } }; responseBuilder = Response diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java index 9efcc8738c66..69c3a0a1e780 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java @@ -18,17 +18,20 @@ package org.apache.hadoop.ozone.s3.io; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.ozone.client.io.KeyInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; /** * S3Wrapper Input Stream which encapsulates KeyInputStream from ozone. */ public class S3WrapperInputStream extends FSInputStream { private final KeyInputStream inputStream; + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; /** * Constructs S3WrapperInputStream with KeyInputStream. @@ -76,4 +79,87 @@ public long getPos() throws IOException { public boolean seekToNewSource(long targetPos) throws IOException { return false; } + + /** + * Copies some or all bytes from a large (over 2GB) InputStream + * to an OutputStream, optionally skipping input bytes. + *

+ * Copy the method from IOUtils of commons-io to reimplement skip by seek + * rather than read. The reason why IOUtils of commons-io implement skip + * by read can be found at + * IO-203. + *

+ *

+ * This method buffers the input internally, so there is no need to use a + * BufferedInputStream. + *

+ * The buffer size is given by {@link #DEFAULT_BUFFER_SIZE}. + * + * @param output the OutputStream to write to + * @param inputOffset : number of bytes to skip from input before copying + * -ve values are ignored + * @param length : number of bytes to copy. -ve means all + * @return the number of bytes copied + * @throws NullPointerException if the input or output is null + * @throws IOException if an I/O error occurs + */ + public long copyLarge(final OutputStream output, final long inputOffset, + final long length) throws IOException { + return copyLarge(output, inputOffset, length, + new byte[DEFAULT_BUFFER_SIZE]); + } + + /** + * Copies some or all bytes from a large (over 2GB) InputStream + * to an OutputStream, optionally skipping input bytes. + *

+ * Copy the method from IOUtils of commons-io to reimplement skip by seek + * rather than read. The reason why IOUtils of commons-io implement skip + * by read can be found at + * IO-203. + *

+ *

+ * This method uses the provided buffer, so there is no need to use a + * BufferedInputStream. + *

+ * + * @param output the OutputStream to write to + * @param inputOffset : number of bytes to skip from input before copying + * -ve values are ignored + * @param length : number of bytes to copy. -ve means all + * @param buffer the buffer to use for the copy + * @return the number of bytes copied + * @throws NullPointerException if the input or output is null + * @throws IOException if an I/O error occurs + */ + public long copyLarge(final OutputStream output, + final long inputOffset, final long length, final byte[] buffer) + throws IOException { + if (inputOffset > 0) { + seek(inputOffset); + } + if (length == 0) { + return 0; + } + final int bufferLength = buffer.length; + int bytesToRead = bufferLength; + if (length > 0 && length < bufferLength) { + bytesToRead = (int) length; + } + int read; + long totalRead = 0; + while (bytesToRead > 0) { + read = inputStream.read(buffer, 0, bytesToRead); + if (read == IOUtils.EOF) { + break; + } + output.write(buffer, 0, read); + totalRead += read; + if (length > 0) { // only adjust length if not reading to the end + // Note the cast must work because buffer.length is an integer + bytesToRead = (int) Math.min(length - totalRead, bufferLength); + } + } + return totalRead; + } } From f7d948ba8f428b7481bb6eae8bc6cfee9c015081 Mon Sep 17 00:00:00 2001 From: runzhiwang Date: Wed, 22 Apr 2020 21:47:16 +0800 Subject: [PATCH 2/2] fix code review --- .../hadoop/hdds/scm/XceiverClientManager.java | 9 +++ .../hadoop/hdds/scm/XceiverClientMetrics.java | 9 +++ .../ozone/client/io/KeyInputStream.java | 61 ++++++++++++++++++ .../ozone/client/rpc/TestKeyInputStream.java | 62 +++++++++++++++++++ .../ozone/s3/endpoint/ObjectEndpoint.java | 12 +++- .../ozone/s3/io/S3WrapperInputStream.java | 57 +---------------- .../endpoint/TestMultipartUploadWithCopy.java | 10 +-- 7 files changed, 152 insertions(+), 68 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 4ceff0b20511..80d1334a75a2 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -291,6 +291,15 @@ public static synchronized XceiverClientMetrics getXceiverClientMetrics() { return metrics; } + /** + * Reset xceiver client metric. + */ + public static synchronized void resetXceiverClientMetrics() { + if (metrics != null) { + metrics.reset(); + } + } + /** * Configuration for HDDS client. 
*/ diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java index 7ca89ec52476..530764608466 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java @@ -45,6 +45,10 @@ public class XceiverClientMetrics { private MetricsRegistry registry; public XceiverClientMetrics() { + init(); + } + + public void init() { int numEnumEntries = ContainerProtos.Type.values().length; this.registry = new MetricsRegistry(SOURCE_NAME); @@ -106,6 +110,11 @@ public long getContainerOpCountMetrics(ContainerProtos.Type type) { return opsArray[type.ordinal()].value(); } + @VisibleForTesting + public void reset() { + init(); + } + public void unRegister() { MetricsSystem ms = DefaultMetricsSystem.instance(); ms.unregisterSource(SOURCE_NAME); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 8e375bf951f7..4af683827240 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.client.BlockID; @@ -34,6 +35,7 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -322,4 +324,63 @@ public synchronized int getCurrentStreamIndex() { public long getRemainingOfIndex(int index) throws IOException { return blockStreams.get(index).getRemaining(); } + + /** + * Copies some or all bytes from a large (over 2GB) InputStream + * to an OutputStream, optionally skipping input bytes. + *

+ * Copy the method from IOUtils of commons-io to reimplement skip by seek + * rather than read. The reason why IOUtils of commons-io implement skip + * by read can be found at + * IO-203. + *

+ *

+ * This method uses the provided buffer, so there is no need to use a + * BufferedInputStream. + *

+ * + * @param output the OutputStream to write to + * @param inputOffset : number of bytes to skip from input before copying + * -ve values are ignored + * @param length : number of bytes to copy. -ve means all + * @param buffer the buffer to use for the copy + * @return the number of bytes copied + * @throws NullPointerException if the input or output is null + * @throws IOException if an I/O error occurs + */ + public long copyLarge(final OutputStream output, + final long inputOffset, final long len, final byte[] buffer) + throws IOException { + if (inputOffset > 0) { + seek(inputOffset); + } + + if (len == 0) { + return 0; + } + + final int bufferLength = buffer.length; + int bytesToRead = bufferLength; + if (len > 0 && len < bufferLength) { + bytesToRead = (int) len; + } + + int read; + long totalRead = 0; + while (bytesToRead > 0) { + read = read(buffer, 0, bytesToRead); + if (read == IOUtils.EOF) { + break; + } + + output.write(buffer, 0, read); + totalRead += read; + if (len > 0) { // only adjust len if not reading to the end + // Note the cast must work because buffer.length is an integer + bytesToRead = (int) Math.min(len - totalRead, bufferLength); + } + } + + return totalRead; + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java index 2779e7fc0681..75b605547d06 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java @@ -37,6 +37,7 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Random; import java.util.UUID; @@ -223,6 +224,7 @@ private void validate(KeyInputStream keyInputStream, byte[] inputData, @Test public void testSeek() throws Exception { + XceiverClientManager.resetXceiverClientMetrics(); XceiverClientMetrics metrics = XceiverClientManager .getXceiverClientMetrics(); long writeChunkCount = metrics.getContainerOpCountMetrics( @@ -273,4 +275,64 @@ public void testSeek() throws Exception { Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]); } } + + @Test + public void testCopyLarge() throws Exception { + String keyName = getKeyName(); + OzoneOutputStream key = TestHelper.createKey(keyName, + ReplicationType.RATIS, 0, objectStore, volumeName, bucketName); + + // write data spanning 3 blocks + int dataLength = (2 * blockSize) + (blockSize / 2); + + byte[] inputData = new byte[dataLength]; + Random rand = new Random(); + for (int i = 0; i < dataLength; i++) { + inputData[i] = (byte) rand.nextInt(127); + } + key.write(inputData); + key.close(); + + // test with random start and random length + for (int i = 0; i < 100; i++) { + int inputOffset = rand.nextInt(dataLength - 1); + int length = rand.nextInt(dataLength - inputOffset); + + KeyInputStream keyInputStream = (KeyInputStream) objectStore + .getVolume(volumeName).getBucket(bucketName).readKey(keyName) + .getInputStream(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + keyInputStream.copyLarge(outputStream, inputOffset, length, + new byte[4096]); + byte[] readData = outputStream.toByteArray(); + keyInputStream.close(); + outputStream.close(); + + for (int j = inputOffset; j < inputOffset + length; j++) { + Assert.assertEquals(readData[j - inputOffset], 
inputData[j]); + } + } + + // test with random start and -ve length + for (int i = 0; i < 10; i++) { + int inputOffset = rand.nextInt(dataLength - 1); + int length = -1; + + KeyInputStream keyInputStream = (KeyInputStream) objectStore + .getVolume(volumeName).getBucket(bucketName).readKey(keyName) + .getInputStream(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + keyInputStream.copyLarge(outputStream, inputOffset, length, + new byte[4096]); + byte[] readData = outputStream.toByteArray(); + keyInputStream.close(); + outputStream.close(); + + for (int j = inputOffset; j < dataLength; j++) { + Assert.assertEquals(readData[j - inputOffset], inputData[j]); + } + } + } } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 22d9bdf91f8c..f695fcba55b6 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -533,10 +533,16 @@ private Response createMultipartKey(String bucket, String key, long length, if (range != null) { RangeHeader rangeHeader = RangeHeaderParserUtil.parseRangeHeader(range, 0); - IOUtils.copyLarge(sourceObject, ozoneOutputStream, - rangeHeader.getStartOffset(), - rangeHeader.getEndOffset() - rangeHeader.getStartOffset()); + long copyLength = rangeHeader.getEndOffset() - + rangeHeader.getStartOffset(); + + try (S3WrapperInputStream s3WrapperInputStream = + new S3WrapperInputStream( + sourceObject.getInputStream())) { + s3WrapperInputStream.copyLarge(ozoneOutputStream, + rangeHeader.getStartOffset(), copyLength); + } } else { IOUtils.copy(sourceObject, ozoneOutputStream); } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java index 69c3a0a1e780..edf90edd9a3e 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.s3.io; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.ozone.client.io.KeyInputStream; @@ -105,61 +104,7 @@ public boolean seekToNewSource(long targetPos) throws IOException { */ public long copyLarge(final OutputStream output, final long inputOffset, final long length) throws IOException { - return copyLarge(output, inputOffset, length, + return inputStream.copyLarge(output, inputOffset, length, new byte[DEFAULT_BUFFER_SIZE]); } - - /** - * Copies some or all bytes from a large (over 2GB) InputStream - * to an OutputStream, optionally skipping input bytes. - *

- * Copy the method from IOUtils of commons-io to reimplement skip by seek - * rather than read. The reason why IOUtils of commons-io implement skip - * by read can be found at - * IO-203. - *

- *

- * This method uses the provided buffer, so there is no need to use a - * BufferedInputStream. - *

- * - * @param output the OutputStream to write to - * @param inputOffset : number of bytes to skip from input before copying - * -ve values are ignored - * @param length : number of bytes to copy. -ve means all - * @param buffer the buffer to use for the copy - * @return the number of bytes copied - * @throws NullPointerException if the input or output is null - * @throws IOException if an I/O error occurs - */ - public long copyLarge(final OutputStream output, - final long inputOffset, final long length, final byte[] buffer) - throws IOException { - if (inputOffset > 0) { - seek(inputOffset); - } - if (length == 0) { - return 0; - } - final int bufferLength = buffer.length; - int bytesToRead = bufferLength; - if (length > 0 && length < bufferLength) { - bytesToRead = (int) length; - } - int read; - long totalRead = 0; - while (bytesToRead > 0) { - read = inputStream.read(buffer, 0, bytesToRead); - if (read == IOUtils.EOF) { - break; - } - output.write(buffer, 0, read); - totalRead += read; - if (length > 0) { // only adjust length if not reading to the end - // Note the cast must work because buffer.length is an integer - bytesToRead = (int) Math.min(length - totalRead, bufferLength); - } - } - return totalRead; - } } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java index f688ff9191c2..36fa70bd68c7 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java @@ -111,13 +111,6 @@ public void testMultipart() throws Exception { OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY, null); partsList.add(part2); - partNumber = 3; - Part part3 = - uploadPartWithCopy(KEY, uploadID, partNumber, - OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY, - "bytes=" + RANGE_FROM + "-" + RANGE_TO); - partsList.add(part3); - // complete multipart upload CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(); @@ -130,8 +123,7 @@ public void testMultipart() throws Exception { OzoneConsts.S3_BUCKET); try (InputStream is = bucket.readKey(KEY)) { String keyContent = new Scanner(is).useDelimiter("\\A").next(); - Assert.assertEquals(content + EXISTING_KEY_CONTENT + EXISTING_KEY_CONTENT - .substring(RANGE_FROM, RANGE_TO), keyContent); + Assert.assertEquals(content + EXISTING_KEY_CONTENT, keyContent); } }
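For reference, below is a minimal, self-contained sketch (not part of the patch) of the seek-then-copy pattern that copyLarge() introduces above. RandomAccessFile stands in for KeyInputStream here because both expose seek(); the class and method names are made up for illustration. The point is that the skipped prefix of a ranged read is positioned past with seek() and never read and discarded, which is what makes large ranged S3 GETs cheap.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;

public final class RangedCopySketch {

  private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;

  // Same control flow as the copyLarge() added above: seek past the
  // offset instead of reading and discarding it, then copy 'length'
  // bytes (or everything that remains, if length is negative).
  static long copyRange(RandomAccessFile source, OutputStream output,
      long inputOffset, long length) throws IOException {
    if (inputOffset > 0) {
      source.seek(inputOffset);
    }
    if (length == 0) {
      return 0;
    }
    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
    int bytesToRead = (length > 0 && length < buffer.length)
        ? (int) length : buffer.length;
    long totalRead = 0;
    while (bytesToRead > 0) {
      int read = source.read(buffer, 0, bytesToRead);
      if (read == -1) {   // EOF
        break;
      }
      output.write(buffer, 0, read);
      totalRead += read;
      if (length > 0) {   // only bound the next read for a finite length
        bytesToRead = (int) Math.min(length - totalRead, buffer.length);
      }
    }
    return totalRead;
  }

  public static void main(String[] args) throws IOException {
    try (RandomAccessFile source = new RandomAccessFile(args[0], "r")) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      // Copy 1 MB starting 4 MB into the file without reading the prefix.
      long copied = copyRange(source, out, 4L * 1024 * 1024, 1024 * 1024);
      System.out.println("copied " + copied + " bytes");
    }
  }
}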