From ee79314b6705cb9d2d6ded139ab41a91fbbaed2b Mon Sep 17 00:00:00 2001 From: Sanjay Marreddi Date: Thu, 24 Apr 2025 14:28:58 +0100 Subject: [PATCH] AWS: Close the S3SeekableInputStreamFactory before removing from cache --- .../aws/s3/AnalyticsAcceleratorUtil.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java index 192ed015204a..ed1c3a4879e3 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java @@ -20,10 +20,13 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.s3.analyticsaccelerator.ObjectClientConfiguration; @@ -39,8 +42,17 @@ class AnalyticsAcceleratorUtil { + private static final Logger LOG = LoggerFactory.getLogger(AnalyticsAcceleratorUtil.class); + private static final Cache, S3SeekableInputStreamFactory> - STREAM_FACTORY_CACHE = Caffeine.newBuilder().maximumSize(100).build(); + STREAM_FACTORY_CACHE = + Caffeine.newBuilder() + .maximumSize(100) + .removalListener( + (RemovalListener< + Pair, S3SeekableInputStreamFactory>) + (key, factory, cause) -> close(factory)) + .build(); private AnalyticsAcceleratorUtil() {} @@ -83,6 +95,16 @@ private static S3SeekableInputStreamFactory createNewFactory( return new S3SeekableInputStreamFactory(objectClient, streamConfiguration); } + private static void close(S3SeekableInputStreamFactory factory) { + if (factory != null) { + try { + factory.close(); + } catch (IOException e) { + LOG.warn("Failed to close S3SeekableInputStreamFactory", e); + } + } + } + public static void cleanupCache( S3AsyncClient asyncClient, S3FileIOProperties s3FileIOProperties) { STREAM_FACTORY_CACHE.invalidate(Pair.of(asyncClient, s3FileIOProperties));