Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package software.amazon.s3.analyticsaccelerator;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down Expand Up @@ -234,6 +235,41 @@ public void readVectored(
logicalIO.readVectored(ranges, allocate);
}

/**
* Fill the provided buffer with the contents of the input source starting at {@code position} for
* the given {@code offset} and {@code length}.
*
* @param position start position of the read
* @param buffer target buffer to copy data
* @param offset offset in the buffer to copy the data
* @param length size of the read
* @throws IOException if an I/O error occurs
*/
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
throwIfClosed("cannot read from closed stream");
validatePositionedReadArgs(position, buffer, offset, length);

if (length == 0) {
Comment thread
SanjayMarreddi marked this conversation as resolved.
return;
}

this.telemetry.measureVerbose(
() ->
Operation.builder()
.name(OPERATION_READ)
.attribute(StreamAttributes.uri(this.s3URI))
.attribute(StreamAttributes.etag(this.logicalIO.metadata().getEtag()))
.attribute(StreamAttributes.range(position, position + length - 1))
.build(),
() -> {
int bytesRead = this.logicalIO.read(buffer, offset, length, position);
if (bytesRead < length) {
throw new EOFException(
"Reached the end of stream with " + (length - bytesRead) + " bytes left to read");
}
});
}

/**
* Releases all resources associated with the {@link S3SeekableInputStream}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ public abstract void readVectored(
Consumer<ByteBuffer> release)
throws IOException;

/**
* Fill the provided buffer with the contents of the input source starting at {@code position} for
* the given {@code offset} and {@code length}.
*
* @param position start position of the read
* @param buffer target buffer to copy data
* @param offset offset in the buffer to copy the data
* @param length size of the read
* @throws IOException if an I/O error occurs
*/
public abstract void readFully(long position, byte[] buffer, int offset, int length)
throws IOException;

/**
* Validates the arguments for a read operation. This method is available to use in all subclasses
* to ensure consistency.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package software.amazon.s3.analyticsaccelerator.model;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
Expand Down Expand Up @@ -84,6 +85,28 @@ public void readVectored(
}
}

@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
// Save current position of stream
long prevPosition = this.position;
if (position >= this.contentLength) {
throw new IOException("Position is beyond end of stream");
}

data.position((int) position);
int bytesAvailable = this.contentLength - (int) position;
int bytesToRead = Math.min(length, bytesAvailable);
data.get(buffer, offset, bytesToRead);
if (bytesToRead < length) {
throw new IOException(
"Reached the end of stream with " + (length - bytesToRead) + " bytes left to read");
}

// Restore original position
this.position = prevPosition;
data.position((int) this.position);
}

@Override
public int read() {
if (this.position >= this.contentLength) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public void readVectored(
this.delegate.readVectored(ranges, allocate, release);
}

@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
this.delegate.readFully(position, buffer, offset, length);
}

@Override
public int read() throws IOException {
return this.delegate.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,99 @@ public void testInsufficientBuffer() throws IOException {
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
SpotBugsLambdaWorkaround.assertReadResult(
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
assertThrows(
IndexOutOfBoundsException.class, () -> seekableInputStream.readFully(0, new byte[0], 0, 8));
}

@Test
void testReadFullyWithInvalidArgument() throws IOException {
// Given: seekable stream
try (S3SeekableInputStream stream = getTestStream()) {
// When & Then: reading with invalid arguments, exception is thrown
// -1 is invalid position
assertThrows(IllegalArgumentException.class, () -> stream.readFully(-1, new byte[10], 0, 5));
// -1 is invalid length
assertThrows(IllegalArgumentException.class, () -> stream.readFully(0, new byte[10], 0, -1));
// Requesting more data than byte buffer size
assertThrows(IndexOutOfBoundsException.class, () -> stream.readFully(0, new byte[5], 0, 10));
}
}

@Test
void testReadFullyHappyCase() throws IOException {
Comment thread
SanjayMarreddi marked this conversation as resolved.
// Given: seekable stream
try (S3SeekableInputStream stream = getTestStream()) {
// When: reading 5 bytes from position 3
byte[] buf = new byte[5];
stream.readFully(3, buf, 0, 5);

// Then: buffer contains the expected 5 bytes from position 3
byte[] expected = TEST_DATA.substring(3, 8).getBytes(StandardCharsets.UTF_8);
assertArrayEquals(expected, buf);

// Position should remain unchanged after readFully
assertEquals(0, stream.getPos());
}
}

@Test
void testReadFullyDoesNotAlterPosition() throws IOException {
// Given: seekable stream with data "test-data12345678910"
try (S3SeekableInputStream stream = getTestStream()) {
// When:
// 1) Reading first 5 bytes from position 0 (should be "test-")
// 2) Reading 5 bytes from position 10 using readFully (should be "23456")
// 3) Reading next 5 bytes from current position (should be "data1")
byte[] one = new byte[5];
byte[] two = new byte[5];
byte[] three = new byte[5];

int numBytesRead1 = stream.read(one, 0, one.length);
stream.readFully(10, two, 0, two.length);
int numBytesRead3 = stream.read(three, 0, three.length);

// Then: readFully did not alter the position and reads #1 and #3 return subsequent bytes
// First read should return 5 bytes
assertEquals(5, numBytesRead1);
// Third read should also return 5 bytes, continuing from where first read left off
assertEquals(5, numBytesRead3);

// Verify the actual content of each buffer
assertEquals("test-", new String(one, StandardCharsets.UTF_8));
assertEquals("data1", new String(three, StandardCharsets.UTF_8));
assertEquals("23456", new String(two, StandardCharsets.UTF_8));

// Verify the stream position is at 10 (5 + 5) after all reads
assertEquals(10, stream.getPos());
}
}

@Test
public void testReadFullyOnClosedStream() throws IOException {
S3SeekableInputStream seekableInputStream = getTestStream();
seekableInputStream.close();
assertThrows(IOException.class, () -> seekableInputStream.readFully(0, new byte[8], 0, 8));
}

@Test
public void testZeroLengthReadFully() throws IOException {
S3SeekableInputStream seekableInputStream = getTestStream();
assertDoesNotThrow(() -> seekableInputStream.readFully(0, new byte[0], 0, 0));
}

@Test
void testReadFullyThrowsWhenInsufficientBytes() throws IOException {
// Given: seekable stream with TEST_DATA (20 bytes)
try (S3SeekableInputStream stream = getTestStream()) {
// When & Then: trying to read beyond available data should throw IOException
byte[] buffer = new byte[10];

// Try to read 10 bytes starting at position 15 (only 5 bytes available)
assertThrows(IOException.class, () -> stream.readFully(15, buffer, 0, 10));

// Verify stream position remains unchanged after failed readFully
assertEquals(0, stream.getPos());
}
}

private S3SeekableInputStream getTestStream() {
Expand Down
Loading