Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions flight/flight-sql-jdbc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ under the License.
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.8</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache;
import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -113,6 +114,7 @@ private static ArrowFlightSqlClientHandler createNewClientHandler(
.withRetainCookies(config.retainCookies())
.withRetainAuth(config.retainAuth())
.withCatalog(config.getCatalog())
.withClientCache(config.useClientCache() ? new FlightClientCache() : null)
.withConnectTimeout(config.getConnectTimeout())
.build();
} catch (final SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Optional;
import java.util.Set;
import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache;
import org.apache.arrow.driver.jdbc.client.utils.FlightLocationQueue;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.CloseSessionRequest;
Expand Down Expand Up @@ -75,21 +77,27 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
// JDBC connection string query parameter
private static final String CATALOG = "catalog";

private final String cacheKey;
private final FlightSqlClient sqlClient;
private final Set<CallOption> options = new HashSet<>();
private final Builder builder;
private final Optional<String> catalog;
private final @Nullable FlightClientCache flightClientCache;

ArrowFlightSqlClientHandler(
final String cacheKey,
final FlightSqlClient sqlClient,
final Builder builder,
final Collection<CallOption> credentialOptions,
final Optional<String> catalog) {
final Optional<String> catalog,
final @Nullable FlightClientCache flightClientCache) {
this.options.addAll(builder.options);
this.options.addAll(credentialOptions);
this.cacheKey = Preconditions.checkNotNull(cacheKey);
this.sqlClient = Preconditions.checkNotNull(sqlClient);
this.builder = builder;
this.catalog = catalog;
this.flightClientCache = flightClientCache;
}

/**
Expand All @@ -101,12 +109,15 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
* @return a new {@link ArrowFlightSqlClientHandler}.
*/
static ArrowFlightSqlClientHandler createNewHandler(
final String cacheKey,
final FlightClient client,
final Builder builder,
final Collection<CallOption> options,
final Optional<String> catalog) {
final Optional<String> catalog,
final @Nullable FlightClientCache flightClientCache) {
final ArrowFlightSqlClientHandler handler =
new ArrowFlightSqlClientHandler(new FlightSqlClient(client), builder, options, catalog);
new ArrowFlightSqlClientHandler(
cacheKey, new FlightSqlClient(client), builder, options, catalog, flightClientCache);
handler.setSetCatalogInSessionIfPresent();
return handler;
}
Expand Down Expand Up @@ -148,9 +159,14 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
// location
// is the same as the original connection's Location and skip creating a FlightClient in
// that scenario.
// Also copy the cache to the client so we can share a cache. Cache needs to cache
// negative attempts too.
List<Exception> exceptions = new ArrayList<>();
CloseableEndpointStreamPair stream = null;
for (Location location : endpoint.getLocations()) {
FlightLocationQueue locations =
new FlightLocationQueue(flightClientCache, endpoint.getLocations());
while (locations.hasNext()) {
Location location = locations.next();
final URI endpointUri = location.getUri();
if (endpointUri.getScheme().equals(LocationSchemes.REUSE_CONNECTION)) {
stream =
Expand All @@ -163,6 +179,7 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
.withHost(endpointUri.getHost())
.withPort(endpointUri.getPort())
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS))
.withClientCache(flightClientCache)
.withConnectTimeout(builder.connectTimeout);

ArrowFlightSqlClientHandler endpointHandler = null;
Expand All @@ -177,12 +194,29 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
stream.getStream().getSchema();
} catch (Exception ex) {
if (endpointHandler != null) {
// If the exception is related to connectivity, mark the client as a dud.
if (flightClientCache != null) {
if (ex instanceof FlightRuntimeException
&& ((FlightRuntimeException) ex).status().code()
== FlightStatusCode.UNAVAILABLE
&&
// IOException covers SocketException and Netty's (private)
// AnnotatedSocketException
// We are looking for things like "Network is unreachable"
ex.getCause() instanceof IOException) {
flightClientCache.markLocationAsDud(location.toString());
}
}

AutoCloseables.close(endpointHandler);
}
exceptions.add(ex);
continue;
}

if (flightClientCache != null) {
flightClientCache.markLocationAsReachable(location.toString());
}
break;
}
if (stream != null) {
Expand Down Expand Up @@ -549,6 +583,8 @@ public static final class Builder {

@VisibleForTesting Optional<String> catalog = Optional.empty();

@VisibleForTesting @Nullable FlightClientCache flightClientCache;

@VisibleForTesting @Nullable Duration connectTimeout;

// These two middleware are for internal use within build() and should not be exposed by builder
Expand Down Expand Up @@ -833,11 +869,20 @@ public Builder withCatalog(@Nullable final String catalog) {
return this;
}

public Builder withClientCache(FlightClientCache flightClientCache) {
this.flightClientCache = flightClientCache;
return this;
}

public Builder withConnectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}

public String getCacheKey() {
return getLocation().toString();
}

/** Get the location that this client will connect to. */
public Location getLocation() {
if (useEncryption) {
Expand Down Expand Up @@ -931,7 +976,7 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
options.toArray(new CallOption[0])));
}
return ArrowFlightSqlClientHandler.createNewHandler(
client, this, credentialOptions, catalog);
getCacheKey(), client, this, credentialOptions, catalog, flightClientCache);

} catch (final IllegalArgumentException
| GeneralSecurityException
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.driver.jdbc.client.utils;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import org.apache.arrow.util.VisibleForTesting;

/**
* A cache for Flight clients.
*
* <p>The intent is to avoid constantly recreating clients to the same locations. gRPC can multiplex
* multiple requests over a single TCP connection, and a cache would let us take advantage of that.
*
* <p>At the time being it only tracks whether a location is reachable or not. To actually cache
* clients, we would need a way to incorporate other connection parameters (authentication, etc.)
* into the cache key.
*/
public final class FlightClientCache {
@VisibleForTesting Cache<String, ClientCacheEntry> clientCache;

public FlightClientCache() {
this.clientCache = Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(600)).build();
}

public boolean isDud(String key) {
return clientCache.getIfPresent(key) != null;
}

public void markLocationAsDud(String key) {
clientCache.put(key, new ClientCacheEntry());
}

public void markLocationAsReachable(String key) {
clientCache.invalidate(key);
}

/** A cache entry (empty because we only track reachability, see outer class docstring). */
public static final class ClientCacheEntry {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.driver.jdbc.client.utils;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.arrow.flight.Location;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A queue of Flight locations to connect to for an endpoint.
*
* <p>This helper class is intended to encapsulate the retry logic in a testable manner.
*/
public final class FlightLocationQueue implements Iterator<Location> {
private final Deque<Location> locations;
private final Deque<Location> badLocations;

/**
* Create a new queue.
*
* @param flightClientCache An optional cache used to sort previously unreachable locations to the
* end.
* @param locations The locations to try.
*/
public FlightLocationQueue(
@Nullable FlightClientCache flightClientCache, List<Location> locations) {
this.locations = new ArrayDeque<>();
this.badLocations = new ArrayDeque<>();

for (Location location : locations) {
if (flightClientCache != null && flightClientCache.isDud(location.toString())) {
this.badLocations.add(location);
} else {
this.locations.add(location);
}
}
}

@Override
public boolean hasNext() {
return !locations.isEmpty() || !badLocations.isEmpty();
}

@Override
public Location next() {
if (!locations.isEmpty()) {
return locations.pop();
} else if (!badLocations.isEmpty()) {
return badLocations.pop();
}
throw new NoSuchElementException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public Duration getConnectTimeout() {
return Duration.ofMillis(timeout);
}

/** Whether to enable the client cache. */
public boolean useClientCache() {
return ArrowFlightConnectionProperty.USE_CLIENT_CACHE.getBoolean(properties);
}

/**
* Gets the {@link CallOption}s from this {@link ConnectionConfig}.
*
Expand Down Expand Up @@ -226,6 +231,7 @@ public enum ArrowFlightConnectionProperty implements ConnectionProperty {
RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false),
CATALOG("catalog", null, Type.STRING, false),
CONNECT_TIMEOUT_MILLIS("connectTimeoutMs", 10000, Type.NUMBER, false),
USE_CLIENT_CACHE("useClientCache", true, Type.BOOLEAN, false),
;

private final String camelName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,10 +699,9 @@ public void testFallbackUnresolvableFlightServer() throws Exception {
}
attempt1 = System.nanoTime();
elapsedMs = (attempt1 - start) / 1_000_000.;
// TODO(GH-661): this assertion should be flipped to assertTrue.
assertFalse(
assertTrue(
elapsedMs < 5000.,
String.format("Expected second attempt to be the same, but %f ms elapsed", elapsedMs));
String.format("Expected second attempt to be faster, but %f ms elapsed", elapsedMs));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void testDefaults() {
assertNull(builder.clientCertificatePath);
assertNull(builder.clientKeyPath);
assertEquals(Optional.empty(), builder.catalog);
assertNull(builder.flightClientCache);
assertNull(builder.connectTimeout);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.driver.jdbc.client.utils;

import static org.junit.jupiter.api.Assertions.*;

import org.apache.arrow.flight.Location;
import org.junit.jupiter.api.Test;

class FlightClientCacheTest {
@Test
void basicOperation() {
FlightClientCache cache = new FlightClientCache();

Location location1 = Location.forGrpcInsecure("localhost", 8080);
Location location2 = Location.forGrpcInsecure("localhost", 8081);

assertFalse(cache.isDud(location1.toString()));
assertFalse(cache.isDud(location2.toString()));

cache.markLocationAsReachable(location1.toString());
assertFalse(cache.isDud(location1.toString()));
assertFalse(cache.isDud(location2.toString()));

cache.markLocationAsDud(location1.toString());
assertTrue(cache.isDud(location1.toString()));
assertFalse(cache.isDud(location2.toString()));

cache.markLocationAsDud(location2.toString());
assertTrue(cache.isDud(location1.toString()));
assertTrue(cache.isDud(location2.toString()));

cache.markLocationAsReachable(location1.toString());
assertFalse(cache.isDud(location1.toString()));
assertTrue(cache.isDud(location2.toString()));
}
}
Loading
Loading