AnalyticsAcceleratorInputStreamWrapper.java
@@ -19,31 +19,68 @@
 package org.apache.iceberg.aws.s3;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileIOMetricsContext;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.Counter;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
 
 /** A wrapper to convert {@link S3SeekableInputStream} to Iceberg {@link SeekableInputStream} */
-class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream {
+class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream implements RangeReadable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AnalyticsAcceleratorInputStreamWrapper.class);
 
   private final S3SeekableInputStream delegate;
+  private final Counter readBytes;
+  private final Counter readOperations;
 
   AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream) {
+    this(stream, MetricsContext.nullMetrics());
+  }
+
+  AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream, MetricsContext metrics) {
     this.delegate = stream;
+    this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, MetricsContext.Unit.BYTES);
+    this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
   }
 
   @Override
   public int read() throws IOException {
-    return this.delegate.read();
+    int nextByteValue = this.delegate.read();
+    if (nextByteValue != -1) {
+      readBytes.increment();
+    }
+    readOperations.increment();
+    return nextByteValue;
   }
 
   @Override
-  public int read(byte[] b) throws IOException {
-    return this.delegate.read(b, 0, b.length);
+  public int read(byte[] b, int off, int len) throws IOException {
+    int bytesRead = this.delegate.read(b, off, len);
+    if (bytesRead > 0) {
+      readBytes.increment(bytesRead);
+    }
+    readOperations.increment();
+    return bytesRead;
  }
 
   @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return this.delegate.read(b, off, len);
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    this.delegate.readFully(position, buffer, offset, length);
   }
 
+  @Override
+  public int readTail(byte[] buffer, int offset, int length) throws IOException {
+    return this.delegate.readTail(buffer, offset, length);
+  }
+
   @Override
@@ -56,6 +93,25 @@ public long getPos() {
     return this.delegate.getPos();
   }
 
+  @Override
+  public void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer> allocate)
+      throws IOException {
+    List<ObjectRange> objectRanges =
+        ranges.stream()
+            .map(
+                range ->
+                    new software.amazon.s3.analyticsaccelerator.common.ObjectRange(
+                        range.byteBuffer(), range.offset(), range.length()))
+            .collect(Collectors.toList());
+
+    // This does not keep track of bytes read and that is a possible improvement.
+    readOperations.increment();
+    this.delegate.readVectored(
+        objectRanges,
+        allocate,
+        buffer -> LOG.info("Release buffer of length {}: {}", buffer.limit(), buffer));
+  }
+
   @Override
   public void close() throws IOException {
     this.delegate.close();
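For orientation, the sketch below shows roughly how the new vectored-read surface of the wrapper could be exercised from the caller side. It is not part of this PR and makes two labeled assumptions: that readVectored is part of the RangeReadable contract (as the @Override in the wrapper suggests), and that FileRange.byteBuffer() is a CompletableFuture<ByteBuffer> that the accelerator completes, as the ObjectRange mapping above implies. How ranges are constructed is elided, since FileRange's factory is not shown in the diff.

// Sketch only, not part of this PR. See the assumptions in the note above;
// check org.apache.iceberg.io.FileRange for the real construction API.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.io.FileRange;
import org.apache.iceberg.io.RangeReadable;

class VectoredReadSketch {
  static void readRanges(RangeReadable stream, List<FileRange> ranges) throws IOException {
    // One logical call submits every range; the accelerator is free to
    // coalesce nearby ranges or fetch them in parallel.
    stream.readVectored(ranges, ByteBuffer::allocate);
    for (FileRange range : ranges) {
      // Assumption: byteBuffer() is a CompletableFuture<ByteBuffer>;
      // join() blocks until that particular range has been filled.
      ByteBuffer data = range.byteBuffer().join();
      // ... consume data ...
    }
  }
}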
AnalyticsAcceleratorUtil.java
@@ -27,71 +27,59 @@
 import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
-import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.s3.analyticsaccelerator.ObjectClientConfiguration;
-import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SyncSdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
 import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
-import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
-import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;
 
 class AnalyticsAcceleratorUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(AnalyticsAcceleratorUtil.class);
 
-  private static final Cache<Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>
+  private static final Cache<Pair<S3Client, S3FileIOProperties>, S3SeekableInputStreamFactory>
       STREAM_FACTORY_CACHE =
           Caffeine.newBuilder()
               .maximumSize(100)
               .removalListener(
                   (RemovalListener<
-                          Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>)
+                          Pair<S3Client, S3FileIOProperties>, S3SeekableInputStreamFactory>)
                       (key, factory, cause) -> close(factory))
               .build();
 
   private AnalyticsAcceleratorUtil() {}
 
   public static SeekableInputStream newStream(S3InputFile inputFile) {
     S3URI uri = S3URI.of(inputFile.uri().bucket(), inputFile.uri().key());
-    HeadObjectResponse metadata = inputFile.getObjectMetadata();
-    OpenStreamInformation openStreamInfo =
-        OpenStreamInformation.builder()
-            .objectMetadata(
-                ObjectMetadata.builder()
-                    .contentLength(metadata.contentLength())
-                    .etag(metadata.eTag())
-                    .build())
-            .build();
 
     S3SeekableInputStreamFactory factory =
         STREAM_FACTORY_CACHE.get(
-            Pair.of(inputFile.asyncClient(), inputFile.s3FileIOProperties()),
+            Pair.of(inputFile.client(), inputFile.s3FileIOProperties()),
             AnalyticsAcceleratorUtil::createNewFactory);
 
     try {
-      S3SeekableInputStream seekableInputStream = factory.createStream(uri, openStreamInfo);
-      return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream);
+      S3SeekableInputStream seekableInputStream = factory.createStream(uri);
+      return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream, inputFile.metrics());
     } catch (IOException e) {
       throw new RuntimeIOException(
           e, "Failed to create S3 analytics accelerator input stream for: %s", inputFile.uri());
     }
   }
 
   private static S3SeekableInputStreamFactory createNewFactory(
-      Pair<S3AsyncClient, S3FileIOProperties> cacheKey) {
+      Pair<S3Client, S3FileIOProperties> cacheKey) {
     ConnectorConfiguration connectorConfiguration =
         new ConnectorConfiguration(cacheKey.second().s3AnalyticsacceleratorProperties());
     S3SeekableInputStreamConfiguration streamConfiguration =
         S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
     ObjectClientConfiguration objectClientConfiguration =
         ObjectClientConfiguration.fromConfiguration(connectorConfiguration);
 
-    ObjectClient objectClient = new S3SdkObjectClient(cacheKey.first(), objectClientConfiguration);
+    ObjectClient objectClient =
+        new S3SyncSdkObjectClient(cacheKey.first(), objectClientConfiguration);
     return new S3SeekableInputStreamFactory(objectClient, streamConfiguration);
   }
 
@@ -105,8 +93,7 @@ private static void close(S3SeekableInputStreamFactory factory) {
     }
   }
 
-  public static void cleanupCache(
-      S3AsyncClient asyncClient, S3FileIOProperties s3FileIOProperties) {
-    STREAM_FACTORY_CACHE.invalidate(Pair.of(asyncClient, s3FileIOProperties));
+  public static void cleanupCache(S3Client client, S3FileIOProperties s3FileIOProperties) {
+    STREAM_FACTORY_CACHE.invalidate(Pair.of(client, s3FileIOProperties));
   }
 }
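The factory cache above follows a standard Caffeine pattern: entries are created on demand with get(key, mappingFunction), and the removal listener releases a factory whether the entry is evicted by size or explicitly invalidated by cleanupCache. Below is a minimal self-contained sketch of that pattern, using only Caffeine APIs that appear in the diff; the class and key names are illustrative, not Iceberg APIs.

import java.io.Closeable;
import java.io.IOException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

// Illustrative sketch of the STREAM_FACTORY_CACHE pattern above.
class FactoryCacheSketch {
  private static final Cache<String, Closeable> CACHE =
      Caffeine.newBuilder()
          .maximumSize(100)
          // Runs for size-based eviction and for explicit invalidate() alike,
          // which is what lets cleanupCache() release a shared factory.
          .removalListener(
              (String key, Closeable factory, RemovalCause cause) -> {
                try {
                  if (factory != null) {
                    factory.close();
                  }
                } catch (IOException e) {
                  // log and move on, as the close(factory) helper above does
                }
              })
          .build();

  static Closeable getOrCreate(String key) {
    // The mapping function is computed at most once per key, so concurrent
    // readers of the same (client, properties) pair share one factory.
    return CACHE.get(key, k -> (Closeable) () -> System.out.println("closed " + k));
  }
}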
@@ -109,14 +109,14 @@ public S3FileIOProperties s3FileIOProperties() {
   @Override
   public void close() {
     if (null != s3Client) {
+      // cleanup usage in analytics accelerator if any
+      if (s3FileIOProperties().isS3AnalyticsAcceleratorEnabled()) {
+        AnalyticsAcceleratorUtil.cleanupCache(s3Client, s3FileIOProperties);
+      }
       s3Client.close();
     }
 
     if (null != s3AsyncClient) {
-      // cleanup usage in analytics accelerator if any
-      if (s3FileIOProperties().isS3AnalyticsAcceleratorEnabled()) {
-        AnalyticsAcceleratorUtil.cleanupCache(s3AsyncClient, s3FileIOProperties);
-      }
       s3AsyncClient.close();
     }
   }
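The cleanup branch above only runs when isS3AnalyticsAcceleratorEnabled() is true, so a quick way to exercise this whole path is to enable the accelerator on an S3FileIO. A hedged sketch: the property key string used here is my assumption, so check S3FileIOProperties for the canonical constant before relying on it.

import java.util.Map;
import org.apache.iceberg.aws.s3.S3FileIO;

class EnableAcceleratorSketch {
  public static void main(String[] args) {
    // Assumed property key; verify against S3FileIOProperties.
    Map<String, String> props = Map.of("s3.analytics-accelerator.enabled", "true");
    try (S3FileIO io = new S3FileIO()) {
      io.initialize(props);
      // Streams opened via io.newInputFile(...).newStream() now go through
      // AnalyticsAcceleratorUtil; io.close() invalidates the cached factory
      // for this client, which triggers the removal listener shown earlier.
    }
  }
}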