Skip to content
Closed
16 changes: 11 additions & 5 deletions cpp/src/arrow/flight/flight_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,22 @@ Status DoSinglePerfRun(FlightClient* client, const FlightClientOptions client_op
int64_t start_total_records = stats->total_records;

auto test_loop = test_put ? &RunDoPutTest : &RunDoGetTest;
auto ConsumeStream = [&stats, &test_loop, &client_options,
auto ConsumeStream = [&client, &stats, &test_loop, &client_options,
&call_options](const FlightEndpoint& endpoint) {
std::unique_ptr<FlightClient> client;
RETURN_NOT_OK(
FlightClient::Connect(endpoint.locations.front(), client_options, &client));
std::unique_ptr<FlightClient> local_client;
FlightClient* data_client;
if (endpoint.locations.empty()) {
data_client = client;
} else {
RETURN_NOT_OK(FlightClient::Connect(endpoint.locations.front(), client_options,
&local_client));
data_client = local_client.get();
}

perf::Token token;
token.ParseFromString(endpoint.ticket.ticket);

const auto& result = test_loop(client.get(), call_options, token, endpoint, stats);
const auto& result = test_loop(data_client, call_options, token, endpoint, stats);
if (result.ok()) {
const PerformanceResult& perf = result.ValueOrDie();
stats->Update(perf.num_batches, perf.num_records, perf.num_bytes);
Expand Down
25 changes: 12 additions & 13 deletions cpp/src/arrow/flight/integration_tests/test_integration_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,8 @@ Status UploadBatchesToFlight(const std::vector<std::shared_ptr<RecordBatch>>& ch

/// \brief Retrieve the given Flight and compare to the original expected batches.
Status ConsumeFlightLocation(
const Location& location, const Ticket& ticket,
FlightClient* read_client, const Ticket& ticket,
const std::vector<std::shared_ptr<RecordBatch>>& retrieved_data) {
std::unique_ptr<FlightClient> read_client;
RETURN_NOT_OK(FlightClient::Connect(location, &read_client));

std::unique_ptr<FlightStreamReader> stream;
RETURN_NOT_OK(read_client->DoGet(ticket, &stream));

Expand Down Expand Up @@ -187,15 +184,17 @@ class IntegrationTestScenario : public Scenario {
for (const FlightEndpoint& endpoint : info->endpoints()) {
const auto& ticket = endpoint.ticket;

auto locations = endpoint.locations;
if (locations.size() == 0) {
return Status::IOError("No locations returned from Flight server.");
}

for (const auto& location : locations) {
std::cout << "Verifying location " << location.ToString() << std::endl;
// 3. Stream data from the server, comparing individual batches.
ABORT_NOT_OK(ConsumeFlightLocation(location, ticket, original_data));
// 3. Stream data from the server, comparing individual batches.
if (endpoint.locations.size() == 0) {
RETURN_NOT_OK(ConsumeFlightLocation(client.get(), ticket, original_data));
} else {
for (const auto& location : endpoint.locations) {
std::cout << "Verifying location " << location.ToString() << std::endl;
std::unique_ptr<FlightClient> read_client;
RETURN_NOT_OK(FlightClient::Connect(location, &read_client));
RETURN_NOT_OK(ConsumeFlightLocation(read_client.get(), ticket, original_data));
RETURN_NOT_OK(read_client->Close());
}
}
}
return Status::OK();
Expand Down
31 changes: 26 additions & 5 deletions format/Flight.proto
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,15 @@ message FlightInfo {
FlightDescriptor flight_descriptor = 2;

/*
* A list of endpoints associated with the flight. To consume the whole
* flight, all endpoints must be consumed.
* A list of endpoints associated with the flight. To consume the
* whole flight, all endpoints (and hence all Tickets) must be
* consumed. Endpoints can be consumed in any order.
*
* In other words, an application can use multiple endpoints to
* represent partitioned data.
*
* There is no ordering defined on endpoints. Hence, if the returned
* data has an ordering, it should be returned in a single endpoint.
*/
repeated FlightEndpoint endpoint = 3;

Expand All @@ -280,9 +287,20 @@ message FlightEndpoint {
Ticket ticket = 1;

/*
* A list of URIs where this ticket can be redeemed. If the list is
* empty, the expectation is that the ticket can only be redeemed on the
* current service where the ticket was generated.
* A list of URIs where this ticket can be redeemed via DoGet().
*
* If the list is empty, the expectation is that the ticket can only
* be redeemed on the current service where the ticket was
* generated.
*
* If the list is not empty, the expectation is that the ticket can
* be redeemed at any of the locations, and that the data returned
* will be equivalent. In this case, the ticket may only be redeemed
* at one of the given locations, and not (necessarily) on the
* current service.
*
* In other words, an application can use multiple locations to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This clarification is very helpful to me.

* represent redundant and/or load balanced services.
*/
repeated Location location = 2;
}
Expand All @@ -298,6 +316,9 @@ message Location {
/*
* An opaque identifier that the service can use to retrieve a particular
* portion of a stream.
*
* Tickets are meant to be single use. Reusing a ticket is either an error
* or results in application-defined behavior.
*/
message Ticket {
bytes ticket = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -152,46 +153,55 @@ public void onNext(PutResult val) {
// 3. Download the data from the server.
List<Location> locations = endpoint.getLocations();
if (locations.isEmpty()) {
throw new RuntimeException("No locations returned from Flight server.");
// No locations provided, validate the server itself.
testTicket(allocator, client, endpoint.getTicket(), inputPath);
} else {
// All locations should be equivalent, validate each one.
for (Location location : locations) {
try (FlightClient readClient = FlightClient.builder(allocator, location).build()) {
testTicket(allocator, readClient, endpoint.getTicket(), inputPath);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
for (Location location : locations) {
System.out.println("Verifying location " + location.getUri());
try (FlightClient readClient = FlightClient.builder(allocator, location).build();
FlightStream stream = readClient.getStream(endpoint.getTicket());
VectorSchemaRoot root = stream.getRoot();
VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator);
JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator)) {
VectorLoader loader = new VectorLoader(downloadedRoot);
VectorUnloader unloader = new VectorUnloader(root);

Schema jsonSchema = reader.start();
Validator.compareSchemas(root.getSchema(), jsonSchema);
try (VectorSchemaRoot jsonRoot = VectorSchemaRoot.create(jsonSchema, allocator)) {

while (stream.next()) {
try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
loader.load(arb);
if (reader.read(jsonRoot)) {

// 4. Validate the data.
Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot);
jsonRoot.clear();
} else {
throw new RuntimeException("Flight stream has more batches than JSON");
}
}
}
}
}

// Verify no more batches with data in JSON
// NOTE: Currently the C++ Flight server skips empty batches at end of the stream
if (reader.read(jsonRoot) && jsonRoot.getRowCount() > 0) {
throw new RuntimeException("JSON has more batches with than Flight stream");
private static void testTicket(BufferAllocator allocator, FlightClient readClient, Ticket ticket, String inputPath) {
try (FlightStream stream = readClient.getStream(ticket);
VectorSchemaRoot root = stream.getRoot();
VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator);
JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator)) {
VectorLoader loader = new VectorLoader(downloadedRoot);
VectorUnloader unloader = new VectorUnloader(root);

Schema jsonSchema = reader.start();
Validator.compareSchemas(root.getSchema(), jsonSchema);
try (VectorSchemaRoot jsonRoot = VectorSchemaRoot.create(jsonSchema, allocator)) {

while (stream.next()) {
try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
loader.load(arb);
if (reader.read(jsonRoot)) {

// 4. Validate the data.
Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot);
jsonRoot.clear();
} else {
throw new RuntimeException("Flight stream has more batches than JSON");
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}

// Verify no more batches with data in JSON
// NOTE: Currently the C++ Flight server skips empty batches at end of the stream
if (reader.read(jsonRoot) && jsonRoot.getRowCount() > 0) {
throw new RuntimeException("JSON has more batches with than Flight stream");
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}