@@ -465,6 +465,7 @@ public final class OzoneConfigKeys {
public static final String OZONE_CLIENT_HTTPS_NEED_AUTH_KEY =
"ozone.https.client.need-auth";
public static final boolean OZONE_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;

/**
* There is no need to instantiate this class.
*/
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2421,6 +2421,14 @@
information will be extracted
</description>
</property>
<property>
<name>ozone.s3g.client.buffer.size</name>
<tag>OZONE, S3GATEWAY</tag>
<value>4KB</value>
<description>
The size of the buffer used when reading blocks (4KB by default).
</description>
</property>
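
Editor's note: for reference, a minimal sketch of how a storage-size value such as "4KB" resolves to bytes at runtime. It uses the same OzoneConfiguration.getStorageSize call that the gateway change further below relies on; the standalone main and the hard-coded strings here are illustrative only.

  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.hdds.conf.StorageUnit;

  public class BufferSizeExample {
    public static void main(String[] args) {
      OzoneConfiguration conf = new OzoneConfiguration();
      // When the key is unset, the default string is parsed instead.
      // Hadoop's StorageUnit treats KB as 1024 bytes, so "4KB" yields 4096.
      int bufferSize = (int) conf.getStorageSize(
          "ozone.s3g.client.buffer.size", "4KB", StorageUnit.BYTES);
      System.out.println(bufferSize); // 4096
    }
  }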
<property>
<name>ssl.server.keystore.keypassword</name>
<tag>OZONE, SECURITY, MANAGEMENT</tag>
@@ -20,7 +20,6 @@
import com.google.common.annotations.VisibleForTesting;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
@@ -35,7 +34,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -325,62 +323,14 @@ public long getRemainingOfIndex(int index) throws IOException {
return blockStreams.get(index).getRemaining();
}

/**
* Copies some or all bytes from a large (over 2GB) <code>InputStream</code>
* to an <code>OutputStream</code>, optionally skipping input bytes.
* <p>
* Copy the method from IOUtils of commons-io to reimplement skip by seek
* rather than read. The reason why IOUtils of commons-io implement skip
* by read can be found at
* <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
* </p>
* <p>
* This method uses the provided buffer, so there is no need to use a
* <code>BufferedInputStream</code>.
* </p>
*
* @param output the <code>OutputStream</code> to write to
* @param inputOffset : number of bytes to skip from input before copying
* -ve values are ignored
* @param length : number of bytes to copy. -ve means all
* @param buffer the buffer to use for the copy
* @return the number of bytes copied
* @throws NullPointerException if the input or output is null
* @throws IOException if an I/O error occurs
*/
  public long copyLarge(final OutputStream output,
      final long inputOffset, final long len, final byte[] buffer)
      throws IOException {
    if (inputOffset > 0) {
      seek(inputOffset);
    }

    if (len == 0) {
      return 0;
    }

    final int bufferLength = buffer.length;
    int bytesToRead = bufferLength;
    if (len > 0 && len < bufferLength) {
      bytesToRead = (int) len;
    }

    int read;
    long totalRead = 0;
    while (bytesToRead > 0) {
      read = read(buffer, 0, bytesToRead);
      if (read == IOUtils.EOF) {
        break;
      }

      output.write(buffer, 0, read);
      totalRead += read;
      if (len > 0) { // only adjust len if not reading to the end
        // Note the cast must work because buffer.length is an integer
        bytesToRead = (int) Math.min(len - totalRead, bufferLength);
      }
    }

    return totalRead;
  }

  @Override
  public long skip(long n) throws IOException {
    if (n <= 0) {
      return 0;
    }
    long toSkip = Math.min(n, length - getPos());
    seek(getPos() + toSkip);
    return toSkip;
  }
}
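
Editor's note: the new override is the heart of this change. java.io.InputStream.skip is normally implemented by reading bytes into a throwaway buffer and discarding them, while a seekable stream can simply clamp the request to the remaining length and move its cursor. A minimal self-contained sketch of the same pattern, using a hypothetical in-memory stream (SeekableStream is not an Ozone class):

  import java.io.IOException;
  import java.io.InputStream;

  class SeekableStream extends InputStream {
    private final byte[] data;
    private int pos;

    SeekableStream(byte[] data) {
      this.data = data;
    }

    long getPos() {
      return pos;
    }

    void seek(long target) throws IOException {
      if (target < 0 || target > data.length) {
        throw new IOException("Cannot seek to " + target);
      }
      pos = (int) target;
    }

    @Override
    public int read() {
      return pos < data.length ? (data[pos++] & 0xff) : -1;
    }

    @Override
    public long skip(long n) throws IOException {
      if (n <= 0) {
        return 0;
      }
      // Clamp to the bytes left, then reposition instead of reading.
      // No data is fetched, which is what testSkip below asserts via
      // the ReadChunk metric staying flat.
      long toSkip = Math.min(n, data.length - getPos());
      seek(getPos() + toSkip);
      return toSkip;
    }
  }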
@@ -43,7 +43,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -294,66 +293,6 @@ public void testSeek() throws Exception {
}
}

@Test
public void testCopyLarge() throws Exception {
String keyName = getKeyName();
OzoneOutputStream key = TestHelper.createKey(keyName,
ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);

// write data spanning 3 blocks
int dataLength = (2 * blockSize) + (blockSize / 2);

byte[] inputData = new byte[dataLength];
Random rand = new Random();
for (int i = 0; i < dataLength; i++) {
inputData[i] = (byte) rand.nextInt(127);
}
key.write(inputData);
key.close();

// test with random start and random length
for (int i = 0; i < 100; i++) {
int inputOffset = rand.nextInt(dataLength - 1);
int length = rand.nextInt(dataLength - inputOffset);

KeyInputStream keyInputStream = (KeyInputStream) objectStore
.getVolume(volumeName).getBucket(bucketName).readKey(keyName)
.getInputStream();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

keyInputStream.copyLarge(outputStream, inputOffset, length,
new byte[4096]);
byte[] readData = outputStream.toByteArray();
keyInputStream.close();
outputStream.close();

for (int j = inputOffset; j < inputOffset + length; j++) {
Assert.assertEquals(readData[j - inputOffset], inputData[j]);
}
}

// test with random start and -ve length
for (int i = 0; i < 10; i++) {
int inputOffset = rand.nextInt(dataLength - 1);
int length = -1;

KeyInputStream keyInputStream = (KeyInputStream) objectStore
.getVolume(volumeName).getBucket(bucketName).readKey(keyName)
.getInputStream();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

keyInputStream.copyLarge(outputStream, inputOffset, length,
new byte[4096]);
byte[] readData = outputStream.toByteArray();
keyInputStream.close();
outputStream.close();

for (int j = inputOffset; j < dataLength; j++) {
Assert.assertEquals(readData[j - inputOffset], inputData[j]);
}
}
}

@Test
public void testReadChunk() throws Exception {
String keyName = getKeyName();
@@ -395,4 +334,62 @@ public void testReadChunk() throws Exception {
}
keyInputStream.close();
}

@Test
public void testSkip() throws Exception {
XceiverClientManager.resetXceiverClientMetrics();
XceiverClientMetrics metrics = XceiverClientManager
.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long readChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.ReadChunk);

String keyName = getKeyName();
OzoneOutputStream key = TestHelper.createKey(keyName,
ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);

// write data spanning 3 chunks
int dataLength = (2 * chunkSize) + (chunkSize / 2);
byte[] inputData = ContainerTestHelper.getFixedLengthString(
keyString, dataLength).getBytes(UTF_8);
key.write(inputData);
key.close();

Assert.assertEquals(writeChunkCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));

KeyInputStream keyInputStream = (KeyInputStream) objectStore
.getVolume(volumeName).getBucket(bucketName).readKey(keyName)
.getInputStream();

// skip a total of 150 bytes (70 + 0 + 80)
keyInputStream.skip(70);
Assert.assertEquals(70, keyInputStream.getPos());
keyInputStream.skip(0);
Assert.assertEquals(70, keyInputStream.getPos());
keyInputStream.skip(80);

Assert.assertEquals(150, keyInputStream.getPos());

// Skip operation should not result in any readChunk operation.
Assert.assertEquals(readChunkCount, metrics
.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));

byte[] readData = new byte[chunkSize];
keyInputStream.read(readData, 0, chunkSize);

// Since we are reading data from offset 150 to 250 and the chunk
// boundaries fall every 100 bytes, two chunks must be read.
Assert.assertEquals(readChunkCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));

keyInputStream.close();

// Verify that the data read matches the input data at the
// corresponding indices.
for (int i = 0; i < chunkSize; i++) {
Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
}
}
}
@@ -52,6 +52,12 @@ public final class S3GatewayConfigKeys {
OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.keytab";
public static final String OZONE_S3G_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL =
OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.principal";

public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_KEY =
"ozone.s3g.client.buffer.size";
public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT =
"4KB";

/**
* Never constructed.
*/
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.s3.endpoint;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -49,6 +51,8 @@

import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
@@ -77,6 +81,9 @@
import org.apache.commons.io.IOUtils;

import org.apache.commons.lang3.tuple.Pair;

import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
@@ -104,6 +111,7 @@ public class ObjectEndpoint extends EndpointBase {
private HttpHeaders headers;

private List<String> customizableGetHeaders = new ArrayList<>();
private int bufferSize;

public ObjectEndpoint() {
customizableGetHeaders.add("Content-Type");
@@ -114,6 +122,16 @@ public ObjectEndpoint() {
customizableGetHeaders.add("Content-Encoding");
}

@Inject
private OzoneConfiguration ozoneConfiguration;

@PostConstruct
public void init() {
Member:

NIT: It can be slightly better to use

  @Inject
  private OzoneConfiguration ozoneConfiguration;

And you don't need to expose getClient() in the EndpointBase.

(Didn't try, but it should work, IMHO).

Member Author:

Thank you for letting me know about this technique.

bufferSize = (int) ozoneConfiguration.getStorageSize(
OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
}

/**
* Rest endpoint to upload object to a bucket.
* <p>
@@ -259,7 +277,8 @@ public Response get(
try (S3WrapperInputStream s3WrapperInputStream =
new S3WrapperInputStream(
key.getInputStream())) {
s3WrapperInputStream.copyLarge(dest, startOffset, copyLength);
IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset,
copyLength, new byte[bufferSize]);
}
};
responseBuilder = Response
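
Editor's note: a minimal self-contained illustration of the IOUtils.copyLarge overload used above, showing the inputOffset/length semantics with in-memory streams (the data and buffer size here are arbitrary):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import java.nio.charset.StandardCharsets;

  import org.apache.commons.io.IOUtils;

  public class CopyLargeDemo {
    public static void main(String[] args) throws IOException {
      byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
      ByteArrayInputStream in = new ByteArrayInputStream(data);
      ByteArrayOutputStream out = new ByteArrayOutputStream();

      // Skip the first 2 input bytes, then copy 5 bytes through the
      // caller-supplied buffer, as in the range-read path above.
      long copied = IOUtils.copyLarge(in, out, 2, 5, new byte[4096]);

      System.out.println(copied);                                    // 5
      System.out.println(out.toString(StandardCharsets.UTF_8.name())); // 23456
    }
  }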
@@ -400,7 +419,6 @@ public Response delete(
return Response
.status(Status.NO_CONTENT)
.build();

}

/**
@@ -539,16 +557,9 @@ private Response createMultipartKey(String bucket, String key, long length,
if (range != null) {
RangeHeader rangeHeader =
RangeHeaderParserUtil.parseRangeHeader(range, 0);

long copyLength = rangeHeader.getEndOffset() -
rangeHeader.getStartOffset();

try (S3WrapperInputStream s3WrapperInputStream =
new S3WrapperInputStream(
sourceObject.getInputStream())) {
s3WrapperInputStream.copyLarge(ozoneOutputStream,
rangeHeader.getStartOffset(), copyLength);
}
IOUtils.copyLarge(sourceObject, ozoneOutputStream,
rangeHeader.getStartOffset(),
rangeHeader.getEndOffset() - rangeHeader.getStartOffset());
} else {
IOUtils.copy(sourceObject, ozoneOutputStream);
}
@@ -578,7 +589,6 @@ private Response createMultipartKey(String bucket, String key, long length,
}
throw ex;
}

}

/**