- * Copies some or all bytes from a large (over 2GB) InputStream
- * to an OutputStream, optionally skipping input bytes.
- *
- * Copy the method from IOUtils of commons-io to reimplement skip by seek
- * rather than read. The reason why IOUtils of commons-io implement skip
- * by read can be found at
- * IO-203.
- *
- *
- * This method uses the provided buffer, so there is no need to use a
- * BufferedInputStream.
- *
- * @param output the OutputStream to write to
- * @param inputOffset : number of bytes to skip from input before copying
- * -ve values are ignored
- * @param length : number of bytes to copy. -ve means all
- * @param buffer the buffer to use for the copy
- * @return the number of bytes copied
- * @throws NullPointerException if the input or output is null
- * @throws IOException if an I/O error occurs
- */
- public long copyLarge(final OutputStream output,
- final long inputOffset, final long len, final byte[] buffer)
- throws IOException {
- if (inputOffset > 0) {
- seek(inputOffset);
- }
-
- if (len == 0) {
+ @Override
+ public long skip(long n) throws IOException {
+ if (n <= 0) {
return 0;
}
- final int bufferLength = buffer.length;
- int bytesToRead = bufferLength;
- if (len > 0 && len < bufferLength) {
- bytesToRead = (int) len;
- }
-
- int read;
- long totalRead = 0;
- while (bytesToRead > 0) {
- read = read(buffer, 0, bytesToRead);
- if (read == IOUtils.EOF) {
- break;
- }
-
- output.write(buffer, 0, read);
- totalRead += read;
- if (len > 0) { // only adjust len if not reading to the end
- // Note the cast must work because buffer.length is an integer
- bytesToRead = (int) Math.min(len - totalRead, bufferLength);
- }
- }
-
- return totalRead;
+ long toSkip = Math.min(n, length - getPos());
+ seek(getPos() + toSkip);
+ return toSkip;
}
}
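For readers skimming the patch: the seek-based skip() added above is what allows the hand-rolled KeyInputStream#copyLarge to be dropped in favour of commons-io. IOUtils.copyLarge(in, out, inputOffset, length, buffer) positions the stream by calling InputStream#skip before copying, so with this override the skipped range is never actually read. A minimal, self-contained sketch of that caller-side shape (the byte-array stream is only a stand-in for a KeyInputStream, not Ozone code):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.commons.io.IOUtils;

public final class CopyLargeSketch {
  public static void main(String[] args) throws IOException {
    byte[] data = new byte[10_000];                        // stand-in for key data
    ByteArrayInputStream in = new ByteArrayInputStream(data);
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    // Skip 2048 bytes (delegated to InputStream#skip), then copy 4096 bytes
    // through a 4 KB scratch buffer -- the same shape ObjectEndpoint uses below.
    long copied = IOUtils.copyLarge(in, out, 2048, 4096, new byte[4096]);

    System.out.println("copied=" + copied);                // prints copied=4096
  }
}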
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 8ab176d6a35e..7775bb7def3c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -43,7 +43,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -294,66 +293,6 @@ public void testSeek() throws Exception {
}
}
- @Test
- public void testCopyLarge() throws Exception {
- String keyName = getKeyName();
- OzoneOutputStream key = TestHelper.createKey(keyName,
- ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
-
- // write data spanning 3 blocks
- int dataLength = (2 * blockSize) + (blockSize / 2);
-
- byte[] inputData = new byte[dataLength];
- Random rand = new Random();
- for (int i = 0; i < dataLength; i++) {
- inputData[i] = (byte) rand.nextInt(127);
- }
- key.write(inputData);
- key.close();
-
- // test with random start and random length
- for (int i = 0; i < 100; i++) {
- int inputOffset = rand.nextInt(dataLength - 1);
- int length = rand.nextInt(dataLength - inputOffset);
-
- KeyInputStream keyInputStream = (KeyInputStream) objectStore
- .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
- .getInputStream();
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
- keyInputStream.copyLarge(outputStream, inputOffset, length,
- new byte[4096]);
- byte[] readData = outputStream.toByteArray();
- keyInputStream.close();
- outputStream.close();
-
- for (int j = inputOffset; j < inputOffset + length; j++) {
- Assert.assertEquals(readData[j - inputOffset], inputData[j]);
- }
- }
-
- // test with random start and -ve length
- for (int i = 0; i < 10; i++) {
- int inputOffset = rand.nextInt(dataLength - 1);
- int length = -1;
-
- KeyInputStream keyInputStream = (KeyInputStream) objectStore
- .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
- .getInputStream();
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
- keyInputStream.copyLarge(outputStream, inputOffset, length,
- new byte[4096]);
- byte[] readData = outputStream.toByteArray();
- keyInputStream.close();
- outputStream.close();
-
- for (int j = inputOffset; j < dataLength; j++) {
- Assert.assertEquals(readData[j - inputOffset], inputData[j]);
- }
- }
- }
-
@Test
public void testReadChunk() throws Exception {
String keyName = getKeyName();
@@ -395,4 +334,62 @@ public void testReadChunk() throws Exception {
}
keyInputStream.close();
}
+
+ @Test
+ public void testSkip() throws Exception {
+ XceiverClientManager.resetXceiverClientMetrics();
+ XceiverClientMetrics metrics = XceiverClientManager
+ .getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long readChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.ReadChunk);
+
+ String keyName = getKeyName();
+ OzoneOutputStream key = TestHelper.createKey(keyName,
+ ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+ // write data spanning 3 chunks
+ int dataLength = (2 * chunkSize) + (chunkSize / 2);
+ byte[] inputData = ContainerTestHelper.getFixedLengthString(
+ keyString, dataLength).getBytes(UTF_8);
+ key.write(inputData);
+ key.close();
+
+ Assert.assertEquals(writeChunkCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+
+ KeyInputStream keyInputStream = (KeyInputStream) objectStore
+ .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+ .getInputStream();
+
+ // skip a total of 150 bytes (70 + 0 + 80)
+ keyInputStream.skip(70);
+ Assert.assertEquals(70, keyInputStream.getPos());
+ keyInputStream.skip(0);
+ Assert.assertEquals(70, keyInputStream.getPos());
+ keyInputStream.skip(80);
+
+ Assert.assertEquals(150, keyInputStream.getPos());
+
+ // Skip operation should not result in any readChunk operation.
+ Assert.assertEquals(readChunkCount, metrics
+ .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+ byte[] readData = new byte[chunkSize];
+ keyInputStream.read(readData, 0, chunkSize);
+
+ // Since we are reading data from index 150 to 250 and the chunk
+ // boundary falls every 100 bytes, this read spans 2 chunks.
+ Assert.assertEquals(readChunkCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+ keyInputStream.close();
+
+ // Verify that the data read matches the input data at the
+ // corresponding indices.
+ for (int i = 0; i < chunkSize; i++) {
+ Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
+ }
+ }
}
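The "readChunkCount + 2" assertion in testSkip is easier to follow with the arithmetic spelled out. Assuming chunkSize is 100 bytes, as the in-line comment implies, a hypothetical helper (not part of the test) shows how many chunks the post-skip read touches:

// Hypothetical helper, not part of TestKeyInputStream: counts how many
// fixed-size chunks a contiguous read touches.
final class ChunkMath {
  static int chunksTouched(long offset, long readLen, long chunkSize) {
    long first = offset / chunkSize;                 // 150 / 100 = 1
    long last = (offset + readLen - 1) / chunkSize;  // 249 / 100 = 2
    return (int) (last - first + 1);                 // => 2 ReadChunk ops
  }
}

chunksTouched(150, 100, 100) evaluates to 2, matching the assertion, while the skip to offset 150 itself triggers no ReadChunk at all.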
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
index fae1c823ca7e..5acf36876146 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
@@ -52,6 +52,12 @@ public final class S3GatewayConfigKeys {
OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.keytab";
public static final String OZONE_S3G_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL =
OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.principal";
+
+ public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_KEY =
+ "ozone.s3g.client.buffer.size";
+ public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT =
+ "4KB";
+
/**
* Never constructed.
*/
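The key/default pair is only declared here; the ObjectEndpoint code that consumes it is not fully shown in this excerpt. Going by the OzoneConfiguration and StorageUnit imports added below, the value would typically be resolved to a byte count roughly like this (a sketch under that assumption, using the conventional getStorageSize accessor, not the PR's exact code):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;

import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;

public final class BufferSizeSketch {
  // Resolves "4KB" (or an operator override) to bytes for the copy buffer.
  static int resolveBufferSize(OzoneConfiguration conf) {
    return (int) conf.getStorageSize(
        OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
        OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT,
        StorageUnit.BYTES);
  }
}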
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 6f0ea57e8623..5502173dc790 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.s3.endpoint;
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -49,6 +51,8 @@
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
@@ -77,6 +81,9 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
+
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
@@ -104,6 +111,7 @@ public class ObjectEndpoint extends EndpointBase {
private HttpHeaders headers;
private List
@@ -259,7 +277,8 @@ public Response get(
try (S3WrapperInputStream s3WrapperInputStream =
new S3WrapperInputStream(
key.getInputStream())) {
- s3WrapperInputStream.copyLarge(dest, startOffset, copyLength);
+ IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset,
+ copyLength, new byte[bufferSize]);
}
};
responseBuilder = Response
@@ -400,7 +419,6 @@ public Response delete(
return Response
.status(Status.NO_CONTENT)
.build();
-
}
/**
@@ -539,16 +557,9 @@ private Response createMultipartKey(String bucket, String key, long length,
if (range != null) {
RangeHeader rangeHeader =
RangeHeaderParserUtil.parseRangeHeader(range, 0);
-
- long copyLength = rangeHeader.getEndOffset() -
- rangeHeader.getStartOffset();
-
- try (S3WrapperInputStream s3WrapperInputStream =
- new S3WrapperInputStream(
- sourceObject.getInputStream())) {
- s3WrapperInputStream.copyLarge(ozoneOutputStream,
- rangeHeader.getStartOffset(), copyLength);
- }
+ IOUtils.copyLarge(sourceObject, ozoneOutputStream,
+ rangeHeader.getStartOffset(),
+ rangeHeader.getEndOffset() - rangeHeader.getStartOffset());
} else {
IOUtils.copy(sourceObject, ozoneOutputStream);
}
@@ -578,7 +589,6 @@ private Response createMultipartKey(String bucket, String key, long length,
}
throw ex;
}
-
}
/**
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
index edf90edd9a3e..d88287c4edbb 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
@@ -23,14 +23,12 @@
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
/**
* S3Wrapper Input Stream which encapsulates KeyInputStream from ozone.
*/
public class S3WrapperInputStream extends FSInputStream {
private final KeyInputStream inputStream;
- private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
/**
* Constructs S3WrapperInputStream with KeyInputStream.
@@ -75,36 +73,12 @@ public long getPos() throws IOException {
}
@Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
+ public long skip(long n) throws IOException {
+ return inputStream.skip(n);
}
- /**
- * Copies some or all bytes from a large (over 2GB) InputStream
- * to an OutputStream, optionally skipping input bytes.
- *
- * Copy the method from IOUtils of commons-io to reimplement skip by seek
- * rather than read. The reason why IOUtils of commons-io implement skip
- * by read can be found at
- * IO-203.
- *
- *
- * This method buffers the input internally, so there is no need to use a
- * BufferedInputStream.
- *
- * @param output the OutputStream to write to
- * @param inputOffset : number of bytes to skip from input before copying
- * -ve values are ignored
- * @param length : number of bytes to copy. -ve means all
- * @return the number of bytes copied
- * @throws NullPointerException if the input or output is null
- * @throws IOException if an I/O error occurs
- */
- public long copyLarge(final OutputStream output, final long inputOffset,
- final long length) throws IOException {
- return inputStream.copyLarge(output, inputOffset, length,
- new byte[DEFAULT_BUFFER_SIZE]);
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
}
}
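Why S3WrapperInputStream has to forward skip() at all: FSInputStream does not appear to supply its own skip, so without the override the wrapper would fall back to java.io.InputStream#skip, which discards bytes by reading them into a scratch buffer. A small stand-alone demonstration of that fallback (illustrative only, no Ozone classes involved):

import java.io.IOException;
import java.io.InputStream;

public final class SkipFallbackDemo {
  public static void main(String[] args) throws IOException {
    // A stream that only implements read(): its skip() is inherited from
    // InputStream, which "skips" by reading the bytes and throwing them away.
    final int[] readCalls = {0};
    InputStream raw = new InputStream() {
      private int remaining = 1024;

      @Override
      public int read() {
        if (remaining == 0) {
          return -1;
        }
        remaining--;
        readCalls[0]++;
        return 0;
      }
    };

    long skipped = raw.skip(512);
    // Prints skipped=512, reads=512: every skipped byte was read.
    System.out.println("skipped=" + skipped + ", reads=" + readCalls[0]);
  }
}

Forwarding skip() to the wrapped KeyInputStream, as the hunk above does, keeps the seek-based path intact end to end: ObjectEndpoint -> IOUtils.copyLarge -> S3WrapperInputStream.skip -> KeyInputStream.skip -> seek.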