diff --git a/flight/flight-sql-jdbc-core/pom.xml b/flight/flight-sql-jdbc-core/pom.xml index 3362b60a8e..fe7dbe7b29 100644 --- a/flight/flight-sql-jdbc-core/pom.xml +++ b/flight/flight-sql-jdbc-core/pom.xml @@ -147,6 +147,12 @@ under the License. org.checkerframework checker-qual + + + com.github.ben-manes.caffeine + caffeine + 3.1.8 + diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java index cf9804d68b..747287ed13 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler; +import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache; import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.memory.BufferAllocator; @@ -113,6 +114,7 @@ private static ArrowFlightSqlClientHandler createNewClientHandler( .withRetainCookies(config.retainCookies()) .withRetainAuth(config.retainAuth()) .withCatalog(config.getCatalog()) + .withClientCache(config.useClientCache() ? new FlightClientCache() : null) .withConnectTimeout(config.getConnectTimeout()) .build(); } catch (final SQLException e) { diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java index cbbe223eb8..17c2c16ebf 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java @@ -33,6 +33,8 @@ import java.util.Optional; import java.util.Set; import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils; +import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache; +import org.apache.arrow.driver.jdbc.client.utils.FlightLocationQueue; import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.CloseSessionRequest; @@ -75,21 +77,27 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable { // JDBC connection string query parameter private static final String CATALOG = "catalog"; + private final String cacheKey; private final FlightSqlClient sqlClient; private final Set options = new HashSet<>(); private final Builder builder; private final Optional catalog; + private final @Nullable FlightClientCache flightClientCache; ArrowFlightSqlClientHandler( + final String cacheKey, final FlightSqlClient sqlClient, final Builder builder, final Collection credentialOptions, - final Optional catalog) { + final Optional catalog, + final @Nullable FlightClientCache flightClientCache) { this.options.addAll(builder.options); this.options.addAll(credentialOptions); + this.cacheKey = Preconditions.checkNotNull(cacheKey); this.sqlClient = Preconditions.checkNotNull(sqlClient); this.builder = builder; this.catalog = catalog; + this.flightClientCache = flightClientCache; } /** @@ -101,12 +109,15 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable { * @return a new {@link ArrowFlightSqlClientHandler}. */ static ArrowFlightSqlClientHandler createNewHandler( + final String cacheKey, final FlightClient client, final Builder builder, final Collection options, - final Optional catalog) { + final Optional catalog, + final @Nullable FlightClientCache flightClientCache) { final ArrowFlightSqlClientHandler handler = - new ArrowFlightSqlClientHandler(new FlightSqlClient(client), builder, options, catalog); + new ArrowFlightSqlClientHandler( + cacheKey, new FlightSqlClient(client), builder, options, catalog, flightClientCache); handler.setSetCatalogInSessionIfPresent(); return handler; } @@ -148,9 +159,14 @@ public List getStreams(final FlightInfo flightInfo) // location // is the same as the original connection's Location and skip creating a FlightClient in // that scenario. + // Also copy the cache to the client so we can share a cache. Cache needs to cache + // negative attempts too. List exceptions = new ArrayList<>(); CloseableEndpointStreamPair stream = null; - for (Location location : endpoint.getLocations()) { + FlightLocationQueue locations = + new FlightLocationQueue(flightClientCache, endpoint.getLocations()); + while (locations.hasNext()) { + Location location = locations.next(); final URI endpointUri = location.getUri(); if (endpointUri.getScheme().equals(LocationSchemes.REUSE_CONNECTION)) { stream = @@ -163,6 +179,7 @@ public List getStreams(final FlightInfo flightInfo) .withHost(endpointUri.getHost()) .withPort(endpointUri.getPort()) .withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS)) + .withClientCache(flightClientCache) .withConnectTimeout(builder.connectTimeout); ArrowFlightSqlClientHandler endpointHandler = null; @@ -177,12 +194,29 @@ public List getStreams(final FlightInfo flightInfo) stream.getStream().getSchema(); } catch (Exception ex) { if (endpointHandler != null) { + // If the exception is related to connectivity, mark the client as a dud. + if (flightClientCache != null) { + if (ex instanceof FlightRuntimeException + && ((FlightRuntimeException) ex).status().code() + == FlightStatusCode.UNAVAILABLE + && + // IOException covers SocketException and Netty's (private) + // AnnotatedSocketException + // We are looking for things like "Network is unreachable" + ex.getCause() instanceof IOException) { + flightClientCache.markLocationAsDud(location.toString()); + } + } + AutoCloseables.close(endpointHandler); } exceptions.add(ex); continue; } + if (flightClientCache != null) { + flightClientCache.markLocationAsReachable(location.toString()); + } break; } if (stream != null) { @@ -549,6 +583,8 @@ public static final class Builder { @VisibleForTesting Optional catalog = Optional.empty(); + @VisibleForTesting @Nullable FlightClientCache flightClientCache; + @VisibleForTesting @Nullable Duration connectTimeout; // These two middleware are for internal use within build() and should not be exposed by builder @@ -833,11 +869,20 @@ public Builder withCatalog(@Nullable final String catalog) { return this; } + public Builder withClientCache(FlightClientCache flightClientCache) { + this.flightClientCache = flightClientCache; + return this; + } + public Builder withConnectTimeout(Duration connectTimeout) { this.connectTimeout = connectTimeout; return this; } + public String getCacheKey() { + return getLocation().toString(); + } + /** Get the location that this client will connect to. */ public Location getLocation() { if (useEncryption) { @@ -931,7 +976,7 @@ public ArrowFlightSqlClientHandler build() throws SQLException { options.toArray(new CallOption[0]))); } return ArrowFlightSqlClientHandler.createNewHandler( - client, this, credentialOptions, catalog); + getCacheKey(), client, this, credentialOptions, catalog, flightClientCache); } catch (final IllegalArgumentException | GeneralSecurityException diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/utils/FlightClientCache.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/utils/FlightClientCache.java new file mode 100644 index 0000000000..36e8441baa --- /dev/null +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/utils/FlightClientCache.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.driver.jdbc.client.utils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.time.Duration; +import org.apache.arrow.util.VisibleForTesting; + +/** + * A cache for Flight clients. + * + *

The intent is to avoid constantly recreating clients to the same locations. gRPC can multiplex + * multiple requests over a single TCP connection, and a cache would let us take advantage of that. + * + *

At the time being it only tracks whether a location is reachable or not. To actually cache + * clients, we would need a way to incorporate other connection parameters (authentication, etc.) + * into the cache key. + */ +public final class FlightClientCache { + @VisibleForTesting Cache clientCache; + + public FlightClientCache() { + this.clientCache = Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(600)).build(); + } + + public boolean isDud(String key) { + return clientCache.getIfPresent(key) != null; + } + + public void markLocationAsDud(String key) { + clientCache.put(key, new ClientCacheEntry()); + } + + public void markLocationAsReachable(String key) { + clientCache.invalidate(key); + } + + /** A cache entry (empty because we only track reachability, see outer class docstring). */ + public static final class ClientCacheEntry {} +} diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/utils/FlightLocationQueue.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/utils/FlightLocationQueue.java new file mode 100644 index 0000000000..f507ec53e7 --- /dev/null +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/utils/FlightLocationQueue.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.driver.jdbc.client.utils; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.arrow.flight.Location; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A queue of Flight locations to connect to for an endpoint. + * + *

This helper class is intended to encapsulate the retry logic in a testable manner. + */ +public final class FlightLocationQueue implements Iterator { + private final Deque locations; + private final Deque badLocations; + + /** + * Create a new queue. + * + * @param flightClientCache An optional cache used to sort previously unreachable locations to the + * end. + * @param locations The locations to try. + */ + public FlightLocationQueue( + @Nullable FlightClientCache flightClientCache, List locations) { + this.locations = new ArrayDeque<>(); + this.badLocations = new ArrayDeque<>(); + + for (Location location : locations) { + if (flightClientCache != null && flightClientCache.isDud(location.toString())) { + this.badLocations.add(location); + } else { + this.locations.add(location); + } + } + } + + @Override + public boolean hasNext() { + return !locations.isEmpty() || !badLocations.isEmpty(); + } + + @Override + public Location next() { + if (!locations.isEmpty()) { + return locations.pop(); + } else if (!badLocations.isEmpty()) { + return badLocations.pop(); + } + throw new NoSuchElementException(); + } +} diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java index ab6a5898b7..76ba964a53 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java @@ -174,6 +174,11 @@ public Duration getConnectTimeout() { return Duration.ofMillis(timeout); } + /** Whether to enable the client cache. */ + public boolean useClientCache() { + return ArrowFlightConnectionProperty.USE_CLIENT_CACHE.getBoolean(properties); + } + /** * Gets the {@link CallOption}s from this {@link ConnectionConfig}. * @@ -226,6 +231,7 @@ public enum ArrowFlightConnectionProperty implements ConnectionProperty { RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false), CATALOG("catalog", null, Type.STRING, false), CONNECT_TIMEOUT_MILLIS("connectTimeoutMs", 10000, Type.NUMBER, false), + USE_CLIENT_CACHE("useClientCache", true, Type.BOOLEAN, false), ; private final String camelName; diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java index cd47408f52..569b5495fe 100644 --- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java @@ -699,10 +699,9 @@ public void testFallbackUnresolvableFlightServer() throws Exception { } attempt1 = System.nanoTime(); elapsedMs = (attempt1 - start) / 1_000_000.; - // TODO(GH-661): this assertion should be flipped to assertTrue. - assertFalse( + assertTrue( elapsedMs < 5000., - String.format("Expected second attempt to be the same, but %f ms elapsed", elapsedMs)); + String.format("Expected second attempt to be faster, but %f ms elapsed", elapsedMs)); } } } diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java index 7b416638e1..6524eaf39a 100644 --- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java @@ -147,6 +147,7 @@ public void testDefaults() { assertNull(builder.clientCertificatePath); assertNull(builder.clientKeyPath); assertEquals(Optional.empty(), builder.catalog); + assertNull(builder.flightClientCache); assertNull(builder.connectTimeout); } diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/utils/FlightClientCacheTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/utils/FlightClientCacheTest.java new file mode 100644 index 0000000000..8e818967a5 --- /dev/null +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/utils/FlightClientCacheTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.driver.jdbc.client.utils; + +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.arrow.flight.Location; +import org.junit.jupiter.api.Test; + +class FlightClientCacheTest { + @Test + void basicOperation() { + FlightClientCache cache = new FlightClientCache(); + + Location location1 = Location.forGrpcInsecure("localhost", 8080); + Location location2 = Location.forGrpcInsecure("localhost", 8081); + + assertFalse(cache.isDud(location1.toString())); + assertFalse(cache.isDud(location2.toString())); + + cache.markLocationAsReachable(location1.toString()); + assertFalse(cache.isDud(location1.toString())); + assertFalse(cache.isDud(location2.toString())); + + cache.markLocationAsDud(location1.toString()); + assertTrue(cache.isDud(location1.toString())); + assertFalse(cache.isDud(location2.toString())); + + cache.markLocationAsDud(location2.toString()); + assertTrue(cache.isDud(location1.toString())); + assertTrue(cache.isDud(location2.toString())); + + cache.markLocationAsReachable(location1.toString()); + assertFalse(cache.isDud(location1.toString())); + assertTrue(cache.isDud(location2.toString())); + } +} diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/utils/FlightLocationQueueTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/utils/FlightLocationQueueTest.java new file mode 100644 index 0000000000..0603f86e59 --- /dev/null +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/utils/FlightLocationQueueTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.driver.jdbc.client.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.arrow.flight.Location; +import org.junit.jupiter.api.Test; + +class FlightLocationQueueTest { + @Test + void basicOperation() { + Location location1 = Location.forGrpcInsecure("localhost", 8080); + Location location2 = Location.forGrpcInsecure("localhost", 8081); + FlightLocationQueue queue = new FlightLocationQueue(null, List.of(location1, location2)); + assertTrue(queue.hasNext()); + assertEquals(location1, queue.next()); + assertTrue(queue.hasNext()); + assertEquals(location2, queue.next()); + assertFalse(queue.hasNext()); + } + + @Test + void badAfterGood() { + Location location1 = Location.forGrpcInsecure("localhost", 8080); + Location location2 = Location.forGrpcInsecure("localhost", 8081); + FlightClientCache cache = new FlightClientCache(); + cache.markLocationAsDud(location1.toString()); + FlightLocationQueue queue = new FlightLocationQueue(cache, List.of(location1, location2)); + assertTrue(queue.hasNext()); + assertEquals(location2, queue.next()); + assertTrue(queue.hasNext()); + assertEquals(location1, queue.next()); + assertFalse(queue.hasNext()); + } + + @Test + void iteratorInvariants() { + FlightLocationQueue empty = new FlightLocationQueue(null, Collections.emptyList()); + assertFalse(empty.hasNext()); + assertThrows(NoSuchElementException.class, empty::next); + } +} diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java index c780d53fab..ecce7708c0 100644 --- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java @@ -24,6 +24,7 @@ import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT; import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.THREAD_POOL_SIZE; import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.USER; +import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.USE_CLIENT_CACHE; import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.USE_ENCRYPTION; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -122,6 +123,12 @@ public static Stream provideParameters() { 5000, Duration.ofMillis(5000), (Function) - ArrowFlightConnectionConfigImpl::getConnectTimeout)); + ArrowFlightConnectionConfigImpl::getConnectTimeout), + Arguments.of( + USE_CLIENT_CACHE, + false, + false, + (Function) + ArrowFlightConnectionConfigImpl::useClientCache)); } }