diff --git a/cpp/src/arrow/flight/flight_benchmark.cc b/cpp/src/arrow/flight/flight_benchmark.cc index 6649de52cd9..94693d663c7 100644 --- a/cpp/src/arrow/flight/flight_benchmark.cc +++ b/cpp/src/arrow/flight/flight_benchmark.cc @@ -294,16 +294,22 @@ Status DoSinglePerfRun(FlightClient* client, const FlightClientOptions client_op int64_t start_total_records = stats->total_records; auto test_loop = test_put ? &RunDoPutTest : &RunDoGetTest; - auto ConsumeStream = [&stats, &test_loop, &client_options, + auto ConsumeStream = [&client, &stats, &test_loop, &client_options, &call_options](const FlightEndpoint& endpoint) { - std::unique_ptr client; - RETURN_NOT_OK( - FlightClient::Connect(endpoint.locations.front(), client_options, &client)); + std::unique_ptr local_client; + FlightClient* data_client; + if (endpoint.locations.empty()) { + data_client = client; + } else { + RETURN_NOT_OK(FlightClient::Connect(endpoint.locations.front(), client_options, + &local_client)); + data_client = local_client.get(); + } perf::Token token; token.ParseFromString(endpoint.ticket.ticket); - const auto& result = test_loop(client.get(), call_options, token, endpoint, stats); + const auto& result = test_loop(data_client, call_options, token, endpoint, stats); if (result.ok()) { const PerformanceResult& perf = result.ValueOrDie(); stats->Update(perf.num_batches, perf.num_records, perf.num_bytes); diff --git a/cpp/src/arrow/flight/integration_tests/test_integration_client.cc b/cpp/src/arrow/flight/integration_tests/test_integration_client.cc index 366284389f1..dca7964de27 100644 --- a/cpp/src/arrow/flight/integration_tests/test_integration_client.cc +++ b/cpp/src/arrow/flight/integration_tests/test_integration_client.cc @@ -91,11 +91,8 @@ Status UploadBatchesToFlight(const std::vector>& ch /// \brief Retrieve the given Flight and compare to the original expected batches. Status ConsumeFlightLocation( - const Location& location, const Ticket& ticket, + FlightClient* read_client, const Ticket& ticket, const std::vector>& retrieved_data) { - std::unique_ptr read_client; - RETURN_NOT_OK(FlightClient::Connect(location, &read_client)); - std::unique_ptr stream; RETURN_NOT_OK(read_client->DoGet(ticket, &stream)); @@ -187,15 +184,17 @@ class IntegrationTestScenario : public Scenario { for (const FlightEndpoint& endpoint : info->endpoints()) { const auto& ticket = endpoint.ticket; - auto locations = endpoint.locations; - if (locations.size() == 0) { - return Status::IOError("No locations returned from Flight server."); - } - - for (const auto& location : locations) { - std::cout << "Verifying location " << location.ToString() << std::endl; - // 3. Stream data from the server, comparing individual batches. - ABORT_NOT_OK(ConsumeFlightLocation(location, ticket, original_data)); + // 3. Stream data from the server, comparing individual batches. + if (endpoint.locations.size() == 0) { + RETURN_NOT_OK(ConsumeFlightLocation(client.get(), ticket, original_data)); + } else { + for (const auto& location : endpoint.locations) { + std::cout << "Verifying location " << location.ToString() << std::endl; + std::unique_ptr read_client; + RETURN_NOT_OK(FlightClient::Connect(location, &read_client)); + RETURN_NOT_OK(ConsumeFlightLocation(read_client.get(), ticket, original_data)); + RETURN_NOT_OK(read_client->Close()); + } } } return Status::OK(); diff --git a/format/Flight.proto b/format/Flight.proto index 2dab84a40be..87e5fda796d 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -259,8 +259,15 @@ message FlightInfo { FlightDescriptor flight_descriptor = 2; /* - * A list of endpoints associated with the flight. To consume the whole - * flight, all endpoints must be consumed. + * A list of endpoints associated with the flight. To consume the + * whole flight, all endpoints (and hence all Tickets) must be + * consumed. Endpoints can be consumed in any order. + * + * In other words, an application can use multiple endpoints to + * represent partitioned data. + * + * There is no ordering defined on endpoints. Hence, if the returned + * data has an ordering, it should be returned in a single endpoint. */ repeated FlightEndpoint endpoint = 3; @@ -280,9 +287,20 @@ message FlightEndpoint { Ticket ticket = 1; /* - * A list of URIs where this ticket can be redeemed. If the list is - * empty, the expectation is that the ticket can only be redeemed on the - * current service where the ticket was generated. + * A list of URIs where this ticket can be redeemed via DoGet(). + * + * If the list is empty, the expectation is that the ticket can only + * be redeemed on the current service where the ticket was + * generated. + * + * If the list is not empty, the expectation is that the ticket can + * be redeemed at any of the locations, and that the data returned + * will be equivalent. In this case, the ticket may only be redeemed + * at one of the given locations, and not (necessarily) on the + * current service. + * + * In other words, an application can use multiple locations to + * represent redundant and/or load balanced services. */ repeated Location location = 2; } @@ -298,6 +316,9 @@ message Location { /* * An opaque identifier that the service can use to retrieve a particular * portion of a stream. + * + * Tickets are meant to be single use. It is an error/application-defined + * behavior to reuse a ticket. */ message Ticket { bytes ticket = 1; diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java index 2a36747b618..2ea9874f3de 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java @@ -32,6 +32,7 @@ import org.apache.arrow.flight.FlightStream; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.Ticket; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -152,46 +153,55 @@ public void onNext(PutResult val) { // 3. Download the data from the server. List locations = endpoint.getLocations(); if (locations.isEmpty()) { - throw new RuntimeException("No locations returned from Flight server."); + // No locations provided, validate the server itself. + testTicket(allocator, client, endpoint.getTicket(), inputPath); + } else { + // All locations should be equivalent, validate each one. + for (Location location : locations) { + try (FlightClient readClient = FlightClient.builder(allocator, location).build()) { + testTicket(allocator, readClient, endpoint.getTicket(), inputPath); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } - for (Location location : locations) { - System.out.println("Verifying location " + location.getUri()); - try (FlightClient readClient = FlightClient.builder(allocator, location).build(); - FlightStream stream = readClient.getStream(endpoint.getTicket()); - VectorSchemaRoot root = stream.getRoot(); - VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator); - JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator)) { - VectorLoader loader = new VectorLoader(downloadedRoot); - VectorUnloader unloader = new VectorUnloader(root); - - Schema jsonSchema = reader.start(); - Validator.compareSchemas(root.getSchema(), jsonSchema); - try (VectorSchemaRoot jsonRoot = VectorSchemaRoot.create(jsonSchema, allocator)) { - - while (stream.next()) { - try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { - loader.load(arb); - if (reader.read(jsonRoot)) { - - // 4. Validate the data. - Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot); - jsonRoot.clear(); - } else { - throw new RuntimeException("Flight stream has more batches than JSON"); - } - } - } + } + } - // Verify no more batches with data in JSON - // NOTE: Currently the C++ Flight server skips empty batches at end of the stream - if (reader.read(jsonRoot) && jsonRoot.getRowCount() > 0) { - throw new RuntimeException("JSON has more batches with than Flight stream"); + private static void testTicket(BufferAllocator allocator, FlightClient readClient, Ticket ticket, String inputPath) { + try (FlightStream stream = readClient.getStream(ticket); + VectorSchemaRoot root = stream.getRoot(); + VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator); + JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator)) { + VectorLoader loader = new VectorLoader(downloadedRoot); + VectorUnloader unloader = new VectorUnloader(root); + + Schema jsonSchema = reader.start(); + Validator.compareSchemas(root.getSchema(), jsonSchema); + try (VectorSchemaRoot jsonRoot = VectorSchemaRoot.create(jsonSchema, allocator)) { + + while (stream.next()) { + try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { + loader.load(arb); + if (reader.read(jsonRoot)) { + + // 4. Validate the data. + Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot); + jsonRoot.clear(); + } else { + throw new RuntimeException("Flight stream has more batches than JSON"); } } - } catch (Exception e) { - throw new RuntimeException(e); + } + + // Verify no more batches with data in JSON + // NOTE: Currently the C++ Flight server skips empty batches at end of the stream + if (reader.read(jsonRoot) && jsonRoot.getRowCount() > 0) { + throw new RuntimeException("JSON has more batches with than Flight stream"); } } + } catch (Exception e) { + throw new RuntimeException(e); } } }