Skip to content

Commit ea712ce

Browse files
committed
Fix Azure http client leak in Azure filesystem
1 parent 07d7a17 commit ea712ce

File tree

5 files changed

+80
-11
lines changed

5 files changed

+80
-11
lines changed

lib/trino-filesystem-azure/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@
105105
<artifactId>guice</artifactId>
106106
</dependency>
107107

108+
<dependency>
109+
<groupId>com.squareup.okhttp3</groupId>
110+
<artifactId>okhttp</artifactId>
111+
</dependency>
112+
108113
<dependency>
109114
<groupId>io.airlift</groupId>
110115
<artifactId>configuration</artifactId>
@@ -135,6 +140,11 @@
135140
<artifactId>trino-spi</artifactId>
136141
</dependency>
137142

143+
<dependency>
144+
<groupId>jakarta.annotation</groupId>
145+
<artifactId>jakarta.annotation-api</artifactId>
146+
</dependency>
147+
138148
<dependency>
139149
<groupId>jakarta.validation</groupId>
140150
<artifactId>jakarta.validation-api</artifactId>

lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
package io.trino.filesystem.azure;
1515

1616
import com.azure.core.http.HttpClient;
17-
import com.azure.core.http.okhttp.OkHttpAsyncClientProvider;
17+
import com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder;
1818
import com.azure.core.tracing.opentelemetry.OpenTelemetryTracingOptions;
1919
import com.azure.core.util.HttpClientOptions;
2020
import com.azure.core.util.TracingOptions;
@@ -24,6 +24,11 @@
2424
import io.trino.filesystem.TrinoFileSystem;
2525
import io.trino.filesystem.TrinoFileSystemFactory;
2626
import io.trino.spi.security.ConnectorIdentity;
27+
import jakarta.annotation.PreDestroy;
28+
import okhttp3.ConnectionPool;
29+
import okhttp3.OkHttpClient;
30+
31+
import java.util.concurrent.TimeUnit;
2732

2833
import static com.google.common.base.Preconditions.checkArgument;
2934
import static java.util.Objects.requireNonNull;
@@ -37,6 +42,7 @@ public class AzureFileSystemFactory
3742
private final int maxWriteConcurrency;
3843
private final DataSize maxSingleUploadSize;
3944
private final TracingOptions tracingOptions;
45+
private final OkHttpClient okHttpClient;
4046
private final HttpClient httpClient;
4147

4248
@Inject
@@ -65,14 +71,39 @@ public AzureFileSystemFactory(
6571
this.maxWriteConcurrency = maxWriteConcurrency;
6672
this.maxSingleUploadSize = requireNonNull(maxSingleUploadSize, "maxSingleUploadSize is null");
6773
this.tracingOptions = new OpenTelemetryTracingOptions().setOpenTelemetry(openTelemetry);
68-
this.httpClient = HttpClient.createDefault((HttpClientOptions) new HttpClientOptions()
69-
.setHttpClientProvider(OkHttpAsyncClientProvider.class)
70-
.setTracingOptions(tracingOptions));
74+
75+
okHttpClient = new OkHttpClient.Builder().build();
76+
HttpClientOptions clientOptions = new HttpClientOptions();
77+
clientOptions.setTracingOptions(tracingOptions);
78+
httpClient = createAzureHttpClient(okHttpClient, clientOptions);
79+
}
80+
81+
@PreDestroy
82+
public void destroy()
83+
{
84+
okHttpClient.dispatcher().executorService().shutdownNow();
85+
okHttpClient.connectionPool().evictAll();
7186
}
7287

7388
@Override
7489
public TrinoFileSystem create(ConnectorIdentity identity)
7590
{
7691
return new AzureFileSystem(httpClient, tracingOptions, auth, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize);
7792
}
93+
94+
public static HttpClient createAzureHttpClient(OkHttpClient okHttpClient, HttpClientOptions clientOptions)
95+
{
96+
Integer poolSize = clientOptions.getMaximumConnectionPoolSize();
97+
// By default, OkHttp uses a maximum idle connection count of 5.
98+
int maximumConnectionPoolSize = (poolSize != null && poolSize > 0) ? poolSize : 5;
99+
100+
return new OkHttpAsyncHttpClientBuilder(okHttpClient)
101+
.proxy(clientOptions.getProxyOptions())
102+
.configuration(clientOptions.getConfiguration())
103+
.connectionTimeout(clientOptions.getConnectTimeout())
104+
.writeTimeout(clientOptions.getWriteTimeout())
105+
.readTimeout(clientOptions.getReadTimeout())
106+
.connectionPool(new ConnectionPool(maximumConnectionPoolSize,
107+
clientOptions.getConnectionIdleTimeout().toMillis(), TimeUnit.MILLISECONDS)).build();
108+
}
78109
}

lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/AbstractTestAzureFileSystem.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ protected enum AccountKind
6161
private String containerName;
6262
private Location rootLocation;
6363
private BlobContainerClient blobContainerClient;
64+
private AzureFileSystemFactory fileSystemFactory;
6465
private TrinoFileSystem fileSystem;
6566

6667
protected void initializeWithAccessKey(String account, String accountKey, AccountKind accountKind)
@@ -100,10 +101,11 @@ private void initialize(String account, AzureAuth azureAuth, AccountKind account
100101
checkState(!isHierarchicalNamespaceEnabled, "Expected hierarchical namespaces to not be enabled for storage account %s and container %s with account kind %s".formatted(account, containerName, accountKind));
101102
}
102103

103-
fileSystem = new AzureFileSystemFactory(
104+
fileSystemFactory = new AzureFileSystemFactory(
104105
OpenTelemetry.noop(),
105106
azureAuth,
106-
new AzureFileSystemConfig()).create(ConnectorIdentity.ofUser("test"));
107+
new AzureFileSystemConfig());
108+
fileSystem = fileSystemFactory.create(ConnectorIdentity.ofUser("test"));
107109

108110
cleanupFiles();
109111
}
@@ -124,6 +126,10 @@ private boolean isHierarchicalNamespaceEnabled()
124126
void tearDown()
125127
{
126128
azureAuth = null;
129+
if (fileSystemFactory != null) {
130+
fileSystemFactory.destroy();
131+
fileSystemFactory = null;
132+
}
127133
fileSystem = null;
128134
if (blobContainerClient != null) {
129135
blobContainerClient.deleteIfExists();

plugin/trino-delta-lake/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,12 @@
249249
</exclusions>
250250
</dependency>
251251

252+
<dependency>
253+
<groupId>com.squareup.okhttp3</groupId>
254+
<artifactId>okhttp</artifactId>
255+
<scope>runtime</scope>
256+
</dependency>
257+
252258
<dependency>
253259
<groupId>io.airlift</groupId>
254260
<artifactId>log-manager</artifactId>
@@ -261,6 +267,12 @@
261267
<scope>runtime</scope>
262268
</dependency>
263269

270+
<dependency>
271+
<groupId>io.trino</groupId>
272+
<artifactId>trino-filesystem-azure</artifactId>
273+
<scope>runtime</scope>
274+
</dependency>
275+
264276
<dependency>
265277
<groupId>org.jetbrains</groupId>
266278
<artifactId>annotations</artifactId>

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.plugin.deltalake;
1515

16+
import com.azure.core.util.HttpClientOptions;
1617
import com.azure.storage.blob.BlobContainerClient;
1718
import com.azure.storage.blob.BlobServiceClient;
1819
import com.azure.storage.blob.BlobServiceClientBuilder;
@@ -24,6 +25,7 @@
2425
import com.google.common.reflect.ClassPath;
2526
import io.trino.plugin.hive.containers.HiveHadoop;
2627
import io.trino.testing.QueryRunner;
28+
import okhttp3.OkHttpClient;
2729
import org.junit.jupiter.api.AfterAll;
2830
import org.junit.jupiter.api.TestInstance;
2931

@@ -42,6 +44,7 @@
4244

4345
import static com.google.common.collect.ImmutableList.toImmutableList;
4446
import static com.google.common.collect.ImmutableSet.toImmutableSet;
47+
import static io.trino.filesystem.azure.AzureFileSystemFactory.createAzureHttpClient;
4548
import static io.trino.plugin.hive.containers.HiveHadoop.HIVE3_IMAGE;
4649
import static io.trino.testing.TestingProperties.requiredNonEmptySystemProperty;
4750
import static java.lang.String.format;
@@ -58,25 +61,32 @@ public class TestDeltaLakeAdlsConnectorSmokeTest
5861
private final String container;
5962
private final String account;
6063
private final String accessKey;
61-
private final BlobContainerClient azureContainerClient;
6264
private final String adlsDirectory;
65+
private BlobContainerClient azureContainerClient;
6366

6467
public TestDeltaLakeAdlsConnectorSmokeTest()
6568
{
6669
this.container = requiredNonEmptySystemProperty("testing.azure-abfs-container");
6770
this.account = requiredNonEmptySystemProperty("testing.azure-abfs-account");
6871
this.accessKey = requiredNonEmptySystemProperty("testing.azure-abfs-access-key");
69-
70-
String connectionString = format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.windows.net", account, accessKey);
71-
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(connectionString).buildClient();
72-
this.azureContainerClient = blobServiceClient.getBlobContainerClient(container);
7372
this.adlsDirectory = format("abfs://%s@%s.dfs.core.windows.net/%s/", container, account, bucketName);
7473
}
7574

7675
@Override
7776
protected HiveHadoop createHiveHadoop()
7877
throws Exception
7978
{
79+
String connectionString = format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.windows.net", account, accessKey);
80+
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
81+
closeAfterClass(() -> {
82+
okHttpClient.dispatcher().executorService().shutdownNow();
83+
okHttpClient.connectionPool().evictAll();
84+
});
85+
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(connectionString)
86+
.httpClient(createAzureHttpClient(okHttpClient, new HttpClientOptions()))
87+
.buildClient();
88+
this.azureContainerClient = blobServiceClient.getBlobContainerClient(container);
89+
8090
String abfsSpecificCoreSiteXmlContent = Resources.toString(Resources.getResource("io/trino/plugin/deltalake/hdp3.1-core-site.xml.abfs-template"), UTF_8)
8191
.replace("%ABFS_ACCESS_KEY%", accessKey)
8292
.replace("%ABFS_ACCOUNT%", account);

0 commit comments

Comments
 (0)