From 1dad672faf42fbfb0ada2a95456b922a50b2becf Mon Sep 17 00:00:00 2001
From: bhattmanish98
Date: Sun, 14 Sep 2025 22:35:50 -0700
Subject: [PATCH 1/4] HADOOP-19672 Network switchover when Apache client throws error

---
 .../constants/FileSystemConfigurations.java   |  4 +-
 .../services/AbfsApacheHttpClient.java        | 18 +++-
 .../fs/azurebfs/services/AbfsBlobClient.java  |  5 +-
 .../fs/azurebfs/services/AbfsClient.java      | 39 ++++++---
 .../azurebfs/services/AbfsClientHandler.java  |  2 +-
 .../services/AbfsConnectionManager.java       | 38 +++++---
 .../fs/azurebfs/services/AbfsDfsClient.java   |  5 +-
 .../azurebfs/services/AbfsRestOperation.java  |  3 -
 .../azurebfs/ITestWasbAbfsCompatibility.java  |  2 +
 .../fs/azurebfs/services/ITestAbfsClient.java | 86 +++++++++++++++----
 .../ITestApacheClientConnectionPool.java      | 35 ++++++++
 11 files changed, 180 insertions(+), 57 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 58b1512ac3e9c..59727cac53c91 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -214,7 +214,7 @@ public final class FileSystemConfigurations {
 public static final long THOUSAND = 1000L;

 public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
- = HttpOperationType.JDK_HTTP_URL_CONNECTION;
+ = HttpOperationType.APACHE_HTTP_CLIENT;

 public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3;
@@ -228,7 +228,7 @@ public final class FileSystemConfigurations {

 public static final int MAX_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT = 5;

- public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
+ public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 3;

 public static final int MAX_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
index 1d22ae52cde6f..0dffead84245c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
@@ -65,6 +65,13 @@ static void registerFallback() {
 usable = false;
 }

+ /**
+ * Sets the usable flag back to true once a success response is received from the Apache client.
+ */
+ static void setUsable() {
+ usable = true;
+ }
+
 /**
 * @return if ApacheHttpClient is usable.
*/ @@ -73,18 +80,21 @@ static boolean usable() { } AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory, - final AbfsConfiguration abfsConfiguration, final KeepAliveCache keepAliveCache, - URL baseUrl) { + final AbfsConfiguration abfsConfiguration, + final KeepAliveCache keepAliveCache, + URL baseUrl, + final boolean isCacheWarmupNeeded) { final AbfsConnectionManager connMgr = new AbfsConnectionManager( createSocketFactoryRegistry( new SSLConnectionSocketFactory(delegatingSSLSocketFactory, getDefaultHostnameVerifier())), new AbfsHttpClientConnectionFactory(), keepAliveCache, - abfsConfiguration, baseUrl); + abfsConfiguration, baseUrl, isCacheWarmupNeeded); final HttpClientBuilder builder = HttpClients.custom(); builder.setConnectionManager(connMgr) .setRequestExecutor( - new AbfsManagedHttpRequestExecutor(abfsConfiguration.getHttpReadTimeout())) + new AbfsManagedHttpRequestExecutor( + abfsConfiguration.getHttpReadTimeout())) .disableContentCompression() .disableRedirectHandling() .disableAutomaticRetries() diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 77aca70990ff3..d6ae0427b23b9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -59,6 +59,7 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; @@ -188,7 +189,7 @@ public AbfsBlobClient(final URL baseUrl, final EncryptionContextProvider encryptionContextProvider, final AbfsClientContext abfsClientContext) throws IOException { super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB); this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAzureAtomicRenameDirs() .split(AbfsHttpConstants.COMMA))); @@ -201,7 +202,7 @@ public AbfsBlobClient(final URL baseUrl, final EncryptionContextProvider encryptionContextProvider, final AbfsClientContext abfsClientContext) throws IOException { super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB); this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAzureAtomicRenameDirs() .split(AbfsHttpConstants.COMMA))); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index ad17c1bfc20a5..47fff8e7d4c71 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -55,9 +55,11 @@ import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType; @@ -199,6 +201,8 @@ public abstract class AbfsClient implements Closeable { private AbfsApacheHttpClient abfsApacheHttpClient; + private AbfsServiceType abfsServiceType; + /** * logging the rename failure if metadata is in an incomplete state. */ @@ -208,7 +212,8 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, final EncryptionContextProvider encryptionContextProvider, - final AbfsClientContext abfsClientContext) throws IOException { + final AbfsClientContext abfsClientContext, + final AbfsServiceType abfsServiceType) throws IOException { this.baseUrl = baseUrl; this.sharedKeyCredentials = sharedKeyCredentials; String baseUrlString = baseUrl.toString(); @@ -220,6 +225,7 @@ private AbfsClient(final URL baseUrl, this.authType = abfsConfiguration.getAuthType(accountName); this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration); this.renameResilience = abfsConfiguration.getRenameResilience(); + this.abfsServiceType = abfsServiceType; if (encryptionContextProvider != null) { this.encryptionContextProvider = encryptionContextProvider; @@ -254,7 +260,8 @@ private AbfsClient(final URL baseUrl, abfsApacheHttpClient = new AbfsApacheHttpClient( DelegatingSSLSocketFactory.getDefaultFactory(), - abfsConfiguration, keepAliveCache, baseUrl); + abfsConfiguration, keepAliveCache, baseUrl, + abfsConfiguration.getFsConfiguredServiceType() == abfsServiceType); } this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); @@ -328,25 +335,29 @@ private AbfsClient(final URL baseUrl, LOG.trace("primaryUserGroup is {}", this.primaryUserGroup); } - public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, - final AbfsConfiguration abfsConfiguration, - final AccessTokenProvider tokenProvider, - final EncryptionContextProvider encryptionContextProvider, - final AbfsClientContext abfsClientContext) + public AbfsClient(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext, + final AbfsServiceType abfsServiceType) throws IOException { this(baseUrl, sharedKeyCredentials, abfsConfiguration, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, abfsServiceType); this.tokenProvider = tokenProvider; } - public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, - final AbfsConfiguration abfsConfiguration, - final SASTokenProvider sasTokenProvider, - final EncryptionContextProvider encryptionContextProvider, - final AbfsClientContext abfsClientContext) + public AbfsClient(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration 
abfsConfiguration, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext, + final AbfsServiceType abfsServiceType) throws IOException { this(baseUrl, sharedKeyCredentials, abfsConfiguration, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, abfsServiceType); this.sasTokenProvider = sasTokenProvider; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java index f15b0b5326c90..0211b8bd52307 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java @@ -68,13 +68,13 @@ public AbfsClientHandler(final URL baseUrl, final SASTokenProvider sasTokenProvider, final EncryptionContextProvider encryptionContextProvider, final AbfsClientContext abfsClientContext) throws IOException { + initServiceType(abfsConfiguration); this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, abfsClientContext); this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials, abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, abfsClientContext); - initServiceType(abfsConfiguration); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java index 16697fa838a45..4a15fa57af245 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java @@ -23,6 +23,7 @@ import java.util.UUID; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -91,26 +92,34 @@ class AbfsConnectionManager implements HttpClientConnectionManager { /** * The base host for which connections are managed. */ - private HttpHost baseHost; + private final HttpHost baseHost; AbfsConnectionManager(Registry socketFactoryRegistry, - AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac, - final AbfsConfiguration abfsConfiguration, final URL baseUrl) { + AbfsHttpClientConnectionFactory connectionFactory, + KeepAliveCache kac, + final AbfsConfiguration abfsConfiguration, + final URL baseUrl, + final boolean isCacheWarmupNeeded) { this.httpConnectionFactory = connectionFactory; this.kac = kac; this.connectionOperator = new DefaultHttpClientConnectionOperator( socketFactoryRegistry, null, null); this.abfsConfiguration = abfsConfiguration; - if (abfsConfiguration.getApacheCacheWarmupCount() > 0 + this.baseHost = new HttpHost(baseUrl.getHost(), + baseUrl.getDefaultPort(), baseUrl.getProtocol()); + if (isCacheWarmupNeeded && abfsConfiguration.getApacheCacheWarmupCount() > 0 && kac.getFixedThreadPool() != null) { // Warm up the cache with connections. 
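 // A descriptive note on the decision below: cacheExtraConnection()
 // reports how many connections it actually established. Zero means no
 // connection could be established during warmup, so registerFallback()
 // flips the static usable flag and later requests take the JDK
 // HttpURLConnection path; any success re-confirms the Apache client
 // via setUsable().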
LOG.debug("Warming up the KeepAliveCache with {} connections", abfsConfiguration.getApacheCacheWarmupCount()); - this.baseHost = new HttpHost(baseUrl.getHost(), - baseUrl.getDefaultPort(), baseUrl.getProtocol()); HttpRoute route = new HttpRoute(baseHost, null, true); - cacheExtraConnection(route, + int totalConnectionsCreated = cacheExtraConnection(route, abfsConfiguration.getApacheCacheWarmupCount()); + if (totalConnectionsCreated == 0) { + AbfsApacheHttpClient.registerFallback(); + } else { + AbfsApacheHttpClient.setUsable(); + } } } @@ -271,7 +280,7 @@ public void connect(final HttpClientConnection conn, LOG.debug("Connection established: {}", conn); if (context instanceof AbfsManagedHttpClientContext) { ((AbfsManagedHttpClientContext) context).setConnectTime( - TimeUnit.MILLISECONDS.toMillis(System.nanoTime() - start)); + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } } @@ -318,8 +327,9 @@ public void shutdown() { * @param route the HTTP route for which connections are created * @param numberOfConnections the number of connections to create */ - private void cacheExtraConnection(final HttpRoute route, + private int cacheExtraConnection(final HttpRoute route, final int numberOfConnections) { + AtomicInteger totalConnectionCreated = new AtomicInteger(0); if (!isCacheRefreshInProgress.getAndSet(true)) { long start = System.nanoTime(); CountDownLatch latch = new CountDownLatch(numberOfConnections); @@ -333,6 +343,7 @@ private void cacheExtraConnection(final HttpRoute route, connect(conn, route, abfsConfiguration.getHttpConnectionTimeout(), new AbfsManagedHttpClientContext()); addConnectionToCache(conn); + totalConnectionCreated.incrementAndGet(); } catch (Exception e) { LOG.debug("Error creating connection: {}", e.getMessage()); if (conn != null) { @@ -350,7 +361,7 @@ private void cacheExtraConnection(final HttpRoute route, } catch (RejectedExecutionException e) { LOG.debug("Task rejected for connection creation: {}", e.getMessage()); - return; + return 0; } } @@ -370,6 +381,7 @@ private void cacheExtraConnection(final HttpRoute route, elapsedTimeMillis(start)); } } + return totalConnectionCreated.get(); } /** @@ -383,10 +395,10 @@ private void addConnectionToCache(HttpClientConnection conn) { if (((AbfsManagedApacheHttpConnection) conn).getTargetHost() .equals(baseHost)) { boolean connAddedInKac = kac.add(conn); - synchronized (connectionLock) { - connectionLock.notify(); // wake up one thread only - } if (connAddedInKac) { + synchronized (connectionLock) { + connectionLock.notify(); // wake up one thread only + } LOG.debug("Connection cached: {}", conn); } else { LOG.debug("Connection not cached, and is released: {}", conn); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index f88df14c3423a..f574f4704ab5c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -53,6 +53,7 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
@@ -160,7 +161,7 @@ public AbfsDfsClient(final URL baseUrl,
 final EncryptionContextProvider encryptionContextProvider,
 final AbfsClientContext abfsClientContext) throws IOException {
 super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
- encryptionContextProvider, abfsClientContext);
+ encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
 }

 public AbfsDfsClient(final URL baseUrl,
@@ -170,7 +171,7 @@ public AbfsDfsClient(final URL baseUrl,
 final EncryptionContextProvider encryptionContextProvider,
 final AbfsClientContext abfsClientContext) throws IOException {
 super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
- encryptionContextProvider, abfsClientContext);
+ encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
 }

 /**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index c019fcbc3d3a7..73bdc72e78b8e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -487,9 +487,6 @@ private boolean executeHttpOperation(final int retryCount,
 retryPolicy = client.getRetryPolicy(failureReason);
 LOG.warn("Unknown host name: {}. Retrying to resolve the host name...",
 hostname);
- if (httpOperation instanceof AbfsAHCHttpOperation) {
- registerApacheHttpClientIoException();
- }
 if (abfsBackoffMetrics != null) {
 synchronized (this) {
 abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_NETWORK_FAILED_REQUESTS);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
index 967bf6272ab2b..d1ab0b3a1f8c7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
@@ -69,6 +69,8 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
 LoggerFactory.getLogger(ITestWasbAbfsCompatibility.class);

 public ITestWasbAbfsCompatibility() throws Exception {
+ // To ensure the wasb and abfs filesystems are initialized.
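+ // (Orientation note: the assume checks below inspect the live account
+ // setup, which is why setup() is invoked here in the constructor rather
+ // than through the usual test lifecycle.)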
+ super.setup(); assumeThat(isIPAddress()).as("Emulator is not supported").isFalse(); assumeHnsDisabled(); assumeBlobServiceType(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 14a8ca283c804..8505f5f3266f9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -66,6 +66,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_NAME; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_TYPE; @@ -890,8 +892,7 @@ public void testKeepAliveCacheInitializationWithApacheHttpClient() throws Except AbfsClient dfsClient = abfsClientHandler.getDfsClient(); AbfsClient blobClient = abfsClientHandler.getBlobClient(); - checkKacOnBothClientsAfterFSInit(dfsClient); - checkKacOnBothClientsAfterFSInit(blobClient); + checkKacState(dfsClient, blobClient); } /** @@ -917,9 +918,7 @@ public void testStaleConnectionBehavior() throws Exception { AbfsClient dfsClient = abfsClientHandler.getDfsClient(); AbfsClient blobClient = abfsClientHandler.getBlobClient(); - checkKacOnBothClientsAfterFSInit(dfsClient); - checkKacOnBothClientsAfterFSInit(blobClient); - + checkKacState(dfsClient, blobClient); // Wait for 5 minutes to make the cached connections stale // This will ensure all the connections in the KeepAliveCache are stale // and will be removed by the Apache HttpClient's KeepAliveStrategy. 
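 // (Illustrative timeline, assuming the defaults above: the warmed-up
 // connections are created at filesystem init, sit idle through the
 // five-minute wait, and so outlive any keep-alive window the server
 // advertises; checkKacAfterMakingConnectionsStale(), defined later in
 // this diff, then verifies what remains in the cache.)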
@@ -949,11 +948,13 @@ public void testApacheConnectionReuse() throws Exception { AbfsClient dfsClient = abfsClientHandler.getDfsClient(); AbfsClient blobClient = abfsClientHandler.getBlobClient(); - checkKacOnBothClientsAfterFSInit(dfsClient); - checkKacOnBothClientsAfterFSInit(blobClient); + checkKacState(dfsClient, blobClient); - checkConnectionReuse(dfsClient); - checkConnectionReuse(blobClient); + if (getAbfsServiceType() == AbfsServiceType.DFS) { + checkConnectionReuse(dfsClient); + } else { + checkConnectionReuse(blobClient); + } } /** @@ -969,8 +970,8 @@ public void testConnectionNotReusedOnIOException() throws Exception { AzureBlobFileSystem fs = this.getFileSystem(); AbfsClientHandler abfsClientHandler = fs.getAbfsStore().getClientHandler(); - AbfsClient dfsClient = abfsClientHandler.getDfsClient(); - KeepAliveCache keepAliveCache = dfsClient.getKeepAliveCache(); + AbfsClient client = abfsClientHandler.getClient(); + KeepAliveCache keepAliveCache = client.getKeepAliveCache(); HttpClientConnection connection = keepAliveCache.pollFirst(); Assertions.assertThat(connection) @@ -988,7 +989,7 @@ public void testConnectionNotReusedOnIOException() throws Exception { // First list call fail with IOException exception and that connection will not be reused. // Subsequent retry call will use a new connection from the cache. - dfsClient.listPath("/", false, 1, + client.listPath("/", false, 1, null, getTestTracingContext(fs, true), null); // After the failed operation, connection should NOT be reused @@ -1019,11 +1020,15 @@ public void testNumberOfConnectionsInKacWithoutWarmup() throws Exception { AzureBlobFileSystem fs = this.getFileSystem(); final Configuration configuration = fs.getConf(); configuration.setInt(FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT, 0); + // To avoid any network calls during FS initialization + configuration.setBoolean(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, false); + configuration.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, false); fs = this.getFileSystem(configuration); AbfsClient dfsClient = fs.getAbfsStore().getClientHandler().getDfsClient(); AbfsClient blobClient = fs.getAbfsStore().getClientHandler().getBlobClient(); + // In case cache is not warmed up Assertions.assertThat(dfsClient.getKeepAliveCache().size()) .describedAs("KeepAliveCache will be empty when warmup count is set to 0") .isEqualTo(0); @@ -1032,13 +1037,32 @@ public void testNumberOfConnectionsInKacWithoutWarmup() throws Exception { .isEqualTo(0); } + /** + * Helper method to check the KeepAliveCache on both clients based on the + * configured service type. + * @param dfsClient AbfsClient instance for DFS endpoint + * @param blobClient AbfsClient instance for Blob endpoint + * + * @throws IOException if an error occurs while checking the cache + */ + private void checkKacState(AbfsClient dfsClient, AbfsClient blobClient) + throws IOException { + if (getAbfsServiceType() == AbfsServiceType.DFS) { + checkKacOnDefaultClientsAfterFSInit(dfsClient); + checkKacOnNonDefaultClientsAfterFSInit(blobClient); + } else { + checkKacOnDefaultClientsAfterFSInit(blobClient); + checkKacOnNonDefaultClientsAfterFSInit(dfsClient); + } + } + /** * Helper method to check the KeepAliveCache on both clients. 
 * @param abfsClient AbfsClient instance to check
 *
 * @throws IOException if an error occurs while checking the cache
 */
- private void checkKacOnBothClientsAfterFSInit(AbfsClient abfsClient) throws IOException {
+ private void checkKacOnDefaultClientsAfterFSInit(AbfsClient abfsClient) throws IOException {
 AbfsApacheHttpClient abfsApacheHttpClient = abfsClient.getAbfsApacheHttpClient();
 Assertions.assertThat(abfsApacheHttpClient)
 .describedAs("AbfsApacheHttpClient should not be null")
 .isNotNull();
@@ -1062,6 +1086,36 @@ private void checkKacOnBothClientsAfterFSInit(AbfsClient abfsClient) throws IOEx
 .isEqualTo(this.getConfiguration().getApacheCacheWarmupCount() - 1);
 }

+ /**
+ * Helper method to check the KeepAliveCache on the non-default client.
+ * @param abfsClient AbfsClient instance to check
+ *
+ * @throws IOException if an error occurs while checking the cache
+ */
+ private void checkKacOnNonDefaultClientsAfterFSInit(AbfsClient abfsClient) throws IOException {
+ AbfsApacheHttpClient abfsApacheHttpClient = abfsClient.getAbfsApacheHttpClient();
+ Assertions.assertThat(abfsApacheHttpClient)
+ .describedAs("AbfsApacheHttpClient should not be null")
+ .isNotNull();
+
+ KeepAliveCache keepAliveCache = abfsClient.getKeepAliveCache();
+
+ Assertions.assertThat(keepAliveCache.size())
+ .describedAs("KeepAliveCache size should be 0 as non-default clients do not warmup")
+ .isEqualTo(0);
+
+ Assertions.assertThat(keepAliveCache.get())
+ .describedAs("KeepAliveCache get should return null as the cache is empty")
+ .isNull();
+
+ // The get call above finds the cache empty and returns null, so no
+ // connection is removed and the size should still be 0
+ // after the get call.
+ Assertions.assertThat(keepAliveCache.size())
+ .describedAs("KeepAliveCache size should be 0 as no new connection is added")
+ .isEqualTo(0);
+ }
+
 /**
 * Helper method to check the KeepAliveCache after making connections stale.
 * @param abfsClient AbfsClient instance to check
@@ -1088,10 +1142,10 @@ private void checkKacAfterMakingConnectionsStale(AbfsClient abfsClient)
 * @throws IOException if an error occurs while checking the cache
 */
 private void checkConnectionReuse(AbfsClient abfsClient) throws IOException {
- KeepAliveCache dfsKeepAliveCache = abfsClient.getKeepAliveCache();
+ KeepAliveCache keepAliveCache = abfsClient.getKeepAliveCache();
 for (int i = 0; i < this.getConfiguration().getApacheCacheWarmupCount(); i++) {
 // Check first connection in the cache before the operation
- HttpClientConnection connection = dfsKeepAliveCache.peekFirst();
+ HttpClientConnection connection = keepAliveCache.peekFirst();
 // Perform a list operation to reuse the connection
 // This will use the first connection in the cache.
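 // (Reuse contract exercised here: peekFirst() observes the head of the
 // cache, the listPath() call consumes and then returns that connection,
 // and the assertion below confirms it re-entered at the tail via
 // peekLast().)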
abfsClient.listPath("/", false, 1,
@@ -1099,7 +1153,7 @@ private void checkConnectionReuse(AbfsClient abfsClient) throws IOException {
 // After the operation, the connection should be kept back in the last position
 Assertions.assertThat(connection)
 .describedAs("Connection will be put back to the cache for reuse.")
- .isEqualTo(dfsKeepAliveCache.peekLast());
+ .isEqualTo(keepAliveCache.peekLast());
 }
 }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
index 05313b52172b9..abb54008ed380 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azurebfs.services;

 import java.io.IOException;
+import java.net.URL;
 import java.util.Map;

 import org.assertj.core.api.Assertions;
@@ -28,6 +29,7 @@
 import org.apache.hadoop.fs.ClosedIOException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
@@ -44,6 +46,7 @@
 import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;

 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
@@ -69,6 +72,7 @@ public void testKacIsClosed() throws Throwable {
 Configuration configuration = new Configuration(getRawConfiguration());
 configuration.set(FS_AZURE_NETWORKING_LIBRARY, APACHE_HTTP_CLIENT.name());
 configuration.unset(FS_AZURE_METRIC_FORMAT);
+ AbfsApacheHttpClient.setUsable();
 try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
 configuration)) {
 KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient()
 .getKeepAliveCache();
 kac.close();
 AbfsDriverException ex = intercept(AbfsDriverException.class,
@@ -118,6 +122,37 @@ public void testConnectedConnectionLogging() throws Exception {
 .isEqualTo(4);
 }

+ /**
+ * Test to verify that the ApacheHttpClient falls back to JDK client
+ * when connection warmup fails.
+ * This test is applicable only for ApacheHttpClient.
+ */
+ @Test
+ public void testApacheClientFallbackDuringConnectionWarmup()
+ throws Exception {
+ try (KeepAliveCache keepAliveCache = new KeepAliveCache(
+ new AbfsConfiguration(new Configuration(), EMPTY_STRING))) {
+ // Create a connection manager with an invalid URL to simulate a warmup
+ // failure: no connection can be created, so the manager registers the
+ // fallback to the JDK networking library.
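 // Note: usable is a static, JVM-wide flag on AbfsApacheHttpClient, so
 // once this constructor registers the fallback, every instance reports
 // usable() == false until setUsable() is called again; a later patch in
 // this series resets it so subsequent tests see a clean state.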
+ final AbfsConnectionManager connMgr = new AbfsConnectionManager( + RegistryBuilder.create() + .register(HTTPS_SCHEME, new SSLConnectionSocketFactory( + DelegatingSSLSocketFactory.getDefaultFactory(), + getDefaultHostnameVerifier())) + .build(), + new AbfsHttpClientConnectionFactory(), keepAliveCache, + new AbfsConfiguration(new Configuration(), EMPTY_STRING), + new URL("https://test.com"), true); + + Assertions.assertThat(AbfsApacheHttpClient.usable()) + .describedAs("Apache HttpClient should be not usable") + .isFalse(); + } + } + private Map.Entry getTestConnection() throws IOException { HttpHost host = new HttpHost(getFileSystem().getUri().getHost(), From 6faa4c9c8ad8083e0a636b6439146a24c66c0ce3 Mon Sep 17 00:00:00 2001 From: bhattmanish98 Date: Mon, 15 Sep 2025 11:20:04 -0700 Subject: [PATCH 2/4] Remove unused import --- .../org/apache/hadoop/fs/azurebfs/services/AbfsClient.java | 1 - .../apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java | 3 +++ .../fs/azurebfs/services/ITestApacheClientConnectionPool.java | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 47fff8e7d4c71..6bc174e291a84 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -55,7 +55,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; -import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 73bdc72e78b8e..c019fcbc3d3a7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -487,6 +487,9 @@ private boolean executeHttpOperation(final int retryCount, retryPolicy = client.getRetryPolicy(failureReason); LOG.warn("Unknown host name: {}. 
Retrying to resolve the host name...", hostname); + if (httpOperation instanceof AbfsAHCHttpOperation) { + registerApacheHttpClientIoException(); + } if (abfsBackoffMetrics != null) { synchronized (this) { abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_NETWORK_FAILED_REQUESTS); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java index abb54008ed380..c16ec2888a2c1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java @@ -72,7 +72,6 @@ public void testKacIsClosed() throws Throwable { Configuration configuration = new Configuration(getRawConfiguration()); configuration.set(FS_AZURE_NETWORKING_LIBRARY, APACHE_HTTP_CLIENT.name()); configuration.unset(FS_AZURE_METRIC_FORMAT); - AbfsApacheHttpClient.setUsable(); try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( configuration)) { KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient() @@ -150,6 +149,7 @@ public void testApacheClientFallbackDuringConnectionWarmup() Assertions.assertThat(AbfsApacheHttpClient.usable()) .describedAs("Apache HttpClient should be not usable") .isFalse(); + AbfsApacheHttpClient.setUsable(); } } From 44765c0ae41d29a78198cb113bc06780be7092af Mon Sep 17 00:00:00 2001 From: bhattmanish98 Date: Tue, 7 Oct 2025 10:40:14 -0700 Subject: [PATCH 3/4] Addressed comments --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 8 +++++ .../azurebfs/constants/ConfigurationKeys.java | 7 +++++ .../constants/FileSystemConfigurations.java | 6 ++++ .../services/AbfsApacheHttpClient.java | 8 ++++- .../fs/azurebfs/services/AbfsClient.java | 3 ++ .../azurebfs/services/AbfsClientHandler.java | 3 ++ .../services/AbfsConnectionManager.java | 2 +- .../hadoop-azure/src/site/markdown/index.md | 4 +-- .../ITestApacheClientConnectionPool.java | 30 ++++++++++++++++++- 9 files changed, 66 insertions(+), 5 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 0c173238c34c8..7c355671cf8b2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -203,6 +203,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_HTTP_READ_TIMEOUT) private int httpReadTimeout; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT, + DefaultValue = DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT) + private int expect100ContinueWaitTimeout; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT, MinValue = 0, DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS) @@ -1033,6 +1037,10 @@ public int getHttpReadTimeout() { return this.httpReadTimeout; } + public int getExpect100ContinueWaitTimeout() { + return this.expect100ContinueWaitTimeout; + } + public long getAzureBlockSize() { return this.azureBlockSize; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 677473591348e..9afb37e35c770 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -97,6 +97,13 @@ public final class ConfigurationKeys { */ public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout"; + /** + * Config to set HTTP Expect100-Continue Wait Timeout Value for Rest Operations. + * Value: {@value}. + */ + public static final String AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT + = "fs.azure.http.expect.100continue.wait.timeout"; + // Retry strategy for getToken calls public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries"; public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 59727cac53c91..83f636bf1d131 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -58,6 +58,12 @@ public final class FileSystemConfigurations { */ public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs + /** + * Default value of connection request timeout to be used when 100continue is enabled. + * Value: {@value}. + */ + public static final int DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT = 3_000; // 3s + // Retry parameter defaults. public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5; public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java index 0dffead84245c..97f1e8a5c60c6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java @@ -93,8 +93,14 @@ static boolean usable() { final HttpClientBuilder builder = HttpClients.custom(); builder.setConnectionManager(connMgr) .setRequestExecutor( + // In case of Expect:100-continue, the timeout for waiting for + // the 100-continue response from the server is set using + // ExpectWaitContinueTimeout. For other requests, the read timeout + // is set using SocketTimeout. new AbfsManagedHttpRequestExecutor( - abfsConfiguration.getHttpReadTimeout())) + abfsConfiguration.isExpectHeaderEnabled() + ? 
abfsConfiguration.getExpect100ContinueWaitTimeout()
+ : abfsConfiguration.getHttpReadTimeout()))
 .disableContentCompression()
 .disableRedirectHandling()
 .disableAutomaticRetries()
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 6bc174e291a84..71da8f9bda96e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -257,6 +257,9 @@ private AbfsClient(final URL baseUrl,
 == HttpOperationType.APACHE_HTTP_CLIENT) {
 keepAliveCache = new KeepAliveCache(abfsConfiguration);

+ // Warm up the connection pool during client initialization to avoid latency on the first request.
+ // Every filesystem instance creates both a DFS and a Blob client,
+ // so warmup is done only for the default client.
 abfsApacheHttpClient = new AbfsApacheHttpClient(
 DelegatingSSLSocketFactory.getDefaultFactory(),
 abfsConfiguration, keepAliveCache, baseUrl,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
index 0211b8bd52307..a18aec4fb2d8c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
@@ -68,6 +68,9 @@ public AbfsClientHandler(final URL baseUrl,
 final SASTokenProvider sasTokenProvider,
 final EncryptionContextProvider encryptionContextProvider,
 final AbfsClientContext abfsClientContext) throws IOException {
+ // This will initialize the default and ingress service types.
+ // This is needed before crating the clients so that we can do cache warmup
+ // only for default client.
 initServiceType(abfsConfiguration);
 this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
 abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
index 4a15fa57af245..efd235645cd8d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
@@ -361,7 +361,7 @@ private int cacheExtraConnection(final HttpRoute route,
 } catch (RejectedExecutionException e) {
 LOG.debug("Task rejected for connection creation: {}",
 e.getMessage());
- return 0;
+ return -1;
 }
 }

diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 6695d814c9335..e52555ef76f9d 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -890,8 +890,8 @@ ABFS Driver can use the following networking libraries:

 The networking library can be configured using the configuration
 `fs.azure.networking.library` while initializing the filesystem.
Following are the supported values: -- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library [Default] -- `APACHE_HTTP_CLIENT` : Use Apache HttpClient +- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library +- `APACHE_HTTP_CLIENT` : Use Apache HttpClient [Default] #### ApacheHttpClient networking layer configuration Options: diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java index c16ec2888a2c1..a0248ee6f6ab6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.util.functional.Tuples; import org.apache.http.HttpHost; @@ -45,8 +46,10 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_FALLBACK; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY; @@ -74,7 +77,9 @@ public void testKacIsClosed() throws Throwable { configuration.unset(FS_AZURE_METRIC_FORMAT); try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( configuration)) { - KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient() + KeepAliveCache kac = fs.getAbfsStore() + .getClientHandler() + .getIngressClient() .getKeepAliveCache(); kac.close(); AbfsDriverException ex = intercept(AbfsDriverException.class, @@ -149,10 +154,33 @@ public void testApacheClientFallbackDuringConnectionWarmup() Assertions.assertThat(AbfsApacheHttpClient.usable()) .describedAs("Apache HttpClient should be not usable") .isFalse(); + // Make a rest API call to verify that the client falls back to JDK client. + AzureBlobFileSystem fs = getFileSystem(); + verifyClientRequestId(fs, JDK_FALLBACK); AbfsApacheHttpClient.setUsable(); + verifyClientRequestId(fs, APACHE_IMPL); } } + /** + * Verify that the client request id contains the expected client. + * @param fs AzureBlobFileSystem instance + * @param expectedClient Expected client in the client request id. + * @throws AzureBlobFileSystemException if any failure occurs during the operation. 
+ */
+ private void verifyClientRequestId(AzureBlobFileSystem fs,
+ String expectedClient)
+ throws AzureBlobFileSystemException {
+ AbfsRestOperation op = fs.getAbfsStore()
+ .getClient()
+ .getFilesystemProperties(getTestTracingContext(fs, true));
+ String[] clientRequestIdList = op.getResult()
+ .getClientRequestId().split(COLON);
+ Assertions.assertThat(clientRequestIdList[clientRequestIdList.length - 1])
+ .describedAs("Http Client in use should be %s", expectedClient)
+ .isEqualTo(expectedClient);
+ }
+
 private Map.Entry getTestConnection()
 throws IOException {
 HttpHost host = new HttpHost(getFileSystem().getUri().getHost(),

From 7ff70868cc0483c55d875f66cf2379f04e75075e Mon Sep 17 00:00:00 2001
From: bhattmanish98
Date: Thu, 9 Oct 2025 06:12:43 -0700
Subject: [PATCH 4/4] Fix comment typo

---
 .../apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
index a18aec4fb2d8c..a7bf5699dc208 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
@@ -69,7 +69,7 @@ public AbfsClientHandler(final URL baseUrl,
 final EncryptionContextProvider encryptionContextProvider,
 final AbfsClientContext abfsClientContext) throws IOException {
 // This will initialize the default and ingress service types.
- // This is needed before crating the clients so that we can do cache warmup
+ // This is needed before creating the clients so that we can do cache warmup
 // only for default client.
 initServiceType(abfsConfiguration);
 this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,