From 17703d215e943a72a725a208593a0dc238deb8f5 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 16 Jun 2023 17:15:38 -0400 Subject: [PATCH] Add Java support for proposal --- dev/archery/archery/integration/runner.py | 10 +- .../arrow/flight/CancelFlightInfoResult.java | 126 +++++++++ .../org/apache/arrow/flight/CancelStatus.java | 44 ++++ .../org/apache/arrow/flight/FlightClient.java | 75 ++++++ .../apache/arrow/flight/FlightConstants.java | 14 + .../apache/arrow/flight/FlightEndpoint.java | 83 +++++- ...xpirationTimeCancelFlightInfoScenario.java | 62 +++++ ...ExpirationTimeCloseFlightInfoScenario.java | 59 +++++ .../tests/ExpirationTimeDoGetScenario.java | 137 ++++++++++ .../ExpirationTimeListActionsScenario.java | 58 +++++ .../tests/ExpirationTimeProducer.java | 239 ++++++++++++++++++ ...tionTimeRefreshFlightEndpointScenario.java | 141 +++++++++++ .../tests/IntegrationAssertions.java | 6 + .../flight/integration/tests/Scenarios.java | 5 + .../integration/tests/IntegrationTest.java | 25 ++ .../arrow/flight/sql/CancelListener.java | 1 + .../apache/arrow/flight/sql/CancelResult.java | 3 + .../flight/sql/CancelStatusListener.java | 47 ++++ .../flight/sql/FlightEndpointListener.java | 46 ++++ .../arrow/flight/sql/FlightSqlClient.java | 37 +++ .../arrow/flight/sql/FlightSqlProducer.java | 77 ++++++ 21 files changed, 1279 insertions(+), 16 deletions(-) create mode 100644 java/flight/flight-core/src/main/java/org/apache/arrow/flight/CancelFlightInfoResult.java create mode 100644 java/flight/flight-core/src/main/java/org/apache/arrow/flight/CancelStatus.java create mode 100644 java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeCancelFlightInfoScenario.java create mode 100644 java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeCloseFlightInfoScenario.java create mode 100644 java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeDoGetScenario.java create mode 100644 java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeListActionsScenario.java create mode 100644 java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeProducer.java create mode 100644 java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeRefreshFlightEndpointScenario.java create mode 100644 java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelStatusListener.java create mode 100644 java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightEndpointListener.java diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 767f1ed89d5..db69e60a304 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -439,32 +439,32 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True, "expiration_time:do_get", description=("Ensure FlightEndpoint.expiration_time with " "DoGet is working as expected."), - skip={"Java", "JS", "C#", "Rust"}, + skip={"JS", "C#", "Rust"}, ), Scenario( "expiration_time:list_actions", description=("Ensure FlightEndpoint.expiration_time related " "pre-defined actions is working with ListActions " "as expected."), - skip={"Java", "JS", "C#", "Rust"}, + skip={"JS", "C#", "Rust"}, ), Scenario( "expiration_time:cancel_flight_info", description=("Ensure FlightEndpoint.expiration_time and " "CancelFlightInfo are working as expected."), - skip={"Java", "JS", "C#", "Rust"}, + skip={"JS", "C#", "Rust"}, ), Scenario( "expiration_time:close_flight_info", description=("Ensure FlightEndpoint.expiration_time and " "CloseFlightInfo are working as expected."), - skip={"Java", "JS", "C#", "Rust"}, + skip={"JS", "C#", "Rust"}, ), Scenario( "expiration_time:refresh_flight_endpoint", description=("Ensure FlightEndpoint.expiration_time and " "RefreshFlightEndpoint are working as expected."), - skip={"Java", "JS", "C#", "Rust"}, + skip={"JS", "C#", "Rust"}, ), Scenario( "flight_sql", diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CancelFlightInfoResult.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CancelFlightInfoResult.java new file mode 100644 index 00000000000..eff5afdeeb7 --- /dev/null +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CancelFlightInfoResult.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +import org.apache.arrow.flight.impl.Flight; + +/** + * The result of cancelling a FlightInfo. + */ +public class CancelFlightInfoResult { + private final CancelStatus status; + + public CancelFlightInfoResult(CancelStatus status) { + this.status = status; + } + + CancelFlightInfoResult(Flight.CancelFlightInfoResult proto) { + switch (proto.getStatus()) { + case CANCEL_STATUS_UNSPECIFIED: + status = CancelStatus.UNSPECIFIED; + break; + case CANCEL_STATUS_CANCELLED: + status = CancelStatus.CANCELLED; + break; + case CANCEL_STATUS_CANCELLING: + status = CancelStatus.CANCELLING; + break; + case CANCEL_STATUS_NOT_CANCELLABLE: + status = CancelStatus.NOT_CANCELLABLE; + break; + default: + throw new IllegalArgumentException(""); + } + } + + public CancelStatus getStatus() { + return status; + } + + Flight.CancelFlightInfoResult toProtocol() { + Flight.CancelFlightInfoResult.Builder b = Flight.CancelFlightInfoResult.newBuilder(); + switch (status) { + case UNSPECIFIED: + b.setStatus(Flight.CancelStatus.CANCEL_STATUS_UNSPECIFIED); + break; + case CANCELLED: + b.setStatus(Flight.CancelStatus.CANCEL_STATUS_CANCELLED); + break; + case CANCELLING: + b.setStatus(Flight.CancelStatus.CANCEL_STATUS_CANCELLING); + break; + case NOT_CANCELLABLE: + b.setStatus(Flight.CancelStatus.CANCEL_STATUS_NOT_CANCELLABLE); + break; + default: + // Not possible + throw new AssertionError(); + } + return b.build(); + } + + /** + * Get the serialized form of this protocol message. + * + *

Intended to help interoperability by allowing non-Flight services to still return Flight types. + */ + public ByteBuffer serialize() { + return ByteBuffer.wrap(toProtocol().toByteArray()); + } + + /** + * Parse the serialized form of this protocol message. + * + *

Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services. + * + * @param serialized The serialized form of the message, as returned by {@link #serialize()}. + * @return The deserialized message. + * @throws IOException if the serialized form is invalid. + */ + public static CancelFlightInfoResult deserialize(ByteBuffer serialized) throws IOException { + return new CancelFlightInfoResult(Flight.CancelFlightInfoResult.parseFrom(serialized)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CancelFlightInfoResult that = (CancelFlightInfoResult) o; + return status == that.status; + } + + @Override + public int hashCode() { + return Objects.hash(status); + } + + @Override + public String toString() { + return "CancelFlightInfoResult{" + + "status=" + status + + '}'; + } +} diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CancelStatus.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CancelStatus.java new file mode 100644 index 00000000000..e92bcc8b258 --- /dev/null +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CancelStatus.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +/** The result of cancelling a FlightInfo. */ +public enum CancelStatus { + /** + * The cancellation status is unknown. Servers should avoid using + * this value (send a NOT_FOUND error if the requested query is + * not known). Clients can retry the request. + */ + UNSPECIFIED, + /** + * The cancellation request is complete. Subsequent requests with + * the same payload may return CANCELLED or a NOT_FOUND error. + */ + CANCELLED, + /** + * The cancellation request is in progress. The client may retry + * the cancellation request. + */ + CANCELLING, + /** + * The query is not cancellable. The client should not retry the + * cancellation request. + */ + NOT_CANCELLABLE, + ; +} diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java index 8cf2bbaf253..8af5fffd01d 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java @@ -17,8 +17,10 @@ package org.apache.arrow.flight; +import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -489,6 +491,79 @@ public void getResult() { } } + /** + * Cancel execution of a distributed query. + * + * @param info The query to cancel. + * @param options Call options. + * @return The server response. + */ + public CancelFlightInfoResult cancelFlightInfo(FlightInfo info, CallOption... options) { + Action action = new Action(FlightConstants.CANCEL_FLIGHT_INFO.getType(), info.serialize().array()); + Iterator results = doAction(action, options); + if (!results.hasNext()) { + throw CallStatus.INTERNAL + .withDescription("Server did not return a response") + .toRuntimeException(); + } + + CancelFlightInfoResult result; + try { + result = CancelFlightInfoResult.deserialize(ByteBuffer.wrap(results.next().getBody())); + } catch (IOException e) { + throw CallStatus.INTERNAL + .withDescription("Failed to parse server response: " + e) + .withCause(e) + .toRuntimeException(); + } + results.forEachRemaining((ignored) -> { + }); + return result; + } + + /** + * Request the server to free resources associated with a query. + * + * @param info The query to close. + * @param options Call options. + */ + public void closeFlightInfo(FlightInfo info, CallOption... options) { + Action action = new Action(FlightConstants.CLOSE_FLIGHT_INFO.getType(), info.serialize().array()); + Iterator results = doAction(action, options); + results.forEachRemaining((ignored) -> { + }); + } + + /** + * Request the server to extend the lifetime of a query result set. + * + * @param endpoint The result set partition. + * @param options Call options. + * @return The new endpoint with an updated expiration time. + */ + public FlightEndpoint refreshFlightEndpoint(FlightEndpoint endpoint, CallOption... options) { + Action action = new Action(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType(), endpoint.serialize().array()); + Iterator results = doAction(action, options); + if (!results.hasNext()) { + throw CallStatus.INTERNAL + .withDescription("Server did not return a response") + .toRuntimeException(); + } + + FlightEndpoint result; + try { + result = FlightEndpoint.deserialize(ByteBuffer.wrap(results.next().getBody())); + } catch (IOException | URISyntaxException e) { + throw CallStatus.INTERNAL + .withDescription("Failed to parse server response: " + e) + .withCause(e) + .toRuntimeException(); + } + results.forEachRemaining((ignored) -> { + }); + return result; + } + /** * Interface for writers to an Arrow data stream. */ diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightConstants.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightConstants.java index 2d039c9d24e..c7fd978df2f 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightConstants.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightConstants.java @@ -26,4 +26,18 @@ public interface FlightConstants { FlightServerMiddleware.Key HEADER_KEY = FlightServerMiddleware.Key.of("org.apache.arrow.flight.ServerHeaderMiddleware"); + + ActionType CANCEL_FLIGHT_INFO = new ActionType("CancelFlightInfo", + "Explicitly cancel a running FlightInfo.\n" + + "Request Message: FlightInfo to be canceled\n" + + "Response Message: ActionCancelFlightInfoResult"); + + ActionType CLOSE_FLIGHT_INFO = new ActionType("CloseFlightInfo", + "Close the given FlightInfo explicitly.\n" + + "Request Message: FlightInfo to be closed\n" + + "Response Message: N/A"); + ActionType REFRESH_FLIGHT_ENDPOINT = new ActionType("RefreshFlightEndpoint", + "Extend expiration time of the given FlightEndpoint.\n" + + "Request Message: FlightEndpoint to be refreshed\n" + + "Response Message: Refreshed FlightEndpoint"); } diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java index 2e46b694dfb..ad78cfbd210 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java @@ -17,32 +17,49 @@ package org.apache.arrow.flight; +import java.io.IOException; import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; import org.apache.arrow.flight.impl.Flight; -import com.google.common.collect.ImmutableList; +import com.google.protobuf.Timestamp; /** * POJO to convert to/from the underlying protobuf FlightEndpoint. */ public class FlightEndpoint { - private List locations; - private Ticket ticket; + private final List locations; + private final Ticket ticket; + private final Instant expirationTime; /** - * Constructs a new instance. + * Constructs a new endpoint with no expiration time. * * @param ticket A ticket that describe the key of a data stream. * @param locations The possible locations the stream can be retrieved from. */ public FlightEndpoint(Ticket ticket, Location... locations) { - super(); + this(ticket, /*expirationTime*/null, locations); + } + + /** + * Constructs a new endpoint with an expiration time. + * + * @param ticket A ticket that describe the key of a data stream. + * @param locations The possible locations the stream can be retrieved from. + */ + public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locations) { Objects.requireNonNull(ticket); - this.locations = ImmutableList.copyOf(locations); + this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))); + this.expirationTime = expirationTime; this.ticket = ticket; } @@ -50,11 +67,17 @@ public FlightEndpoint(Ticket ticket, Location... locations) { * Constructs from the protocol buffer representation. */ FlightEndpoint(Flight.FlightEndpoint flt) throws URISyntaxException { - locations = new ArrayList<>(); + this.locations = new ArrayList<>(); for (final Flight.Location location : flt.getLocationList()) { - locations.add(new Location(location.getUri())); + this.locations.add(new Location(location.getUri())); + } + if (flt.hasExpirationTime()) { + this.expirationTime = Instant.ofEpochSecond( + flt.getExpirationTime().getSeconds(), flt.getExpirationTime().getNanos()); + } else { + this.expirationTime = null; } - ticket = new Ticket(flt.getTicket()); + this.ticket = new Ticket(flt.getTicket()); } public List getLocations() { @@ -65,6 +88,10 @@ public Ticket getTicket() { return ticket; } + public Optional getExpirationTime() { + return Optional.ofNullable(expirationTime); + } + /** * Converts to the protocol buffer representation. */ @@ -75,9 +102,41 @@ Flight.FlightEndpoint toProtocol() { for (Location l : locations) { b.addLocation(l.toProtocol()); } + + if (expirationTime != null) { + b.setExpirationTime( + Timestamp.newBuilder() + .setSeconds(expirationTime.getEpochSecond()) + .setNanos(expirationTime.getNano()) + .build()); + } + return b.build(); } + /** + * Get the serialized form of this protocol message. + * + *

Intended to help interoperability by allowing non-Flight services to still return Flight types. + */ + public ByteBuffer serialize() { + return ByteBuffer.wrap(toProtocol().toByteArray()); + } + + /** + * Parse the serialized form of this protocol message. + * + *

Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services. + * + * @param serialized The serialized form of the message, as returned by {@link #serialize()}. + * @return The deserialized message. + * @throws IOException if the serialized form is invalid. + * @throws URISyntaxException if the serialized form contains an unsupported URI format. + */ + public static FlightEndpoint deserialize(ByteBuffer serialized) throws IOException, URISyntaxException { + return new FlightEndpoint(Flight.FlightEndpoint.parseFrom(serialized)); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -88,12 +147,13 @@ public boolean equals(Object o) { } FlightEndpoint that = (FlightEndpoint) o; return locations.equals(that.locations) && - ticket.equals(that.ticket); + ticket.equals(that.ticket) && + Objects.equals(expirationTime, that.expirationTime); } @Override public int hashCode() { - return Objects.hash(locations, ticket); + return Objects.hash(locations, ticket, expirationTime); } @Override @@ -101,6 +161,7 @@ public String toString() { return "FlightEndpoint{" + "locations=" + locations + ", ticket=" + ticket + + ", expirationTime=" + (expirationTime == null ? "(none)" : expirationTime.toString()) + '}'; } } diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeCancelFlightInfoScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeCancelFlightInfoScenario.java new file mode 100644 index 00000000000..69ebada517e --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeCancelFlightInfoScenario.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.integration.tests; + +import java.nio.charset.StandardCharsets; + +import org.apache.arrow.flight.CancelFlightInfoResult; +import org.apache.arrow.flight.CancelStatus; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; + +/** Test CancelFlightInfo. */ +final class ExpirationTimeCancelFlightInfoScenario implements Scenario { + @Override + public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception { + return new ExpirationTimeProducer(allocator); + } + + @Override + public void buildServer(FlightServer.Builder builder) { + } + + @Override + public void client(BufferAllocator allocator, Location location, FlightClient client) throws Exception { + FlightInfo info = client.getInfo(FlightDescriptor.command("expiration".getBytes(StandardCharsets.UTF_8))); + CancelFlightInfoResult result = client.cancelFlightInfo(info); + IntegrationAssertions.assertEquals(CancelStatus.CANCELLED, result.getStatus()); + + // All requests should fail + for (FlightEndpoint endpoint : info.getEndpoints()) { + IntegrationAssertions.assertThrows(FlightRuntimeException.class, () -> { + try (FlightStream stream = client.getStream(endpoint.getTicket())) { + while (stream.next()) { + } + } + }); + } + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeCloseFlightInfoScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeCloseFlightInfoScenario.java new file mode 100644 index 00000000000..b634e709143 --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeCloseFlightInfoScenario.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.integration.tests; + +import java.nio.charset.StandardCharsets; + +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; + +/** Test CloseFlightInfo. */ +final class ExpirationTimeCloseFlightInfoScenario implements Scenario { + @Override + public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception { + return new ExpirationTimeProducer(allocator); + } + + @Override + public void buildServer(FlightServer.Builder builder) { + } + + @Override + public void client(BufferAllocator allocator, Location location, FlightClient client) throws Exception { + FlightInfo info = client.getInfo(FlightDescriptor.command("expiration".getBytes(StandardCharsets.UTF_8))); + client.closeFlightInfo(info); + + // All requests should fail + for (FlightEndpoint endpoint : info.getEndpoints()) { + IntegrationAssertions.assertThrows(FlightRuntimeException.class, () -> { + try (FlightStream stream = client.getStream(endpoint.getTicket())) { + while (stream.next()) { + } + } + }); + } + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeDoGetScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeDoGetScenario.java new file mode 100644 index 00000000000..ec6b9189196 --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeDoGetScenario.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.integration.tests; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.UInt4Vector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +/** Test DoGet with expiration times. */ +final class ExpirationTimeDoGetScenario implements Scenario { + @Override + public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception { + return new ExpirationTimeProducer(allocator); + } + + @Override + public void buildServer(FlightServer.Builder builder) { + } + + @Override + public void client(BufferAllocator allocator, Location location, FlightClient client) throws Exception { + FlightInfo info = client.getInfo(FlightDescriptor.command("expiration_time".getBytes(StandardCharsets.UTF_8))); + + List batches = new ArrayList<>(); + + try { + // First read from all endpoints + for (FlightEndpoint endpoint : info.getEndpoints()) { + try (FlightStream stream = client.getStream(endpoint.getTicket())) { + while (stream.next()) { + batches.add(new VectorUnloader(stream.getRoot()).getRecordBatch()); + } + } + } + + // Re-read only from endpoints with expiration time + for (FlightEndpoint endpoint : info.getEndpoints()) { + if (endpoint.getExpirationTime().isPresent()) { + try (FlightStream stream = client.getStream(endpoint.getTicket())) { + while (stream.next()) { + batches.add(new VectorUnloader(stream.getRoot()).getRecordBatch()); + } + } + } else { + IntegrationAssertions.assertThrows(FlightRuntimeException.class, () -> { + try (FlightStream stream = client.getStream(endpoint.getTicket())) { + while (stream.next()) { + } + } + }); + } + } + + // No re-read after expiration + for (FlightEndpoint endpoint : info.getEndpoints()) { + Optional maybeExpiration = endpoint.getExpirationTime(); + if (!maybeExpiration.isPresent()) { + continue; + } + Instant now = Instant.now(); + Instant expiration = maybeExpiration.get(); + if (expiration.isAfter(now)) { + Thread.sleep(ChronoUnit.MILLIS.between(now, expiration) + 1); + } + IntegrationAssertions.assertThrows(FlightRuntimeException.class, () -> { + try (FlightStream stream = client.getStream(endpoint.getTicket())) { + while (stream.next()) { + } + } + }); + } + + // Check data + IntegrationAssertions.assertEquals(5, batches.size()); + try (final VectorSchemaRoot root = VectorSchemaRoot.create(ExpirationTimeProducer.SCHEMA, allocator)) { + final VectorLoader loader = new VectorLoader(root); + + loader.load(batches.get(0)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(0, ((UInt4Vector) root.getVector(0)).getObject(0)); + + loader.load(batches.get(1)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(1, ((UInt4Vector) root.getVector(0)).getObject(0)); + + loader.load(batches.get(2)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(2, ((UInt4Vector) root.getVector(0)).getObject(0)); + + loader.load(batches.get(3)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(1, ((UInt4Vector) root.getVector(0)).getObject(0)); + + loader.load(batches.get(4)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(2, ((UInt4Vector) root.getVector(0)).getObject(0)); + } + } finally { + AutoCloseables.close(batches); + } + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeListActionsScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeListActionsScenario.java new file mode 100644 index 00000000000..bd234825cc1 --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeListActionsScenario.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.integration.tests; + +import java.util.Iterator; + +import org.apache.arrow.flight.ActionType; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightConstants; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; + +/** Test ListActions with expiration times. */ +final class ExpirationTimeListActionsScenario implements Scenario { + @Override + public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception { + return new ExpirationTimeProducer(allocator); + } + + @Override + public void buildServer(FlightServer.Builder builder) { + } + + @Override + public void client(BufferAllocator allocator, Location location, FlightClient client) throws Exception { + Iterator actions = client.listActions().iterator(); + IntegrationAssertions.assertTrue("Expected 3 actions", actions.hasNext()); + ActionType action = actions.next(); + IntegrationAssertions.assertEquals(FlightConstants.CANCEL_FLIGHT_INFO.getType(), action.getType()); + + IntegrationAssertions.assertTrue("Expected 3 actions", actions.hasNext()); + action = actions.next(); + IntegrationAssertions.assertEquals(FlightConstants.CLOSE_FLIGHT_INFO.getType(), action.getType()); + + IntegrationAssertions.assertTrue("Expected 3 actions", actions.hasNext()); + action = actions.next(); + IntegrationAssertions.assertEquals(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType(), action.getType()); + + IntegrationAssertions.assertFalse("Expected 3 actions", actions.hasNext()); + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeProducer.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeProducer.java new file mode 100644 index 00000000000..0ee68adf8b7 --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeProducer.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.integration.tests; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.flight.Action; +import org.apache.arrow.flight.ActionType; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.CancelFlightInfoResult; +import org.apache.arrow.flight.CancelStatus; +import org.apache.arrow.flight.FlightConstants; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.UInt4Vector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +/** The server used for testing FlightEndpoint.expiration_time. + *

+ * GetFlightInfo() returns a FlightInfo that has the following + * three FlightEndpoints: + * + *

    + *
  1. No expiration time
  2. + *
  3. 2 seconds expiration time
  4. + *
  5. 3 seconds expiration time
  6. + *
+ * + * The client can't read data from the first endpoint multiple times + * but can read data from the second and third endpoints. The client + * can't re-read data from the second endpoint 2 seconds later. The + * client can't re-read data from the third endpoint 3 seconds + * later. + *

+ * The client can cancel a returned FlightInfo by pre-defined + * CancelFlightInfo action. The client can't read data from endpoints + * even within 3 seconds after the action. + *

+ * The client can extend the expiration time of a FlightEndpoint in + * a returned FlightInfo by pre-defined RefreshFlightEndpoint + * action. The client can read data from endpoints multiple times + * within more 10 seconds after the action. + *

+ * The client can close a returned FlightInfo explicitly by + * pre-defined CloseFlightInfo action. The client can't read data + * from endpoints even within 3 seconds after the action. + */ +final class ExpirationTimeProducer extends NoOpFlightProducer { + public static final Schema SCHEMA = new Schema( + Collections.singletonList(Field.notNullable("number", Types.MinorType.UINT4.getType()))); + + private final BufferAllocator allocator; + private final List statuses; + + ExpirationTimeProducer(BufferAllocator allocator) { + this.allocator = allocator; + this.statuses = new ArrayList<>(); + } + + @Override + public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { + statuses.clear(); + List endpoints = new ArrayList<>(); + Instant now = Instant.now(); + endpoints.add(addEndpoint("No expiration time", null)); + endpoints.add(addEndpoint("2 seconds", now.plus(2, ChronoUnit.SECONDS))); + endpoints.add(addEndpoint("3 seconds", now.plus(3, ChronoUnit.SECONDS))); + return new FlightInfo(SCHEMA, descriptor, endpoints, -1, -1); + } + + @Override + public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) { + // Obviously, not safe (since we don't lock), but we assume calls are not concurrent + int index = parseIndexFromTicket(ticket); + EndpointStatus status = statuses.get(index); + if (status.closed) { + listener.error(CallStatus.NOT_FOUND + .withDescription("Invalid flight: closed") + .toRuntimeException()); + return; + } else if (status.cancelled) { + listener.error(CallStatus.NOT_FOUND + .withDescription("Invalid flight: cancelled") + .toRuntimeException()); + return; + } else if (status.expirationTime != null && Instant.now().isAfter(status.expirationTime)) { + listener.error(CallStatus.NOT_FOUND + .withDescription("Invalid flight: expired") + .toRuntimeException()); + return; + } else if (status.expirationTime == null && status.numGets > 0) { + listener.error(CallStatus.NOT_FOUND + .withDescription("Invalid flight: can't read multiple times") + .toRuntimeException()); + return; + } + status.numGets++; + + try (final VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) { + listener.start(root); + UInt4Vector vector = (UInt4Vector) root.getVector(0); + vector.setSafe(0, index); + root.setRowCount(1); + listener.putNext(); + } + listener.completed(); + } + + @Override + public void doAction(CallContext context, Action action, StreamListener listener) { + try { + if (action.getType().equals(FlightConstants.CANCEL_FLIGHT_INFO.getType())) { + FlightInfo info = FlightInfo.deserialize(ByteBuffer.wrap(action.getBody())); + CancelStatus cancelStatus = CancelStatus.UNSPECIFIED; + for (FlightEndpoint endpoint : info.getEndpoints()) { + int index = parseIndexFromTicket(endpoint.getTicket()); + EndpointStatus status = statuses.get(index); + if (status.cancelled) { + cancelStatus = CancelStatus.NOT_CANCELLABLE; + } else { + status.cancelled = true; + if (cancelStatus == CancelStatus.UNSPECIFIED) { + cancelStatus = CancelStatus.CANCELLED; + } + } + } + listener.onNext(new Result(new CancelFlightInfoResult(cancelStatus).serialize().array())); + } else if (action.getType().equals(FlightConstants.CLOSE_FLIGHT_INFO.getType())) { + FlightInfo info = FlightInfo.deserialize(ByteBuffer.wrap(action.getBody())); + for (FlightEndpoint endpoint : info.getEndpoints()) { + int index = parseIndexFromTicket(endpoint.getTicket()); + statuses.get(index).closed = true; + } + } else if (action.getType().equals(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType())) { + FlightEndpoint endpoint = FlightEndpoint.deserialize(ByteBuffer.wrap(action.getBody())); + int index = parseIndexFromTicket(endpoint.getTicket()); + EndpointStatus status = statuses.get(index); + if (status.closed) { + listener.onError(CallStatus.INVALID_ARGUMENT + .withDescription("Invalid flight: cancelled") + .toRuntimeException()); + return; + } + + String ticketBody = new String(endpoint.getTicket().getBytes(), StandardCharsets.UTF_8); + ticketBody += ": refreshed (+ 10 seconds)"; + Ticket ticket = new Ticket(ticketBody.getBytes(StandardCharsets.UTF_8)); + Instant expiration = Instant.now().plus(10, ChronoUnit.SECONDS); + status.expirationTime = expiration; + FlightEndpoint newEndpoint = new FlightEndpoint( + ticket, expiration, endpoint.getLocations().toArray(new Location[0])); + listener.onNext(new Result(newEndpoint.serialize().array())); + } else { + listener.onError(CallStatus.INVALID_ARGUMENT + .withDescription("Unknown action: " + action.getType()) + .toRuntimeException()); + return; + } + } catch (IOException | URISyntaxException e) { + listener.onError(CallStatus.INTERNAL.withCause(e).withDescription(e.toString()).toRuntimeException()); + return; + } + listener.onCompleted(); + } + + @Override + public void listActions(CallContext context, StreamListener listener) { + listener.onNext(FlightConstants.CANCEL_FLIGHT_INFO); + listener.onNext(FlightConstants.CLOSE_FLIGHT_INFO); + listener.onNext(FlightConstants.REFRESH_FLIGHT_ENDPOINT); + listener.onCompleted(); + } + + private FlightEndpoint addEndpoint(String ticket, Instant expirationTime) { + Ticket flightTicket = new Ticket(String.format("%d:%s", statuses.size(), ticket).getBytes(StandardCharsets.UTF_8)); + statuses.add(new EndpointStatus(expirationTime)); + return new FlightEndpoint(flightTicket, expirationTime); + } + + private int parseIndexFromTicket(Ticket ticket) { + final String contents = new String(ticket.getBytes(), StandardCharsets.UTF_8); + int index = contents.indexOf(':'); + if (index == -1) { + throw CallStatus.INVALID_ARGUMENT.withDescription("Invalid ticket").toRuntimeException(); + } + int endpointIndex = Integer.parseInt(contents.substring(0, index)); + if (endpointIndex < 0 || endpointIndex >= statuses.size()) { + throw CallStatus.NOT_FOUND.withDescription("Out of bounds").toRuntimeException(); + } + return endpointIndex; + } + + /** The status of a returned endpoint. */ + static final class EndpointStatus { + Instant expirationTime; + int numGets; + boolean cancelled; + boolean closed; + + EndpointStatus(Instant expirationTime) { + this.expirationTime = expirationTime; + this.numGets = 0; + this.cancelled = false; + this.closed = false; + } + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeRefreshFlightEndpointScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeRefreshFlightEndpointScenario.java new file mode 100644 index 00000000000..ad859e0bc9b --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeRefreshFlightEndpointScenario.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.integration.tests; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.UInt4Vector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +/** Test RefreshFlightEndpoint. */ +final class ExpirationTimeRefreshFlightEndpointScenario implements Scenario { + @Override + public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception { + return new ExpirationTimeProducer(allocator); + } + + @Override + public void buildServer(FlightServer.Builder builder) { + } + + @Override + public void client(BufferAllocator allocator, Location location, FlightClient client) throws Exception { + FlightInfo info = client.getInfo(FlightDescriptor.command("expiration".getBytes(StandardCharsets.UTF_8))); + + List batches = new ArrayList<>(); + + try { + // First read from all endpoints + for (FlightEndpoint endpoint : info.getEndpoints()) { + try (FlightStream stream = client.getStream(endpoint.getTicket())) { + while (stream.next()) { + batches.add(new VectorUnloader(stream.getRoot()).getRecordBatch()); + } + } + } + + // Refresh all endpoints with expiration time + List refreshedEndpoints = new ArrayList<>(); + Instant maxExpiration = null; + for (FlightEndpoint endpoint : info.getEndpoints()) { + if (!endpoint.getExpirationTime().isPresent()) { + continue; + } + Instant expiration = endpoint.getExpirationTime().get(); + if (maxExpiration == null || expiration.isAfter(maxExpiration)) { + maxExpiration = expiration; + } + FlightEndpoint refreshed = client.refreshFlightEndpoint(endpoint); + + IntegrationAssertions.assertTrue("Refreshed FlightEndpoint must have expiration time", + refreshed.getExpirationTime().isPresent()); + IntegrationAssertions.assertTrue("Refreshed FlightEndpoint must have newer expiration time", + refreshed.getExpirationTime().get().isAfter(expiration)); + + refreshedEndpoints.add(refreshed); + } + IntegrationAssertions.assertNotNull(maxExpiration); + + for (FlightEndpoint endpoint : refreshedEndpoints) { + IntegrationAssertions.assertFalse( + "One of the refreshed endpoints expires before one of the original endpoints", + endpoint.getExpirationTime().get().isBefore(maxExpiration)); + } + + // Expire all original endpoints + Instant now = Instant.now(); + if (maxExpiration.isAfter(now)) { + Thread.sleep(ChronoUnit.MILLIS.between(now, maxExpiration) + 1); + } + + // Re-read from refreshed endpoints + for (FlightEndpoint endpoint : refreshedEndpoints) { + try (FlightStream stream = client.getStream(endpoint.getTicket())) { + while (stream.next()) { + batches.add(new VectorUnloader(stream.getRoot()).getRecordBatch()); + } + } + } + + // Check data + IntegrationAssertions.assertEquals(5, batches.size()); + try (final VectorSchemaRoot root = VectorSchemaRoot.create(ExpirationTimeProducer.SCHEMA, allocator)) { + final VectorLoader loader = new VectorLoader(root); + + loader.load(batches.get(0)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(0, ((UInt4Vector) root.getVector(0)).getObject(0)); + + loader.load(batches.get(1)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(1, ((UInt4Vector) root.getVector(0)).getObject(0)); + + loader.load(batches.get(2)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(2, ((UInt4Vector) root.getVector(0)).getObject(0)); + + loader.load(batches.get(3)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(1, ((UInt4Vector) root.getVector(0)).getObject(0)); + + loader.load(batches.get(4)); + IntegrationAssertions.assertEquals(1, root.getRowCount()); + IntegrationAssertions.assertEquals(2, ((UInt4Vector) root.getVector(0)).getObject(0)); + } + } finally { + AutoCloseables.close(batches); + } + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java index a60efcbb78d..036bcc3811b 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java @@ -88,6 +88,12 @@ static void assertTrue(String message, boolean value) { } } + static void assertNotNull(Object actual) { + if (actual == null) { + throw new AssertionError("Expected: (not null)\n\nbut got: null\n"); + } + } + /** * Convert a throwable into a FlightRuntimeException with error details, for debugging. */ diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java index c2e10fcf47e..d9cd70c5f59 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java @@ -40,6 +40,11 @@ final class Scenarios { private Scenarios() { scenarios = new TreeMap<>(); scenarios.put("auth:basic_proto", AuthBasicProtoScenario::new); + scenarios.put("expiration_time:cancel_flight_info", ExpirationTimeCancelFlightInfoScenario::new); + scenarios.put("expiration_time:close_flight_info", ExpirationTimeCloseFlightInfoScenario::new); + scenarios.put("expiration_time:refresh_flight_endpoint", ExpirationTimeRefreshFlightEndpointScenario::new); + scenarios.put("expiration_time:do_get", ExpirationTimeDoGetScenario::new); + scenarios.put("expiration_time:list_actions", ExpirationTimeListActionsScenario::new); scenarios.put("middleware", MiddlewareScenario::new); scenarios.put("ordered", OrderedScenario::new); scenarios.put("flight_sql", FlightSqlScenario::new); diff --git a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java index 4507dfb1292..083de1cbdc1 100644 --- a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java +++ b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java @@ -33,6 +33,31 @@ void authBasicProto() throws Exception { testScenario("auth:basic_proto"); } + @Test + void expirationTimeCancelFlightInfo() throws Exception { + testScenario("expiration_time:cancel_flight_info"); + } + + @Test + void expirationTimeCloseFlightInfo() throws Exception { + testScenario("expiration_time:close_flight_info"); + } + + @Test + void expirationTimeDoGet() throws Exception { + testScenario("expiration_time:do_get"); + } + + @Test + void expirationTimeListActions() throws Exception { + testScenario("expiration_time:list_actions"); + } + + @Test + void expirationTimeRefreshFlightEndpoint() throws Exception { + testScenario("expiration_time:refresh_flight_endpoint"); + } + @Test void middleware() throws Exception { testScenario("middleware"); diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelListener.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelListener.java index 3438f788dcf..e7b645be747 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelListener.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelListener.java @@ -24,6 +24,7 @@ import com.google.protobuf.Any; /** Typed StreamListener for cancelQuery. */ +@SuppressWarnings("deprecation") class CancelListener implements FlightProducer.StreamListener { private final FlightProducer.StreamListener listener; diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelResult.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelResult.java index d1ae4178310..28fa197cd60 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelResult.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelResult.java @@ -21,7 +21,10 @@ /** * The result of cancelling a query. + * + * @deprecated Prefer {@link org.apache.arrow.flight.CancelStatus}. */ +@Deprecated public enum CancelResult { UNSPECIFIED, CANCELLED, diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelStatusListener.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelStatusListener.java new file mode 100644 index 00000000000..e7f5beee4d3 --- /dev/null +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/CancelStatusListener.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.sql; + +import org.apache.arrow.flight.CancelFlightInfoResult; +import org.apache.arrow.flight.CancelStatus; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.Result; + +/** Typed StreamListener for cancelFlightInfo. */ +class CancelStatusListener implements FlightProducer.StreamListener { + private final FlightProducer.StreamListener listener; + + CancelStatusListener(FlightProducer.StreamListener listener) { + this.listener = listener; + } + + @Override + public void onNext(CancelStatus val) { + listener.onNext(new Result(new CancelFlightInfoResult(val).serialize().array())); + } + + @Override + public void onError(Throwable t) { + listener.onError(t); + } + + @Override + public void onCompleted() { + listener.onCompleted(); + } +} diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightEndpointListener.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightEndpointListener.java new file mode 100644 index 00000000000..aa9357c3a48 --- /dev/null +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightEndpointListener.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.sql; + +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.Result; + +/** Typed StreamListener for refreshFlightEndpoint. */ +public class FlightEndpointListener implements FlightProducer.StreamListener { + private final FlightProducer.StreamListener listener; + + FlightEndpointListener(FlightProducer.StreamListener listener) { + this.listener = listener; + } + + @Override + public void onNext(FlightEndpoint val) { + listener.onNext(new Result(val.serialize().array())); + } + + @Override + public void onError(Throwable t) { + listener.onError(t); + } + + @Override + public void onCompleted() { + listener.onCompleted(); + } +} diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java index 922495a18e0..63aee483b57 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java @@ -59,8 +59,10 @@ import org.apache.arrow.flight.Action; import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.CancelFlightInfoResult; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.flight.FlightStream; import org.apache.arrow.flight.PutResult; @@ -851,6 +853,27 @@ public void rollback(Savepoint savepoint, CallOption... options) { preparedStatementResults.forEachRemaining((ignored) -> { }); } + /** + * Cancel execution of a distributed query. + * + * @param info The query to cancel. + * @param options Call options. + * @return The server response. + */ + public CancelFlightInfoResult cancelFlightInfo(FlightInfo info, CallOption... options) { + return client.cancelFlightInfo(info, options); + } + + /** + * Request the server to free resources associated with a query. + * + * @param info The query to close. + * @param options Call options. + */ + public void closeFlightInfo(FlightInfo info, CallOption... options) { + client.closeFlightInfo(info, options); + } + /** * Explicitly cancel a running query. *

@@ -860,7 +883,10 @@ public void rollback(Savepoint savepoint, CallOption... options) { * commit or rollback as appropriate. This only indicates the client no longer * wishes to read the remainder of the query results or continue submitting * data. + * + * @deprecated Prefer {@link #cancelFlightInfo}. */ + @Deprecated public CancelResult cancelQuery(FlightInfo info, CallOption... options) { ActionCancelQueryRequest request = ActionCancelQueryRequest.newBuilder() .setInfo(ByteString.copyFrom(info.serialize())) @@ -888,6 +914,17 @@ public CancelResult cancelQuery(FlightInfo info, CallOption... options) { } } + /** + * Request the server to extend the lifetime of a query result set. + * + * @param endpoint The result set partition. + * @param options Call options. + * @return The new endpoint with an updated expiration time. + */ + public FlightEndpoint refreshFlightEndpoint(FlightEndpoint endpoint, CallOption... options) { + return client.refreshFlightEndpoint(endpoint, options); + } + @Override public void close() throws Exception { AutoCloseables.close(client); diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java index 00a83667990..249796dc66d 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java @@ -48,12 +48,16 @@ import java.io.IOException; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.util.List; import org.apache.arrow.flight.Action; import org.apache.arrow.flight.ActionType; import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.CancelStatus; +import org.apache.arrow.flight.FlightConstants; import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.flight.FlightProducer; import org.apache.arrow.flight.FlightStream; @@ -319,6 +323,7 @@ default void doAction(CallContext context, Action action, StreamListener FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionBeginTransactionRequest.class); beginTransaction(request, context, new ProtoListener<>(listener)); } else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_CANCEL_QUERY.getType())) { + //noinspection deprecation final ActionCancelQueryRequest request = FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionCancelQueryRequest.class); final FlightInfo info; @@ -352,6 +357,42 @@ default void doAction(CallContext context, Action action, StreamListener ActionEndTransactionRequest request = FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionEndTransactionRequest.class); endTransaction(request, context, new NoResultListener(listener)); + } else if (actionType.equals(FlightConstants.CANCEL_FLIGHT_INFO.getType())) { + final FlightInfo info; + try { + info = FlightInfo.deserialize(ByteBuffer.wrap(action.getBody())); + } catch (IOException | URISyntaxException e) { + listener.onError(CallStatus.INTERNAL + .withDescription("Could not unpack FlightInfo: " + e) + .withCause(e) + .toRuntimeException()); + return; + } + cancelFlightInfo(info, context, new CancelStatusListener(listener)); + } else if (actionType.equals(FlightConstants.CLOSE_FLIGHT_INFO.getType())) { + final FlightInfo info; + try { + info = FlightInfo.deserialize(ByteBuffer.wrap(action.getBody())); + } catch (IOException | URISyntaxException e) { + listener.onError(CallStatus.INTERNAL + .withDescription("Could not unpack FlightInfo: " + e) + .withCause(e) + .toRuntimeException()); + return; + } + closeFlightInfo(info, context, new NoResultListener(listener)); + } else if (actionType.equals(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType())) { + final FlightEndpoint endpoint; + try { + endpoint = FlightEndpoint.deserialize(ByteBuffer.wrap(action.getBody())); + } catch (IOException | URISyntaxException e) { + listener.onError(CallStatus.INTERNAL + .withDescription("Could not unpack FlightInfo: " + e) + .withCause(e) + .toRuntimeException()); + return; + } + refreshFlightEndpoint(endpoint, context, new FlightEndpointListener(listener)); } else { throw CallStatus.INVALID_ARGUMENT .withDescription("Unrecognized request: " + action.getType()) @@ -383,13 +424,38 @@ default void beginTransaction(ActionBeginTransactionRequest request, CallContext listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException()); } + /** + * Explicitly cancel a query. + * + * @param info The FlightInfo of the query to cancel. + * @param context Per-call context. + * @param listener An interface for sending data back to the client. + */ + default void cancelFlightInfo(FlightInfo info, CallContext context, StreamListener listener) { + listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException()); + } + + + /** + * Explicitly free resources associated with a query. + * + * @param info The FlightInfo of the query to close. + * @param context Per-call context. + * @param listener An interface for sending data back to the client. + */ + default void closeFlightInfo(FlightInfo info, CallContext context, StreamListener listener) { + listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException()); + } + /** * Explicitly cancel a query. * * @param info The FlightInfo of the query to cancel. * @param context Per-call context. * @param listener Whether cancellation succeeded. + * @deprecated Prefer {@link #cancelFlightInfo(FlightInfo, CallContext, StreamListener)}. */ + @Deprecated default void cancelQuery(FlightInfo info, CallContext context, StreamListener listener) { listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException()); } @@ -811,6 +877,17 @@ void getStreamImportedKeys(CommandGetImportedKeys command, CallContext context, void getStreamCrossReference(CommandGetCrossReference command, CallContext context, ServerStreamListener listener); + /** + * Refresh the duration of the given endpoint. + * + * @param endpoint The endpoint to refresh. + * @param context Per-call context. + * @param listener An interface for sending data back to the client. + */ + default void refreshFlightEndpoint(FlightEndpoint endpoint, CallContext context, + StreamListener listener) { + listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException()); + } /** * Default schema templates for the {@link FlightSqlProducer}.