Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/trino-filesystem-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@
<artifactId>opentelemetry-api</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.filesystem.azure;

import com.azure.core.http.HttpClient;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.http.rest.PagedIterable;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.TracingOptions;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class AzureFileSystem
implements TrinoFileSystem
{
private final HttpClient httpClient;
private final HttpPipelinePolicy concurrencyPolicy;
private final ExecutorService uploadExecutor;
private final TracingOptions tracingOptions;
private final AzureAuth azureAuth;
Expand All @@ -92,6 +94,7 @@ public class AzureFileSystem

public AzureFileSystem(
HttpClient httpClient,
HttpPipelinePolicy concurrencyPolicy,
ExecutorService uploadExecutor,
TracingOptions tracingOptions,
AzureAuth azureAuth,
Expand All @@ -103,6 +106,7 @@ public AzureFileSystem(
boolean multipartWriteEnabled)
{
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.concurrencyPolicy = requireNonNull(concurrencyPolicy, "concurrencyPolicy is null");
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.tracingOptions = requireNonNull(tracingOptions, "tracingOptions is null");
this.azureAuth = requireNonNull(azureAuth, "azureAuth is null");
Expand Down Expand Up @@ -627,6 +631,7 @@ private BlobContainerClient createBlobContainerClient(AzureLocation location, Op

BlobContainerClientBuilder builder = new BlobContainerClientBuilder()
.httpClient(httpClient)
.addPolicy(concurrencyPolicy)
.clientOptions(new ClientOptions().setTracingOptions(tracingOptions))
.endpoint("https://%s.blob.%s".formatted(location.account(), validatedEndpoint(location)));

Expand All @@ -643,6 +648,7 @@ private DataLakeFileSystemClient createFileSystemClient(AzureLocation location,

DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder()
.httpClient(httpClient)
.addPolicy(concurrencyPolicy)
.clientOptions(new ClientOptions().setTracingOptions(tracingOptions))
.endpoint("https://%s.dfs.%s".formatted(location.account(), validatedEndpoint(location)));
key.ifPresent(encryption -> builder.customerProvidedKey(lakeCustomerProvidedKey(encryption)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.azure.core.http.HttpClient;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.tracing.opentelemetry.OpenTelemetryTracingOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.core.util.TracingOptions;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class AzureFileSystemFactory
private final ConnectionProvider connectionProvider;
private final EventLoopGroup eventLoopGroup;
private final boolean multipart;
private final HttpPipelinePolicy concurrencyPolicy;

@Inject
public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth, AzureFileSystemConfig config)
Expand Down Expand Up @@ -91,12 +93,13 @@ public AzureFileSystemFactory(
this.maxSingleUploadSize = requireNonNull(maxSingleUploadSize, "maxSingleUploadSize is null");
this.tracingOptions = new OpenTelemetryTracingOptions().setOpenTelemetry(openTelemetry);
this.connectionProvider = ConnectionProvider.create(applicationId, maxHttpRequests);
this.eventLoopGroup = new MultiThreadIoEventLoopGroup(maxHttpRequests, NioIoHandler.newFactory());
this.eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
HttpClientOptions clientOptions = new HttpClientOptions();
clientOptions.setTracingOptions(tracingOptions);
clientOptions.setApplicationId(applicationId);
httpClient = createAzureHttpClient(connectionProvider, eventLoopGroup, clientOptions);
this.multipart = multipart;
this.concurrencyPolicy = new ConcurrencyLimitHttpPipelinePolicy(maxHttpRequests);
}

@PreDestroy
Expand Down Expand Up @@ -124,7 +127,7 @@ public void destroy()
@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
return new AzureFileSystem(httpClient, uploadExecutor, tracingOptions, auth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize, multipart);
return new AzureFileSystem(httpClient, concurrencyPolicy, uploadExecutor, tracingOptions, auth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize, multipart);
}

public static HttpClient createAzureHttpClient(ConnectionProvider connectionProvider, EventLoopGroup eventLoopGroup, HttpClientOptions clientOptions)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.azure;

import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import reactor.core.publisher.Mono;

import java.util.concurrent.Semaphore;

import static com.google.common.base.Verify.verify;

public final class ConcurrencyLimitHttpPipelinePolicy
implements HttpPipelinePolicy
{
private final Semaphore semaphore;

public ConcurrencyLimitHttpPipelinePolicy(int maxHttpRequests)
{
verify(maxHttpRequests > 0, "maxHttpRequests must be greater than 0");
this.semaphore = new Semaphore(maxHttpRequests);
}

@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next)
{
try {
semaphore.acquire();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Mono.error(e);
}
return next.process().doAfterTerminate(semaphore::release);
}
}
Loading