diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3AsyncClientWrapper.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3AsyncClientWrapper.java new file mode 100644 index 000000000000..012d5b3adc47 --- /dev/null +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3AsyncClientWrapper.java @@ -0,0 +1,172 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.exchange.filesystem.s3; + +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.core.ApiName; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.util.VersionInfo; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Request; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; +import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.ABORT_MULTIPART_UPLOAD; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.COMPLETE_MULTIPART_UPLOAD; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.CREATE_MULTIPART_UPLOAD; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.DELETE_OBJECTS; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.GET_OBJECT; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.LIST_OBJECTS_V2; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.PUT_OBJECT; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.UPLOAD_PART; +import static java.util.Objects.requireNonNull; + +public abstract class S3AsyncClientWrapper + implements S3AsyncClient +{ + private final S3AsyncClient delegate; + + public S3AsyncClientWrapper(S3AsyncClient delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public String serviceName() + { + return delegate.serviceName(); + } + + @Override + public CompletableFuture putObject(PutObjectRequest request, AsyncRequestBody body) + { + CompletableFuture future = delegate.putObject(request, body); + handle(PUT_OBJECT, future); + return future; + } + + @Override + public CompletableFuture deleteObjects(DeleteObjectsRequest request) + { + CompletableFuture future = delegate.deleteObjects(request); + handle(DELETE_OBJECTS, future); + return future; + } + + @Override + public CompletableFuture getObject(GetObjectRequest request, AsyncResponseTransformer transformer) + { + CompletableFuture future = delegate.getObject(request, transformer); + handle(GET_OBJECT, future); + return future; + } + + @Override + public CompletableFuture createMultipartUpload(CreateMultipartUploadRequest request) + { + CompletableFuture future = delegate.createMultipartUpload(request); + handle(CREATE_MULTIPART_UPLOAD, future); + return future; + } + + @Override + public CompletableFuture uploadPart(UploadPartRequest request, AsyncRequestBody body) + { + CompletableFuture future = delegate.uploadPart(request, body); + handle(UPLOAD_PART, future); + return future; + } + + @Override + public CompletableFuture completeMultipartUpload(CompleteMultipartUploadRequest request) + { + CompletableFuture future = delegate.completeMultipartUpload(request); + handle(COMPLETE_MULTIPART_UPLOAD, future); + return future; + } + + @Override + public CompletableFuture abortMultipartUpload(AbortMultipartUploadRequest request) + { + CompletableFuture future = delegate.abortMultipartUpload(request); + handle(ABORT_MULTIPART_UPLOAD, future); + return future; + } + + @Override + public CompletableFuture listObjectsV2(ListObjectsV2Request request) + { + CompletableFuture future = delegate.listObjectsV2(request); + handle(LIST_OBJECTS_V2, future); + return future; + } + + protected abstract void handle(RequestType requestType, CompletableFuture responseFuture); + + @Override + public ListObjectsV2Publisher listObjectsV2Paginator(ListObjectsV2Request listObjectsV2Request) + { + return new ListObjectsV2Publisher(this, applyPaginatorUserAgent(listObjectsV2Request)); + } + + /** + * Based on {@link software.amazon.awssdk.services.s3.DefaultS3AsyncClient#applyPaginatorUserAgent(S3Request)} + */ + private T applyPaginatorUserAgent(T request) + { + Consumer userAgentApplier = b -> b.addApiName(ApiName.builder() + .version(VersionInfo.SDK_VERSION).name("PAGINATED").build()); + AwsRequestOverrideConfiguration overrideConfiguration = request.overrideConfiguration() + .map(c -> c.toBuilder().applyMutation(userAgentApplier).build()) + .orElse((AwsRequestOverrideConfiguration.builder().applyMutation(userAgentApplier).build())); + return (T) request.toBuilder().overrideConfiguration(overrideConfiguration).build(); + } + + @Override + public void close() + { + delegate.close(); + } + + public enum RequestType + { + PUT_OBJECT, + DELETE_OBJECTS, + GET_OBJECT, + CREATE_MULTIPART_UPLOAD, + UPLOAD_PART, + COMPLETE_MULTIPART_UPLOAD, + ABORT_MULTIPART_UPLOAD, + LIST_OBJECTS_V2, + } +} diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java index 3bdadaa7cf49..f7ad333f3837 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.airlift.slice.SliceInput; import io.airlift.slice.Slices; @@ -96,6 +97,7 @@ import java.util.List; import java.util.Optional; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -126,7 +128,10 @@ public class S3FileSystemExchangeStorage implements FileSystemExchangeStorage { - public enum CompatibilityMode { + private static final Logger log = Logger.get(S3FileSystemExchangeStorage.class); + + public enum CompatibilityMode + { AWS, GCP } @@ -163,12 +168,26 @@ public S3FileSystemExchangeStorage(S3FileSystemExchangeStorageStats stats, Excha .putAdvancedOption(USER_AGENT_PREFIX, "") .putAdvancedOption(USER_AGENT_SUFFIX, "Trino-exchange") .build(); - this.s3AsyncClient = createS3AsyncClient( + S3AsyncClient client = createS3AsyncClient( credentialsProvider, overrideConfig, config.getAsyncClientConcurrency(), config.getAsyncClientMaxPendingConnectionAcquires(), config.getConnectionAcquisitionTimeout()); + this.s3AsyncClient = new S3AsyncClientWrapper(client) + { + @Override + protected void handle(RequestType requestType, CompletableFuture responseFuture) + { + stats.requestStarted(requestType); + responseFuture.whenComplete((result, failure) -> { + if (failure != null && failure.getMessage() != null && failure.getMessage().contains("Maximum pending connection acquisitions exceeded")) { + log.error(failure, "Encountered 'Maximum pending connection acquisitions exceeded' error. Active requests: %s", stats.getActiveRequestsSummary()); + } + stats.requestCompleted(requestType); + }); + } + }; if (compatibilityMode == GCP) { Optional gcsJsonKeyFilePath = config.getGcsJsonKeyFilePath(); diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java index 612c32ce2a0b..fc73c3ef1408 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java @@ -15,9 +15,25 @@ import io.airlift.stats.DistributionStat; import io.trino.plugin.exchange.filesystem.ExecutionStats; +import io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType; import org.weakref.jmx.Managed; import org.weakref.jmx.Nested; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.ABORT_MULTIPART_UPLOAD; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.COMPLETE_MULTIPART_UPLOAD; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.CREATE_MULTIPART_UPLOAD; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.DELETE_OBJECTS; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.GET_OBJECT; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.LIST_OBJECTS_V2; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.PUT_OBJECT; +import static io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper.RequestType.UPLOAD_PART; + public class S3FileSystemExchangeStorageStats { private final ExecutionStats createEmptyFile = new ExecutionStats(); @@ -35,6 +51,7 @@ public class S3FileSystemExchangeStorageStats private final ExecutionStats completeMultipartUpload = new ExecutionStats(); private final DistributionStat completeMultipartUploadPartsCount = new DistributionStat(); private final ExecutionStats abortMultipartUpload = new ExecutionStats(); + private final Map activeRequests = new ConcurrentHashMap<>(); @Managed @Nested @@ -140,4 +157,76 @@ public ExecutionStats getAbortMultipartUpload() { return abortMultipartUpload; } + + public void requestStarted(RequestType requestType) + { + activeRequests.computeIfAbsent(requestType, key -> new AtomicLong()).incrementAndGet(); + } + + public void requestCompleted(RequestType requestType) + { + AtomicLong count = activeRequests.get(requestType); + checkArgument(count != null && count.get() >= 0, "no active requests of type %s found", requestType); + count.decrementAndGet(); + } + + @Managed + public long getActivePutObjectRequestCount() + { + return getActiveRequestCount(PUT_OBJECT); + } + + @Managed + public long getActiveDeleteObjectsRequestCount() + { + return getActiveRequestCount(DELETE_OBJECTS); + } + + @Managed + public long getActiveGetObjectRequestCount() + { + return getActiveRequestCount(GET_OBJECT); + } + + @Managed + public long getActiveCreateMultipartUploadRequestCount() + { + return getActiveRequestCount(CREATE_MULTIPART_UPLOAD); + } + + @Managed + public long getActiveUploadPartRequestCount() + { + return getActiveRequestCount(UPLOAD_PART); + } + + @Managed + public long getActiveCompleteMultipartUploadRequestCount() + { + return getActiveRequestCount(COMPLETE_MULTIPART_UPLOAD); + } + + @Managed + public long getActiveAbortMultipartUploadRequestCount() + { + return getActiveRequestCount(ABORT_MULTIPART_UPLOAD); + } + + @Managed + public long getListObjectsV2RequestCount() + { + return getActiveRequestCount(LIST_OBJECTS_V2); + } + + public Map getActiveRequestsSummary() + { + return activeRequests.entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().get())); + } + + private long getActiveRequestCount(RequestType requestType) + { + AtomicLong count = activeRequests.get(requestType); + return count == null ? 0 : count.get(); + } }