diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 4ceff0b20511..80d1334a75a2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -291,6 +291,15 @@ public static synchronized XceiverClientMetrics getXceiverClientMetrics() {
return metrics;
}
+ /**
+ * Reset xceiver client metrics.
+ */
+ public static synchronized void resetXceiverClientMetrics() {
+ if (metrics != null) {
+ metrics.reset();
+ }
+ }
+
/**
* Configuration for HDDS client.
*/
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
index 7ca89ec52476..530764608466 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
@@ -45,6 +45,10 @@ public class XceiverClientMetrics {
private MetricsRegistry registry;
public XceiverClientMetrics() {
+ init();
+ }
+
+ public void init() {
int numEnumEntries = ContainerProtos.Type.values().length;
this.registry = new MetricsRegistry(SOURCE_NAME);
@@ -106,6 +110,11 @@ public long getContainerOpCountMetrics(ContainerProtos.Type type) {
return opsArray[type.ordinal()].value();
}
+ @VisibleForTesting
+ public void reset() {
+ init();
+ }
+
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 8e375bf951f7..4af683827240 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
@@ -34,6 +35,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -322,4 +324,63 @@ public synchronized int getCurrentStreamIndex() {
public long getRemainingOfIndex(int index) throws IOException {
return blockStreams.get(index).getRemaining();
}
+
+ /**
+ * Copies some or all bytes from a large (over 2GB) InputStream
+ * to an OutputStream, optionally skipping input bytes.
+ *
+ * Copy the method from IOUtils of commons-io to reimplement skip by seek
+ * rather than read. The reason why IOUtils of commons-io implements skip
+ * by read can be found at IO-203.
+ *
+ * This method uses the provided buffer, so there is no need to use a
+ * BufferedInputStream.
+ *
+ * @param output the OutputStream to write to
+ * @param inputOffset : number of bytes to skip from input before copying
+ * -ve values are ignored
+ * @param len : number of bytes to copy. -ve means all
+ * @param buffer the buffer to use for the copy
+ * @return the number of bytes copied
+ * @throws NullPointerException if the input or output is null
+ * @throws IOException if an I/O error occurs
+ */
+ public long copyLarge(final OutputStream output,
+ final long inputOffset, final long len, final byte[] buffer)
+ throws IOException {
+ if (inputOffset > 0) {
+ seek(inputOffset);
+ }
+
+ if (len == 0) {
+ return 0;
+ }
+
+ final int bufferLength = buffer.length;
+ int bytesToRead = bufferLength;
+ if (len > 0 && len < bufferLength) {
+ bytesToRead = (int) len;
+ }
+
+ int read;
+ long totalRead = 0;
+ while (bytesToRead > 0) {
+ read = read(buffer, 0, bytesToRead);
+ if (read == IOUtils.EOF) {
+ break;
+ }
+
+ output.write(buffer, 0, read);
+ totalRead += read;
+ if (len > 0) { // only adjust len if not reading to the end
+ // Note the cast must work because buffer.length is an integer
+ bytesToRead = (int) Math.min(len - totalRead, bufferLength);
+ }
+ }
+
+ return totalRead;
+ }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 2779e7fc0681..75b605547d06 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -37,6 +37,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
@@ -223,6 +224,7 @@ private void validate(KeyInputStream keyInputStream, byte[] inputData,
@Test
public void testSeek() throws Exception {
+ XceiverClientManager.resetXceiverClientMetrics();
XceiverClientMetrics metrics = XceiverClientManager
.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
@@ -273,4 +275,64 @@ public void testSeek() throws Exception {
Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
}
}
+
+ @Test
+ public void testCopyLarge() throws Exception {
+ String keyName = getKeyName();
+ OzoneOutputStream key = TestHelper.createKey(keyName,
+ ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+ // write data spanning 3 blocks
+ int dataLength = (2 * blockSize) + (blockSize / 2);
+
+ byte[] inputData = new byte[dataLength];
+ Random rand = new Random();
+ for (int i = 0; i < dataLength; i++) {
+ inputData[i] = (byte) rand.nextInt(127);
+ }
+ key.write(inputData);
+ key.close();
+
+ // test with random start and random length
+ for (int i = 0; i < 100; i++) {
+ int inputOffset = rand.nextInt(dataLength - 1);
+ int length = rand.nextInt(dataLength - inputOffset);
+
+ KeyInputStream keyInputStream = (KeyInputStream) objectStore
+ .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+ .getInputStream();
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ keyInputStream.copyLarge(outputStream, inputOffset, length,
+ new byte[4096]);
+ byte[] readData = outputStream.toByteArray();
+ keyInputStream.close();
+ outputStream.close();
+
+ for (int j = inputOffset; j < inputOffset + length; j++) {
+ Assert.assertEquals(readData[j - inputOffset], inputData[j]);
+ }
+ }
+
+ // test with random start and -ve length
+ for (int i = 0; i < 10; i++) {
+ int inputOffset = rand.nextInt(dataLength - 1);
+ int length = -1;
+
+ KeyInputStream keyInputStream = (KeyInputStream) objectStore
+ .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+ .getInputStream();
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ keyInputStream.copyLarge(outputStream, inputOffset, length,
+ new byte[4096]);
+ byte[] readData = outputStream.toByteArray();
+ keyInputStream.close();
+ outputStream.close();
+
+ for (int j = inputOffset; j < dataLength; j++) {
+ Assert.assertEquals(readData[j - inputOffset], inputData[j]);
+ }
+ }
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 455e73436ee1..f695fcba55b6 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -259,8 +259,7 @@ public Response get(
try (S3WrapperInputStream s3WrapperInputStream =
new S3WrapperInputStream(
key.getInputStream())) {
- IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset,
- copyLength);
+ s3WrapperInputStream.copyLarge(dest, startOffset, copyLength);
}
};
responseBuilder = Response
@@ -534,10 +533,16 @@ private Response createMultipartKey(String bucket, String key, long length,
if (range != null) {
RangeHeader rangeHeader =
RangeHeaderParserUtil.parseRangeHeader(range, 0);
- IOUtils.copyLarge(sourceObject, ozoneOutputStream,
- rangeHeader.getStartOffset(),
- rangeHeader.getEndOffset() - rangeHeader.getStartOffset());
+ long copyLength = rangeHeader.getEndOffset() -
+ rangeHeader.getStartOffset();
+
+ try (S3WrapperInputStream s3WrapperInputStream =
+ new S3WrapperInputStream(
+ sourceObject.getInputStream())) {
+ s3WrapperInputStream.copyLarge(ozoneOutputStream,
+ rangeHeader.getStartOffset(), copyLength);
+ }
} else {
IOUtils.copy(sourceObject, ozoneOutputStream);
}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
index 9efcc8738c66..edf90edd9a3e 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
@@ -23,12 +23,14 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
/**
* S3Wrapper Input Stream which encapsulates KeyInputStream from ozone.
*/
public class S3WrapperInputStream extends FSInputStream {
private final KeyInputStream inputStream;
+ private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
/**
* Constructs S3WrapperInputStream with KeyInputStream.
@@ -76,4 +78,33 @@ public long getPos() throws IOException {
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
+
+ /**
+ * Copies some or all bytes from a large (over 2GB) InputStream
+ * to an OutputStream, optionally skipping input bytes.
+ *
+ * Copy the method from IOUtils of commons-io to reimplement skip by seek
+ * rather than read. The reason why IOUtils of commons-io implements skip
+ * by read can be found at IO-203.
+ *
+ * This method buffers the input internally, so there is no need to use a
+ * BufferedInputStream.
+ *
+ * @param output the OutputStream to write to
+ * @param inputOffset : number of bytes to skip from input before copying
+ * -ve values are ignored
+ * @param length : number of bytes to copy. -ve means all
+ * @return the number of bytes copied
+ * @throws NullPointerException if the input or output is null
+ * @throws IOException if an I/O error occurs
+ */
+ public long copyLarge(final OutputStream output, final long inputOffset,
+ final long length) throws IOException {
+ return inputStream.copyLarge(output, inputOffset, length,
+ new byte[DEFAULT_BUFFER_SIZE]);
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
index f688ff9191c2..36fa70bd68c7 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
@@ -111,13 +111,6 @@ public void testMultipart() throws Exception {
OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY, null);
partsList.add(part2);
- partNumber = 3;
- Part part3 =
- uploadPartWithCopy(KEY, uploadID, partNumber,
- OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY,
- "bytes=" + RANGE_FROM + "-" + RANGE_TO);
- partsList.add(part3);
-
// complete multipart upload
CompleteMultipartUploadRequest completeMultipartUploadRequest = new
CompleteMultipartUploadRequest();
@@ -130,8 +123,7 @@ public void testMultipart() throws Exception {
OzoneConsts.S3_BUCKET);
try (InputStream is = bucket.readKey(KEY)) {
String keyContent = new Scanner(is).useDelimiter("\\A").next();
- Assert.assertEquals(content + EXISTING_KEY_CONTENT + EXISTING_KEY_CONTENT
- .substring(RANGE_FROM, RANGE_TO), keyContent);
+ Assert.assertEquals(content + EXISTING_KEY_CONTENT, keyContent);
}
}