Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,85 @@ public FlightInfo getInfo(final String query) {
@Override
public void close() throws SQLException {
if (catalog.isPresent()) {
sqlClient.closeSession(new CloseSessionRequest(), getOptions());
try {
sqlClient.closeSession(new CloseSessionRequest(), getOptions());
} catch (FlightRuntimeException fre) {
handleBenignCloseException(
fre, "Failed to close Flight SQL session.", "closing Flight SQL session");
}
}
try {
AutoCloseables.close(sqlClient);
} catch (FlightRuntimeException fre) {
handleBenignCloseException(
fre, "Failed to clean up client resources.", "closing Flight SQL client");
} catch (final Exception e) {
throw new SQLException("Failed to clean up client resources.", e);
}
}

/**
* Handles FlightRuntimeException during close operations, suppressing benign gRPC shutdown errors
* while re-throwing genuine failures.
*
* @param fre the FlightRuntimeException to handle
* @param sqlErrorMessage the SQLException message to use for genuine failures
* @param operationDescription description of the operation for logging
* @throws SQLException if the exception represents a genuine failure
*/
private void handleBenignCloseException(
FlightRuntimeException fre, String sqlErrorMessage, String operationDescription)
throws SQLException {
if (isBenignCloseException(fre)) {
logSuppressedCloseException(fre, operationDescription);
} else {
throw new SQLException(sqlErrorMessage, fre);
}
}

/**
* Handles FlightRuntimeException during close operations, suppressing benign gRPC shutdown errors
* while re-throwing genuine failures as FlightRuntimeException.
*
* @param fre the FlightRuntimeException to handle
* @param operationDescription description of the operation for logging
* @throws FlightRuntimeException if the exception represents a genuine failure
*/
private void handleBenignCloseException(FlightRuntimeException fre, String operationDescription)
throws FlightRuntimeException {
if (isBenignCloseException(fre)) {
logSuppressedCloseException(fre, operationDescription);
} else {
throw fre;
}
}

/**
* Determines if a FlightRuntimeException represents a benign close operation error that should be
* suppressed.
*
* @param fre the FlightRuntimeException to check
* @return true if the exception should be suppressed, false otherwise
*/
private boolean isBenignCloseException(FlightRuntimeException fre) {
return fre.status().code().equals(FlightStatusCode.UNAVAILABLE)
|| (fre.status().code().equals(FlightStatusCode.INTERNAL)
&& fre.getMessage() != null
&& fre.getMessage().contains("Connection closed after GOAWAY"));
}

/**
* Logs a suppressed close exception with appropriate level based on debug settings.
*
* @param fre the FlightRuntimeException being suppressed
* @param operationDescription description of the operation for logging
*/
private void logSuppressedCloseException(
FlightRuntimeException fre, String operationDescription) {
// ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer during shutdown
LOGGER.info("Suppressed error {}", operationDescription, fre);
}

/** A prepared statement handler. */
public interface PreparedStatement extends AutoCloseable {
/**
Expand Down Expand Up @@ -386,14 +456,7 @@ public void close() {
try {
preparedStatement.close(getOptions());
} catch (FlightRuntimeException fre) {
// ARROW-17785: suppress exceptions caused by flaky gRPC layer
if (fre.status().code().equals(FlightStatusCode.UNAVAILABLE)
|| (fre.status().code().equals(FlightStatusCode.INTERNAL)
&& fre.getMessage().contains("Connection closed after GOAWAY"))) {
LOGGER.warn("Supressed error closing PreparedStatement", fre);
return;
}
throw fre;
handleBenignCloseException(fre, "closing PreparedStatement");
}
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.driver.jdbc.client;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.CloseSessionRequest;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

public class ArrowFlightSqlClientHandlerTest {

@ParameterizedTest
@MethodSource
public void testCloseHandlesFlightRuntimeException(
boolean throwFromCloseSession, CallStatus callStatus, boolean shouldSuppress)
throws Exception {
FlightSqlClient sqlClient = mock(FlightSqlClient.class);
String cacheKey = "cacheKey";
Optional<String> catalog =
throwFromCloseSession ? Optional.of("test_catalog") : Optional.empty();
final Collection<CallOption> credentialOptions = new ArrayList<>();
ArrowFlightSqlClientHandler.Builder builder = new ArrowFlightSqlClientHandler.Builder();

if (throwFromCloseSession) {
doThrow(callStatus.toRuntimeException())
.when(sqlClient)
.closeSession(any(CloseSessionRequest.class), any(CallOption[].class));
} else {
doThrow(callStatus.toRuntimeException()).when(sqlClient).close();
}

ArrowFlightSqlClientHandler sqlClientHandler =
new ArrowFlightSqlClientHandler(
cacheKey, sqlClient, builder, credentialOptions, catalog, null);

if (shouldSuppress) {
assertDoesNotThrow(sqlClientHandler::close);
} else {
assertThrows(SQLException.class, sqlClientHandler::close);
}
}

private static Object[] testCloseHandlesFlightRuntimeException() {
CallStatus benignInternalError =
new CallStatus(FlightStatusCode.INTERNAL, null, "Connection closed after GOAWAY", null);
CallStatus notBenignInternalError =
new CallStatus(FlightStatusCode.INTERNAL, null, "Not a benign internal error", null);
CallStatus unavailableError = new CallStatus(FlightStatusCode.UNAVAILABLE, null, null, null);
CallStatus unknownError = new CallStatus(FlightStatusCode.UNKNOWN, null, null, null);
return new Object[] {
new Object[] {true, benignInternalError, true},
new Object[] {false, benignInternalError, true},
new Object[] {true, notBenignInternalError, false},
new Object[] {false, notBenignInternalError, false},
new Object[] {true, unavailableError, true},
new Object[] {false, unavailableError, true},
new Object[] {true, unknownError, false},
new Object[] {false, unknownError, false},
};
}
}
Loading