diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java
index 197f79905f0f..409f3095870e 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java
@@ -19,31 +19,68 @@
 package org.apache.iceberg.aws.s3;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileIOMetricsContext;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.Counter;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
 
 /** A wrapper to convert {@link S3SeekableInputStream} to Iceberg {@link SeekableInputStream} */
-class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream {
+class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream implements RangeReadable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AnalyticsAcceleratorInputStreamWrapper.class);
 
   private final S3SeekableInputStream delegate;
+  private final Counter readBytes;
+  private final Counter readOperations;
 
   AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream) {
+    this(stream, MetricsContext.nullMetrics());
+  }
+
+  AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream, MetricsContext metrics) {
     this.delegate = stream;
+    this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, MetricsContext.Unit.BYTES);
+    this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
   }
 
   @Override
   public int read() throws IOException {
-    return this.delegate.read();
+    int nextByteValue = this.delegate.read();
+    if (nextByteValue != -1) {
+      readBytes.increment();
+    }
+    readOperations.increment();
+    return nextByteValue;
   }
 
   @Override
-  public int read(byte[] b) throws IOException {
-    return this.delegate.read(b, 0, b.length);
+  public int read(byte[] b, int off, int len) throws IOException {
+    int bytesRead = this.delegate.read(b, off, len);
+    if (bytesRead > 0) {
+      readBytes.increment(bytesRead);
+    }
+    readOperations.increment();
+    return bytesRead;
   }
 
   @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return this.delegate.read(b, off, len);
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    this.delegate.readFully(position, buffer, offset, length);
+  }
+
+  @Override
+  public int readTail(byte[] buffer, int offset, int length) throws IOException {
+    return this.delegate.readTail(buffer, offset, length);
   }
 
   @Override
@@ -56,6 +93,25 @@ public long getPos() {
     return this.delegate.getPos();
   }
 
+  @Override
+  public void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer> allocate)
+      throws IOException {
+    List<ObjectRange> objectRanges =
+        ranges.stream()
+            .map(
+                range ->
+                    new software.amazon.s3.analyticsaccelerator.common.ObjectRange(
+                        range.byteBuffer(), range.offset(), range.length()))
+            .collect(Collectors.toList());
+
+    // This does not keep track of bytes read; tracking them is a possible improvement.
+    readOperations.increment();
+    this.delegate.readVectored(
+        objectRanges,
+        allocate,
+        buffer -> LOG.info("Release buffer of length {}: {}", buffer.limit(), buffer));
+  }
+
   @Override
   public void close() throws IOException {
     this.delegate.close();
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java
index ed1c3a4879e3..f4313d4aa1d3 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java
@@ -27,30 +27,27 @@
 import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
-import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.s3.analyticsaccelerator.ObjectClientConfiguration;
-import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SyncSdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
 import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
-import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
-import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;
 
 class AnalyticsAcceleratorUtil {
   private static final Logger LOG = LoggerFactory.getLogger(AnalyticsAcceleratorUtil.class);
 
-  private static final Cache<Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>
+  private static final Cache<Pair<S3Client, S3FileIOProperties>, S3SeekableInputStreamFactory>
       STREAM_FACTORY_CACHE =
           Caffeine.newBuilder()
               .maximumSize(100)
               .removalListener(
                   (RemovalListener<
-                          Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>)
+                          Pair<S3Client, S3FileIOProperties>, S3SeekableInputStreamFactory>)
                       (key, factory, cause) -> close(factory))
               .build();
 
@@ -58,24 +55,15 @@ private AnalyticsAcceleratorUtil() {}
 
   public static SeekableInputStream newStream(S3InputFile inputFile) {
     S3URI uri = S3URI.of(inputFile.uri().bucket(), inputFile.uri().key());
-    HeadObjectResponse metadata = inputFile.getObjectMetadata();
-    OpenStreamInformation openStreamInfo =
-        OpenStreamInformation.builder()
-            .objectMetadata(
-                ObjectMetadata.builder()
-                    .contentLength(metadata.contentLength())
-                    .etag(metadata.eTag())
-                    .build())
-            .build();
 
     S3SeekableInputStreamFactory factory =
         STREAM_FACTORY_CACHE.get(
-            Pair.of(inputFile.asyncClient(), inputFile.s3FileIOProperties()),
+            Pair.of(inputFile.client(), inputFile.s3FileIOProperties()),
             AnalyticsAcceleratorUtil::createNewFactory);
 
     try {
-      S3SeekableInputStream seekableInputStream = factory.createStream(uri, openStreamInfo);
-      return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream);
+      S3SeekableInputStream seekableInputStream = factory.createStream(uri);
+      return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream, inputFile.metrics());
     } catch (IOException e) {
       throw new RuntimeIOException(
           e, "Failed to create S3 analytics accelerator input stream for: %s", inputFile.uri());
@@ -83,15 +71,15 @@ public static SeekableInputStream newStream(S3InputFile inputFile) {
   }
 
   private static S3SeekableInputStreamFactory createNewFactory(
-      Pair<S3AsyncClient, S3FileIOProperties> cacheKey) {
+      Pair<S3Client, S3FileIOProperties> cacheKey) {
     ConnectorConfiguration connectorConfiguration =
         new ConnectorConfiguration(cacheKey.second().s3AnalyticsacceleratorProperties());
     S3SeekableInputStreamConfiguration streamConfiguration =
         S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
     ObjectClientConfiguration objectClientConfiguration =
         ObjectClientConfiguration.fromConfiguration(connectorConfiguration);
-
-    ObjectClient objectClient = new S3SdkObjectClient(cacheKey.first(), objectClientConfiguration);
+    ObjectClient objectClient =
+        new S3SyncSdkObjectClient(cacheKey.first(), objectClientConfiguration);
 
     return new S3SeekableInputStreamFactory(objectClient, streamConfiguration);
   }
@@ -105,8 +93,7 @@ private static void close(S3SeekableInputStreamFactory factory) {
     }
   }
 
-  public static void cleanupCache(
-      S3AsyncClient asyncClient, S3FileIOProperties s3FileIOProperties) {
-    STREAM_FACTORY_CACHE.invalidate(Pair.of(asyncClient, s3FileIOProperties));
+  public static void cleanupCache(S3Client client, S3FileIOProperties s3FileIOProperties) {
+    STREAM_FACTORY_CACHE.invalidate(Pair.of(client, s3FileIOProperties));
   }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java b/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java
index 400792cf976c..879f04a43003 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java
@@ -109,14 +109,14 @@ public S3FileIOProperties s3FileIOProperties() {
   @Override
   public void close() {
     if (null != s3Client) {
+      // cleanup usage in analytics accelerator if any
+      if (s3FileIOProperties().isS3AnalyticsAcceleratorEnabled()) {
+        AnalyticsAcceleratorUtil.cleanupCache(s3Client, s3FileIOProperties);
+      }
       s3Client.close();
     }
 
     if (null != s3AsyncClient) {
-      // cleanup usage in analytics accelerator if any
-      if (s3FileIOProperties().isS3AnalyticsAcceleratorEnabled()) {
-        AnalyticsAcceleratorUtil.cleanupCache(s3AsyncClient, s3FileIOProperties);
-      }
       s3AsyncClient.close();
     }
   }
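
A minimal caller-side sketch, not part of the patch: it shows how the RangeReadable surface the wrapper now implements might be exercised, using only methods visible in this diff (newStream, readFully, readTail). RangeReadSketch and readHeaderAndFooter are hypothetical names, and the class would have to live in the org.apache.iceberg.aws.s3 package because AnalyticsAcceleratorUtil is package-private.

    package org.apache.iceberg.aws.s3;

    import java.io.IOException;
    import org.apache.iceberg.io.RangeReadable;
    import org.apache.iceberg.io.SeekableInputStream;

    class RangeReadSketch {
      // Hypothetical helper: read a fixed-size header and footer with positioned reads.
      static void readHeaderAndFooter(S3InputFile inputFile) throws IOException {
        try (SeekableInputStream stream = AnalyticsAcceleratorUtil.newStream(inputFile)) {
          // The stream returned above is the wrapper, which implements RangeReadable,
          // so the cast is safe on the analytics accelerator path.
          RangeReadable ranged = (RangeReadable) stream;

          byte[] header = new byte[4];
          ranged.readFully(0L, header, 0, header.length); // bytes [0, 4) of the object

          byte[] footer = new byte[8];
          ranged.readTail(footer, 0, footer.length); // last 8 bytes of the object
        }
      }
    }

Both calls delegate straight to S3SeekableInputStream, so they avoid the seek-then-read round trips a plain SeekableInputStream would need for footer-first formats such as Parquet.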