Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,32 +439,32 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
"expiration_time:do_get",
description=("Ensure FlightEndpoint.expiration_time with "
"DoGet is working as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:list_actions",
description=("Ensure FlightEndpoint.expiration_time related "
"pre-defined actions is working with ListActions "
"as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:cancel_flight_info",
description=("Ensure FlightEndpoint.expiration_time and "
"CancelFlightInfo are working as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:close_flight_info",
description=("Ensure FlightEndpoint.expiration_time and "
"CloseFlightInfo are working as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:refresh_flight_endpoint",
description=("Ensure FlightEndpoint.expiration_time and "
"RefreshFlightEndpoint are working as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"flight_sql",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

import org.apache.arrow.flight.impl.Flight;

/**
* The result of cancelling a FlightInfo.
*/
public class CancelFlightInfoResult {
private final CancelStatus status;

public CancelFlightInfoResult(CancelStatus status) {
this.status = status;
}

CancelFlightInfoResult(Flight.CancelFlightInfoResult proto) {
switch (proto.getStatus()) {
case CANCEL_STATUS_UNSPECIFIED:
status = CancelStatus.UNSPECIFIED;
break;
case CANCEL_STATUS_CANCELLED:
status = CancelStatus.CANCELLED;
break;
case CANCEL_STATUS_CANCELLING:
status = CancelStatus.CANCELLING;
break;
case CANCEL_STATUS_NOT_CANCELLABLE:
status = CancelStatus.NOT_CANCELLABLE;
break;
default:
throw new IllegalArgumentException("");
}
}

public CancelStatus getStatus() {
return status;
}

Flight.CancelFlightInfoResult toProtocol() {
Flight.CancelFlightInfoResult.Builder b = Flight.CancelFlightInfoResult.newBuilder();
switch (status) {
case UNSPECIFIED:
b.setStatus(Flight.CancelStatus.CANCEL_STATUS_UNSPECIFIED);
break;
case CANCELLED:
b.setStatus(Flight.CancelStatus.CANCEL_STATUS_CANCELLED);
break;
case CANCELLING:
b.setStatus(Flight.CancelStatus.CANCEL_STATUS_CANCELLING);
break;
case NOT_CANCELLABLE:
b.setStatus(Flight.CancelStatus.CANCEL_STATUS_NOT_CANCELLABLE);
break;
default:
// Not possible
throw new AssertionError();
}
return b.build();
}

/**
* Get the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing non-Flight services to still return Flight types.
*/
public ByteBuffer serialize() {
return ByteBuffer.wrap(toProtocol().toByteArray());
}

/**
* Parse the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services.
*
* @param serialized The serialized form of the message, as returned by {@link #serialize()}.
* @return The deserialized message.
* @throws IOException if the serialized form is invalid.
*/
public static CancelFlightInfoResult deserialize(ByteBuffer serialized) throws IOException {
return new CancelFlightInfoResult(Flight.CancelFlightInfoResult.parseFrom(serialized));
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CancelFlightInfoResult that = (CancelFlightInfoResult) o;
return status == that.status;
}

@Override
public int hashCode() {
return Objects.hash(status);
}

@Override
public String toString() {
return "CancelFlightInfoResult{" +
"status=" + status +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight;

/** The result of cancelling a FlightInfo. */
public enum CancelStatus {
/**
* The cancellation status is unknown. Servers should avoid using
* this value (send a NOT_FOUND error if the requested query is
* not known). Clients can retry the request.
*/
UNSPECIFIED,
/**
* The cancellation request is complete. Subsequent requests with
* the same payload may return CANCELLED or a NOT_FOUND error.
*/
CANCELLED,
/**
* The cancellation request is in progress. The client may retry
* the cancellation request.
*/
CANCELLING,
/**
* The query is not cancellable. The client should not retry the
* cancellation request.
*/
NOT_CANCELLABLE,
;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.arrow.flight;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -489,6 +491,79 @@ public void getResult() {
}
}

/**
* Cancel execution of a distributed query.
*
* @param info The query to cancel.
* @param options Call options.
* @return The server response.
*/
public CancelFlightInfoResult cancelFlightInfo(FlightInfo info, CallOption... options) {
Action action = new Action(FlightConstants.CANCEL_FLIGHT_INFO.getType(), info.serialize().array());
Iterator<Result> results = doAction(action, options);
if (!results.hasNext()) {
throw CallStatus.INTERNAL
.withDescription("Server did not return a response")
.toRuntimeException();
}

CancelFlightInfoResult result;
try {
result = CancelFlightInfoResult.deserialize(ByteBuffer.wrap(results.next().getBody()));
} catch (IOException e) {
throw CallStatus.INTERNAL
.withDescription("Failed to parse server response: " + e)
.withCause(e)
.toRuntimeException();
}
results.forEachRemaining((ignored) -> {
});
return result;
}

/**
* Request the server to free resources associated with a query.
*
* @param info The query to close.
* @param options Call options.
*/
public void closeFlightInfo(FlightInfo info, CallOption... options) {
Action action = new Action(FlightConstants.CLOSE_FLIGHT_INFO.getType(), info.serialize().array());
Iterator<Result> results = doAction(action, options);
results.forEachRemaining((ignored) -> {
});
}

/**
* Request the server to extend the lifetime of a query result set.
*
* @param endpoint The result set partition.
* @param options Call options.
* @return The new endpoint with an updated expiration time.
*/
public FlightEndpoint refreshFlightEndpoint(FlightEndpoint endpoint, CallOption... options) {
Action action = new Action(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType(), endpoint.serialize().array());
Iterator<Result> results = doAction(action, options);
if (!results.hasNext()) {
throw CallStatus.INTERNAL
.withDescription("Server did not return a response")
.toRuntimeException();
}

FlightEndpoint result;
try {
result = FlightEndpoint.deserialize(ByteBuffer.wrap(results.next().getBody()));
} catch (IOException | URISyntaxException e) {
throw CallStatus.INTERNAL
.withDescription("Failed to parse server response: " + e)
.withCause(e)
.toRuntimeException();
}
results.forEachRemaining((ignored) -> {
});
return result;
}

/**
* Interface for writers to an Arrow data stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,18 @@ public interface FlightConstants {

FlightServerMiddleware.Key<ServerHeaderMiddleware> HEADER_KEY =
FlightServerMiddleware.Key.of("org.apache.arrow.flight.ServerHeaderMiddleware");

ActionType CANCEL_FLIGHT_INFO = new ActionType("CancelFlightInfo",
"Explicitly cancel a running FlightInfo.\n" +
"Request Message: FlightInfo to be canceled\n" +
"Response Message: ActionCancelFlightInfoResult");

ActionType CLOSE_FLIGHT_INFO = new ActionType("CloseFlightInfo",
"Close the given FlightInfo explicitly.\n" +
"Request Message: FlightInfo to be closed\n" +
"Response Message: N/A");
ActionType REFRESH_FLIGHT_ENDPOINT = new ActionType("RefreshFlightEndpoint",
"Extend expiration time of the given FlightEndpoint.\n" +
"Request Message: FlightEndpoint to be refreshed\n" +
"Response Message: Refreshed FlightEndpoint");
}
Loading