@@ -291,6 +291,15 @@ public static synchronized XceiverClientMetrics getXceiverClientMetrics() {
    return metrics;
  }

  /**
   * Resets the xceiver client metrics.
   */
  public static synchronized void resetXceiverClientMetrics() {
    if (metrics != null) {
      metrics.reset();
    }
  }

  /**
   * Configuration for HDDS client.
   */
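The reset hook gives tests a deterministic baseline, since the metrics source is shared process-wide and op counts would otherwise carry over between tests. A minimal sketch of the intended pattern, assuming JUnit's Assert and an op type such as ReadChunk (both illustrative, not part of this change):

// Reset the process-wide metrics so op counts start from zero here.
XceiverClientManager.resetXceiverClientMetrics();
XceiverClientMetrics metrics =
    XceiverClientManager.getXceiverClientMetrics();

long readChunksBefore = metrics.getContainerOpCountMetrics(
    ContainerProtos.Type.ReadChunk);
// ... read a key through the client under test ...
long readChunksAfter = metrics.getContainerOpCountMetrics(
    ContainerProtos.Type.ReadChunk);
Assert.assertTrue(readChunksAfter >= readChunksBefore);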
@@ -45,6 +45,10 @@ public class XceiverClientMetrics {
  private MetricsRegistry registry;

  public XceiverClientMetrics() {
    init();
  }

  public void init() {
    int numEnumEntries = ContainerProtos.Type.values().length;
    this.registry = new MetricsRegistry(SOURCE_NAME);

@@ -106,6 +110,11 @@ public long getContainerOpCountMetrics(ContainerProtos.Type type) {
    return opsArray[type.ordinal()].value();
  }

  @VisibleForTesting
  public void reset() {
    init();
  }

  public void unRegister() {
    MetricsSystem ms = DefaultMetricsSystem.instance();
    ms.unregisterSource(SOURCE_NAME);
@@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
@@ -34,6 +35,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -322,4 +324,63 @@ public synchronized int getCurrentStreamIndex() {
  public long getRemainingOfIndex(int index) throws IOException {
    return blockStreams.get(index).getRemaining();
  }

  /**
   * Copies some or all bytes from a large (over 2GB) <code>InputStream</code>
   * to an <code>OutputStream</code>, optionally skipping input bytes.
   * <p>
   * This method is copied from commons-io's <code>IOUtils</code>, but
   * reimplements the skip using seek rather than read. The reason commons-io
   * implements skip by reading is explained in
   * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
   * </p>
   * <p>
   * This method uses the provided buffer, so there is no need to wrap the
   * stream in a <code>BufferedInputStream</code>.
   * </p>
   *
   * @param output the <code>OutputStream</code> to write to
   * @param inputOffset number of bytes to skip from the input before copying;
   *                    negative values are ignored
   * @param len number of bytes to copy; a negative value means copy all
   * @param buffer the buffer to use for the copy
   * @return the number of bytes copied
   * @throws NullPointerException if the output or buffer is null
   * @throws IOException if an I/O error occurs
   */
  public long copyLarge(final OutputStream output,
      final long inputOffset, final long len, final byte[] buffer)
      throws IOException {
    if (inputOffset > 0) {
      seek(inputOffset);
    }

    if (len == 0) {
      return 0;
    }

    final int bufferLength = buffer.length;
    int bytesToRead = bufferLength;
    if (len > 0 && len < bufferLength) {
      bytesToRead = (int) len;
    }

    int read;
    long totalRead = 0;
    while (bytesToRead > 0) {
      read = read(buffer, 0, bytesToRead);
      if (read == IOUtils.EOF) {
        break;
      }

      output.write(buffer, 0, read);
      totalRead += read;
      if (len > 0) { // only adjust bytesToRead if not reading to the end
        // the cast is safe because the result is at most buffer.length
        bytesToRead = (int) Math.min(len - totalRead, bufferLength);
      }
    }

    return totalRead;
  }
}
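For orientation, a minimal sketch of a caller, assuming an ObjectStore handle as in the tests below (objectStore, volumeName, bucketName, keyName, offset and count are placeholders):

// Copy bytes [offset, offset + count) of a key into memory,
// seeking straight to offset instead of reading up to it.
KeyInputStream in = (KeyInputStream) objectStore
    .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
    .getInputStream();
ByteArrayOutputStream out = new ByteArrayOutputStream();
long copied = in.copyLarge(out, offset, count, new byte[4096]);
// copied is smaller than count if the key ends before offset + count
in.close();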
@@ -37,6 +37,7 @@
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
@@ -223,6 +224,7 @@ private void validate(KeyInputStream keyInputStream, byte[] inputData,

  @Test
  public void testSeek() throws Exception {
    XceiverClientManager.resetXceiverClientMetrics();
    XceiverClientMetrics metrics = XceiverClientManager
        .getXceiverClientMetrics();
    long writeChunkCount = metrics.getContainerOpCountMetrics(
@@ -273,4 +275,64 @@ public void testSeek() throws Exception {
      Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
    }
  }

  @Test
  public void testCopyLarge() throws Exception {
    String keyName = getKeyName();
    OzoneOutputStream key = TestHelper.createKey(keyName,
        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);

    // write data spanning 3 blocks
    int dataLength = (2 * blockSize) + (blockSize / 2);

    byte[] inputData = new byte[dataLength];
    Random rand = new Random();
    for (int i = 0; i < dataLength; i++) {
      inputData[i] = (byte) rand.nextInt(127);
    }
    key.write(inputData);
    key.close();

    // test with random start and random length
    for (int i = 0; i < 100; i++) {
      int inputOffset = rand.nextInt(dataLength - 1);
      int length = rand.nextInt(dataLength - inputOffset);

      KeyInputStream keyInputStream = (KeyInputStream) objectStore
          .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
          .getInputStream();
      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

      keyInputStream.copyLarge(outputStream, inputOffset, length,
          new byte[4096]);
      byte[] readData = outputStream.toByteArray();
      keyInputStream.close();
      outputStream.close();

      for (int j = inputOffset; j < inputOffset + length; j++) {
        Assert.assertEquals(readData[j - inputOffset], inputData[j]);
      }
    }

    // test with random start and -ve length
    for (int i = 0; i < 10; i++) {
      int inputOffset = rand.nextInt(dataLength - 1);
      int length = -1;

      KeyInputStream keyInputStream = (KeyInputStream) objectStore
          .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
          .getInputStream();
      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

      keyInputStream.copyLarge(outputStream, inputOffset, length,
          new byte[4096]);
      byte[] readData = outputStream.toByteArray();
      keyInputStream.close();
      outputStream.close();

      for (int j = inputOffset; j < dataLength; j++) {
        Assert.assertEquals(readData[j - inputOffset], inputData[j]);
      }
    }
  }
}
@@ -259,8 +259,7 @@ public Response get(
        try (S3WrapperInputStream s3WrapperInputStream =
            new S3WrapperInputStream(
                key.getInputStream())) {
-         IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset,
-             copyLength);
+         s3WrapperInputStream.copyLarge(dest, startOffset, copyLength);
        }
      };
      responseBuilder = Response
@@ -534,10 +533,16 @@ private Response createMultipartKey(String bucket, String key, long length,
      if (range != null) {
        RangeHeader rangeHeader =
            RangeHeaderParserUtil.parseRangeHeader(range, 0);
-       IOUtils.copyLarge(sourceObject, ozoneOutputStream,
-           rangeHeader.getStartOffset(),
-           rangeHeader.getEndOffset() - rangeHeader.getStartOffset());
+
+       long copyLength = rangeHeader.getEndOffset() -
+           rangeHeader.getStartOffset();
+
+       try (S3WrapperInputStream s3WrapperInputStream =
+           new S3WrapperInputStream(
+               sourceObject.getInputStream())) {
+         s3WrapperInputStream.copyLarge(ozoneOutputStream,
+             rangeHeader.getStartOffset(), copyLength);
+       }
      } else {
        IOUtils.copy(sourceObject, ozoneOutputStream);
      }
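Both call sites above now route range copies through S3WrapperInputStream.copyLarge, so the skip to startOffset becomes a seek on the underlying KeyInputStream instead of a read-and-discard loop; for a range request near the end of a large object this avoids reading nearly the whole object just to reach the requested bytes.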
@@ -23,12 +23,14 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * S3Wrapper Input Stream which encapsulates KeyInputStream from Ozone.
 */
public class S3WrapperInputStream extends FSInputStream {
  private final KeyInputStream inputStream;
  private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;

  /**
   * Constructs S3WrapperInputStream with KeyInputStream.
@@ -76,4 +78,33 @@ public long getPos() throws IOException {
  public boolean seekToNewSource(long targetPos) throws IOException {
    return false;
  }

  /**
   * Copies some or all bytes from a large (over 2GB) <code>InputStream</code>
   * to an <code>OutputStream</code>, optionally skipping input bytes.
   * <p>
   * This method is copied from commons-io's <code>IOUtils</code>, but
   * reimplements the skip using seek rather than read. The reason commons-io
   * implements skip by reading is explained in
   * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
   * </p>
   * <p>
   * This method buffers the input internally, so there is no need to wrap the
   * stream in a <code>BufferedInputStream</code>. The buffer size is given by
   * {@link #DEFAULT_BUFFER_SIZE}.
   * </p>
   *
   * @param output the <code>OutputStream</code> to write to
   * @param inputOffset number of bytes to skip from the input before copying;
   *                    negative values are ignored
   * @param length number of bytes to copy; a negative value means copy all
   * @return the number of bytes copied
   * @throws NullPointerException if the output is null
   * @throws IOException if an I/O error occurs
   */
  public long copyLarge(final OutputStream output, final long inputOffset,
      final long length) throws IOException {
    return inputStream.copyLarge(output, inputOffset, length,
        new byte[DEFAULT_BUFFER_SIZE]);
  }
}
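A usage sketch, assuming a destination stream dest and a parsed rangeHeader as in the endpoint changes above (both placeholders); note that callers do not supply a buffer, since the wrapper always copies through its 32 KB internal buffer:

// Stream bytes [startOffset, startOffset + copyLength) of an object,
// seeking past the prefix rather than reading and discarding it.
long startOffset = rangeHeader.getStartOffset();
long copyLength = rangeHeader.getEndOffset() - startOffset;
try (S3WrapperInputStream s3in =
    new S3WrapperInputStream(key.getInputStream())) {
  s3in.copyLarge(dest, startOffset, copyLength);
}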
@@ -111,13 +111,6 @@ public void testMultipart() throws Exception {
OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY, null);
partsList.add(part2);

partNumber = 3;
Part part3 =
uploadPartWithCopy(KEY, uploadID, partNumber,
OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY,
"bytes=" + RANGE_FROM + "-" + RANGE_TO);
partsList.add(part3);

// complete multipart upload
CompleteMultipartUploadRequest completeMultipartUploadRequest = new
CompleteMultipartUploadRequest();
@@ -130,8 +123,7 @@
        OzoneConsts.S3_BUCKET);
    try (InputStream is = bucket.readKey(KEY)) {
      String keyContent = new Scanner(is).useDelimiter("\\A").next();
-     Assert.assertEquals(content + EXISTING_KEY_CONTENT + EXISTING_KEY_CONTENT
-         .substring(RANGE_FROM, RANGE_TO), keyContent);
+     Assert.assertEquals(content + EXISTING_KEY_CONTENT, keyContent);
    }
  }
