Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class PhysicalIOConfiguration {
private static final boolean DEFAULT_SMALL_OBJECTS_PREFETCHING_ENABLED = true;
private static final long DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD = 8 * ONE_MB;
private static final int DEFAULT_THREAD_POOL_SIZE = 96;
private static final long DEFAULT_READ_BUFFER_SIZE = 8 * ONE_KB;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the default block size, right? Should we rename this to DEFAULT_BLOCK_SIZE then?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a configuration named DEFAULT_BLOCK_SIZE_BYTES, which is not actively used and was defined for a separate purpose. I created a new config so that we do not expose a name that refers to the library's internals, which is why I called it the read buffer. Open to different name suggestions.


/**
* Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_MEMORY_CAPACITY_BYTES} by default.
Expand Down Expand Up @@ -151,6 +152,9 @@ public class PhysicalIOConfiguration {

@Builder.Default private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;

private static final String READ_BUFFER_SIZE_KEY = "readbuffersize";
@Builder.Default private long readBufferSize = DEFAULT_READ_BUFFER_SIZE;

/** Default set of settings for {@link PhysicalIO} */
public static final PhysicalIOConfiguration DEFAULT = PhysicalIOConfiguration.builder().build();

Expand Down Expand Up @@ -192,6 +196,7 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
configuration.getLong(
SMALL_OBJECT_SIZE_THRESHOLD_KEY, DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD))
.threadPoolSize(configuration.getInt(THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE))
.readBufferSize(configuration.getLong(READ_BUFFER_SIZE_KEY, DEFAULT_READ_BUFFER_SIZE))
.build();
}

Expand All @@ -215,6 +220,7 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
* @param smallObjectsPrefetchingEnabled Whether small object prefetching is enabled
* @param smallObjectSizeThreshold Maximum size in bytes for an object to be considered small
* @param threadPoolSize Size of thread pool to be used for async operations
* @param readBufferSize Maximum size of the buffer used for read operations
*/
@Builder
private PhysicalIOConfiguration(
Expand All @@ -232,7 +238,8 @@ private PhysicalIOConfiguration(
int blockReadRetryCount,
boolean smallObjectsPrefetchingEnabled,
long smallObjectSizeThreshold,
int threadPoolSize) {
int threadPoolSize,
long readBufferSize) {
Preconditions.checkArgument(memoryCapacityBytes > 0, "`memoryCapacityBytes` must be positive");
Preconditions.checkArgument(
memoryCleanupFrequencyMilliseconds > 0,
Expand All @@ -254,6 +261,7 @@ private PhysicalIOConfiguration(
Preconditions.checkArgument(
smallObjectSizeThreshold > 0, "`smallObjectSizeThreshold` must be positive");
// `threadPoolSize > 0` is a boolean condition, so it has to be validated with checkArgument like
// the neighbouring checks. Passing it to checkNotNull hands over a value that is never null
// (presumably an autoboxed Boolean), so a non-positive pool size would never be rejected.
Preconditions.checkArgument(threadPoolSize > 0, "`threadPoolSize` must be positive");
Preconditions.checkArgument(readBufferSize > 0, "`readBufferSize` must be positive");

this.memoryCapacityBytes = memoryCapacityBytes;
this.memoryCleanupFrequencyMilliseconds = memoryCleanupFrequencyMilliseconds;
Expand All @@ -270,6 +278,7 @@ private PhysicalIOConfiguration(
this.smallObjectsPrefetchingEnabled = smallObjectsPrefetchingEnabled;
this.smallObjectSizeThreshold = smallObjectSizeThreshold;
this.threadPoolSize = threadPoolSize;
this.readBufferSize = readBufferSize;
}

@Override
Expand All @@ -293,6 +302,7 @@ public String toString() {
builder.append("\tsmallObjectsPrefetchingEnabled: " + smallObjectsPrefetchingEnabled + "\n");
builder.append("\tsmallObjectSizeThreshold: " + smallObjectSizeThreshold + "\n");
builder.append("\tthreadPoolSize: " + threadPoolSize + "\n");
builder.append("\treadBufferSize: " + readBufferSize + "\n");

return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.s3.analyticsaccelerator.io.physical.data;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.NonNull;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;
import software.amazon.s3.analyticsaccelerator.util.BlockKey;

/** Block object stores the data of a stream */
public class DataBlock implements Closeable {
/**
* The data of the block, set after construction via {@link #setData(byte[])}. Accessed only after
* ensuring readiness via {@link #awaitData()}.
*/
@Nullable private byte[] data;

@Getter private final BlockKey blockKey;
@Getter private final long generation;
/**
 * Latch with an initial count of one that gates access to {@link #data}. {@link
 * #setData(byte[])} counts it down after assigning the data, and {@code awaitData()} blocks on
 * it, so callers of the read methods wait here until the data has been set.
 */
private final CountDownLatch dataReadyLatch = new CountDownLatch(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a lot of people (like me!) won't be familiar with what a CountDownLatch does; it would be great to add a comment here about the purpose of this variable.


/**
 * Creates a {@code DataBlock} for the given key. The block starts out without data; the content
 * is supplied afterwards through {@link #setData(byte[])}.
 *
 * @param blockKey the objectKey and range of the object
 * @param generation generation of the block in a sequential read pattern
 */
public DataBlock(@NonNull BlockKey blockKey, long generation) {
  // Fetch both range bounds up front, then validate in the order: generation, start, end.
  final long rangeStart = blockKey.getRange().getStart();
  final long rangeEnd = blockKey.getRange().getEnd();

  Preconditions.checkArgument(
      generation >= 0, "`generation` must be non-negative; was: %s", generation);
  Preconditions.checkArgument(
      rangeStart >= 0, "`start` must be non-negative; was: %s", rangeStart);
  Preconditions.checkArgument(rangeEnd >= 0, "`end` must be non-negative; was: %s", rangeEnd);

  this.generation = generation;
  this.blockKey = blockKey;
}

/**
 * Reads a single byte from this block, waiting first until the block's data has been set.
 *
 * @param pos The position to read, expressed as a position in the object (it is translated to
 *     an offset inside this block's data)
 * @return an unsigned int representing the byte that was read
 * @throws IOException if waiting for the data fails or the data is not available
 */
public int read(long pos) throws IOException {
  Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
  awaitData();

  // Snapshot the field once: close() sets it to null, and re-reading the field after the
  // null check in awaitData() could otherwise surface as a NullPointerException.
  final byte[] content = this.data;
  if (content == null) {
    throw new IOException("Failed to read data");
  }

  int contentOffset = posToOffset(pos);
  return Byte.toUnsignedInt(content[contentOffset]);
}

/**
 * Copies bytes from this block into the provided buffer, waiting first until the block's data
 * has been set. At most {@code len} bytes are copied; fewer are copied when the block holds
 * fewer bytes from {@code pos} onwards.
 *
 * @param buf buffer to read data into
 * @param off start position in buffer at which data is written
 * @param len length of data to be read
 * @param pos the position to begin reading from
 * @return the total number of bytes read into the buffer; NOTE(review): this is negative when
 *     {@code pos} maps past the end of the block's data, in which case nothing is copied
 * @throws IOException if waiting for the data fails or the data is not available
 */
public int read(byte @NonNull [] buf, int off, int len, long pos) throws IOException {
  Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
  Preconditions.checkArgument(0 <= off, "`off` must not be negative");
  Preconditions.checkArgument(0 <= len, "`len` must not be negative");
  Preconditions.checkArgument(off < buf.length, "`off` must be less than size of buffer");

  awaitData();

  // Snapshot the field once: close() sets it to null, and re-reading the field after the
  // null check in awaitData() could otherwise surface as a NullPointerException.
  final byte[] content = this.data;
  if (content == null) {
    throw new IOException("Failed to read data");
  }

  int contentOffset = posToOffset(pos);
  int available = content.length - contentOffset;
  int bytesToCopy = Math.min(len, available);

  // A negative count means the requested position lies beyond the stored data; skip the copy.
  if (bytesToCopy >= 0) {
    System.arraycopy(content, contentOffset, buf, off, bytesToCopy);
  }

  return bytesToCopy;
}

/**
 * Determines the offset in the Block corresponding to a position in an object.
 *
 * @param pos the position of a byte in the object
 * @return the offset in the byte buffer underlying this Block, i.e. {@code pos} minus the start
 *     of this block's range
 * @throws ArithmeticException if the difference does not fit into an {@code int}
 */
private int posToOffset(long pos) {
  // Math.toIntExact instead of a plain (int) cast: a cast would silently truncate a large
  // difference, which could wrap around to a seemingly valid index and yield the wrong byte.
  return Math.toIntExact(pos - this.blockKey.getRange().getStart());
}

/**
 * Stores the block's content and then counts down {@code dataReadyLatch}, releasing any thread
 * that is waiting for the data to become available.
 *
 * @param data data of the block
 */
public void setData(final byte[] data) {
  // The assignment must precede the count-down so that released waiters observe the data.
  this.data = data;
  this.dataReadyLatch.countDown();
}

/**
 * Waits until {@code dataReadyLatch} has been counted down and then verifies that data is
 * present.
 *
 * @throws IOException if the waiting thread is interrupted, or if no data is available once the
 *     wait is over
 */
private void awaitData() throws IOException {
  try {
    dataReadyLatch.await();
  } catch (InterruptedException e) {
    // Restore the interrupt status before translating the exception, so that code further up
    // the call stack can still observe that this thread was interrupted.
    Thread.currentThread().interrupt();
    throw new IOException("Failed to read data", e);
  }

  if (data == null) {
    throw new IOException("Failed to read data");
  }
}

/**
 * Closes the {@link DataBlock} by dropping its reference to the stored data. Reads attempted
 * afterwards fail with an {@link IOException} because no data is present.
 *
 * @throws IOException declared by the {@link Closeable} contract; this implementation does not
 *     throw it
 */
@Override
public void close() throws IOException {
  data = null;
}
}
Loading