From ac0648e9d37d76d3bef17ce5125a6d690629d21a Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 15 Mar 2022 11:09:15 -0400 Subject: [PATCH 1/8] ARROW-15921: [Format][FlightRPC] Clarify FlightEndpoint.locations interpretation --- format/Flight.proto | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/format/Flight.proto b/format/Flight.proto index 2dab84a40be..e001da32053 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -259,8 +259,9 @@ message FlightInfo { FlightDescriptor flight_descriptor = 2; /* - * A list of endpoints associated with the flight. To consume the whole - * flight, all endpoints must be consumed. + * A list of endpoints associated with the flight. To consume the + * whole flight, all endpoints must be consumed. In other words, + * multiple endpoints provide partitioning. */ repeated FlightEndpoint endpoint = 3; @@ -280,9 +281,16 @@ message FlightEndpoint { Ticket ticket = 1; /* - * A list of URIs where this ticket can be redeemed. If the list is - * empty, the expectation is that the ticket can only be redeemed on the - * current service where the ticket was generated. + * A list of URIs where this ticket can be redeemed. + * + * If the list is empty, the expectation is that the ticket can only + * be redeemed on the current service where the ticket was + * generated. + * + * If the list is not empty, the expectation is that the ticket can + * be redeemed at any of the locations, and that the data returned + * will be equivalent. In other words, multiple locations provide + * redundancy/load balancing. */ repeated Location location = 2; } From 6ade3530264f23cf1ea7f31226eacfe1ce5784ed Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 15 Mar 2022 11:15:32 -0400 Subject: [PATCH 2/8] ARROW-15921: [C++][FlightRPC] Properly interpret FlightEndpoint.locations --- cpp/src/arrow/flight/flight_benchmark.cc | 16 ++++++++---- .../test_integration_client.cc | 25 +++++++++---------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/flight/flight_benchmark.cc b/cpp/src/arrow/flight/flight_benchmark.cc index 6649de52cd9..94693d663c7 100644 --- a/cpp/src/arrow/flight/flight_benchmark.cc +++ b/cpp/src/arrow/flight/flight_benchmark.cc @@ -294,16 +294,22 @@ Status DoSinglePerfRun(FlightClient* client, const FlightClientOptions client_op int64_t start_total_records = stats->total_records; auto test_loop = test_put ? &RunDoPutTest : &RunDoGetTest; - auto ConsumeStream = [&stats, &test_loop, &client_options, + auto ConsumeStream = [&client, &stats, &test_loop, &client_options, &call_options](const FlightEndpoint& endpoint) { - std::unique_ptr client; - RETURN_NOT_OK( - FlightClient::Connect(endpoint.locations.front(), client_options, &client)); + std::unique_ptr local_client; + FlightClient* data_client; + if (endpoint.locations.empty()) { + data_client = client; + } else { + RETURN_NOT_OK(FlightClient::Connect(endpoint.locations.front(), client_options, + &local_client)); + data_client = local_client.get(); + } perf::Token token; token.ParseFromString(endpoint.ticket.ticket); - const auto& result = test_loop(client.get(), call_options, token, endpoint, stats); + const auto& result = test_loop(data_client, call_options, token, endpoint, stats); if (result.ok()) { const PerformanceResult& perf = result.ValueOrDie(); stats->Update(perf.num_batches, perf.num_records, perf.num_bytes); diff --git a/cpp/src/arrow/flight/integration_tests/test_integration_client.cc b/cpp/src/arrow/flight/integration_tests/test_integration_client.cc index 366284389f1..dca7964de27 100644 --- a/cpp/src/arrow/flight/integration_tests/test_integration_client.cc +++ b/cpp/src/arrow/flight/integration_tests/test_integration_client.cc @@ -91,11 +91,8 @@ Status UploadBatchesToFlight(const std::vector>& ch /// \brief Retrieve the given Flight and compare to the original expected batches. Status ConsumeFlightLocation( - const Location& location, const Ticket& ticket, + FlightClient* read_client, const Ticket& ticket, const std::vector>& retrieved_data) { - std::unique_ptr read_client; - RETURN_NOT_OK(FlightClient::Connect(location, &read_client)); - std::unique_ptr stream; RETURN_NOT_OK(read_client->DoGet(ticket, &stream)); @@ -187,15 +184,17 @@ class IntegrationTestScenario : public Scenario { for (const FlightEndpoint& endpoint : info->endpoints()) { const auto& ticket = endpoint.ticket; - auto locations = endpoint.locations; - if (locations.size() == 0) { - return Status::IOError("No locations returned from Flight server."); - } - - for (const auto& location : locations) { - std::cout << "Verifying location " << location.ToString() << std::endl; - // 3. Stream data from the server, comparing individual batches. - ABORT_NOT_OK(ConsumeFlightLocation(location, ticket, original_data)); + // 3. Stream data from the server, comparing individual batches. + if (endpoint.locations.size() == 0) { + RETURN_NOT_OK(ConsumeFlightLocation(client.get(), ticket, original_data)); + } else { + for (const auto& location : endpoint.locations) { + std::cout << "Verifying location " << location.ToString() << std::endl; + std::unique_ptr read_client; + RETURN_NOT_OK(FlightClient::Connect(location, &read_client)); + RETURN_NOT_OK(ConsumeFlightLocation(read_client.get(), ticket, original_data)); + RETURN_NOT_OK(read_client->Close()); + } } } return Status::OK(); From ea7dbb8ad54894520974902e9e19b8d573da2588 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 15 Mar 2022 11:35:09 -0400 Subject: [PATCH 3/8] ARROW-15921: [Java][FlightRPC] Properly interpret FlightEndpoint.locations --- .../tests/IntegrationTestClient.java | 80 +++++++++++-------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java index 2a36747b618..893d0b87acd 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java @@ -32,6 +32,7 @@ import org.apache.arrow.flight.FlightStream; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.Ticket; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -152,46 +153,57 @@ public void onNext(PutResult val) { // 3. Download the data from the server. List locations = endpoint.getLocations(); if (locations.isEmpty()) { - throw new RuntimeException("No locations returned from Flight server."); + // No locations provided, validate the server itself. + System.out.println("Verifying location " + server.getUri()); + testTicket(allocator, client, endpoint.getTicket(), inputPath); + } else { + // All locations should be equivalent, validate each one. + for (Location location : locations) { + System.out.println("Verifying location " + location.getUri()); + try (FlightClient readClient = FlightClient.builder(allocator, location).build()) { + testTicket(allocator, readClient, endpoint.getTicket(), inputPath); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } - for (Location location : locations) { - System.out.println("Verifying location " + location.getUri()); - try (FlightClient readClient = FlightClient.builder(allocator, location).build(); - FlightStream stream = readClient.getStream(endpoint.getTicket()); - VectorSchemaRoot root = stream.getRoot(); - VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator); - JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator)) { - VectorLoader loader = new VectorLoader(downloadedRoot); - VectorUnloader unloader = new VectorUnloader(root); - - Schema jsonSchema = reader.start(); - Validator.compareSchemas(root.getSchema(), jsonSchema); - try (VectorSchemaRoot jsonRoot = VectorSchemaRoot.create(jsonSchema, allocator)) { - - while (stream.next()) { - try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { - loader.load(arb); - if (reader.read(jsonRoot)) { - - // 4. Validate the data. - Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot); - jsonRoot.clear(); - } else { - throw new RuntimeException("Flight stream has more batches than JSON"); - } - } - } + } + } - // Verify no more batches with data in JSON - // NOTE: Currently the C++ Flight server skips empty batches at end of the stream - if (reader.read(jsonRoot) && jsonRoot.getRowCount() > 0) { - throw new RuntimeException("JSON has more batches with than Flight stream"); + private static void testTicket(BufferAllocator allocator, FlightClient readClient, Ticket ticket, String inputPath) { + try (FlightStream stream = readClient.getStream(ticket); + VectorSchemaRoot root = stream.getRoot(); + VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator); + JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator)) { + VectorLoader loader = new VectorLoader(downloadedRoot); + VectorUnloader unloader = new VectorUnloader(root); + + Schema jsonSchema = reader.start(); + Validator.compareSchemas(root.getSchema(), jsonSchema); + try (VectorSchemaRoot jsonRoot = VectorSchemaRoot.create(jsonSchema, allocator)) { + + while (stream.next()) { + try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { + loader.load(arb); + if (reader.read(jsonRoot)) { + + // 4. Validate the data. + Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot); + jsonRoot.clear(); + } else { + throw new RuntimeException("Flight stream has more batches than JSON"); } } - } catch (Exception e) { - throw new RuntimeException(e); + } + + // Verify no more batches with data in JSON + // NOTE: Currently the C++ Flight server skips empty batches at end of the stream + if (reader.read(jsonRoot) && jsonRoot.getRowCount() > 0) { + throw new RuntimeException("JSON has more batches with than Flight stream"); } } + } catch (Exception e) { + throw new RuntimeException(e); } } } From 7d213d9d1801ac5a613e4a1d2759242d6666993c Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 16 Mar 2022 08:21:08 -0400 Subject: [PATCH 4/8] ARROW-15921: [Java][FlightRPC] Remove println statements --- .../arrow/flight/integration/tests/IntegrationTestClient.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java index 893d0b87acd..2ea9874f3de 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationTestClient.java @@ -154,12 +154,10 @@ public void onNext(PutResult val) { List locations = endpoint.getLocations(); if (locations.isEmpty()) { // No locations provided, validate the server itself. - System.out.println("Verifying location " + server.getUri()); testTicket(allocator, client, endpoint.getTicket(), inputPath); } else { // All locations should be equivalent, validate each one. for (Location location : locations) { - System.out.println("Verifying location " + location.getUri()); try (FlightClient readClient = FlightClient.builder(allocator, location).build()) { testTicket(allocator, readClient, endpoint.getTicket(), inputPath); } catch (Exception e) { From d6d7044f7580808777d2ead0b5253741c38c3f23 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 17 Mar 2022 15:42:28 -0400 Subject: [PATCH 5/8] ARROW-15921: [Format][FlightRPC] Clarify Ticket usage --- format/Flight.proto | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/format/Flight.proto b/format/Flight.proto index e001da32053..ab73c046238 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -260,8 +260,9 @@ message FlightInfo { /* * A list of endpoints associated with the flight. To consume the - * whole flight, all endpoints must be consumed. In other words, - * multiple endpoints provide partitioning. + * whole flight, all endpoints (and hence all Tickets) must be consumed. + * + * In other words, multiple endpoints provide partitioning. */ repeated FlightEndpoint endpoint = 3; @@ -289,8 +290,11 @@ message FlightEndpoint { * * If the list is not empty, the expectation is that the ticket can * be redeemed at any of the locations, and that the data returned - * will be equivalent. In other words, multiple locations provide - * redundancy/load balancing. + * will be equivalent. In this case, the ticket may only be redeemed + * at one of the given locations, and not (necessarily) on the + * current service. + * + * In other words, multiple locations provide redundancy/load balancing. */ repeated Location location = 2; } @@ -306,6 +310,9 @@ message Location { /* * An opaque identifier that the service can use to retrieve a particular * portion of a stream. + * + * Tickets are meant to be single use. It is an error/undefined behavior + * to reuse a ticket. */ message Ticket { bytes ticket = 1; From 8e0b2c682fd3e988f6f92e30c6c87a2c02b66401 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 17 Mar 2022 18:14:18 -0400 Subject: [PATCH 6/8] ARROW-15921: [Format][FlightRPC] Clarify endpoints/locations --- format/Flight.proto | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/format/Flight.proto b/format/Flight.proto index ab73c046238..17d464197e2 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -260,9 +260,11 @@ message FlightInfo { /* * A list of endpoints associated with the flight. To consume the - * whole flight, all endpoints (and hence all Tickets) must be consumed. + * whole flight, all endpoints (and hence all Tickets) must be + * consumed. Endpoints can be consumed in any order. * - * In other words, multiple endpoints provide partitioning. + * In other words, an application can use multiple endpoints to + * represent partitioned data. */ repeated FlightEndpoint endpoint = 3; @@ -294,7 +296,8 @@ message FlightEndpoint { * at one of the given locations, and not (necessarily) on the * current service. * - * In other words, multiple locations provide redundancy/load balancing. + * In other words, an application can use multiple locations to + * represent redundant and/or load balanced services. */ repeated Location location = 2; } From 11bdb7b1354cd5658082576be494367105d2e590 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 18 Mar 2022 12:49:31 -0400 Subject: [PATCH 7/8] ARROW-15921: [Format][FlightRPC] Clarify endpoint ordering --- format/Flight.proto | 3 +++ 1 file changed, 3 insertions(+) diff --git a/format/Flight.proto b/format/Flight.proto index 17d464197e2..f45e15d936e 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -265,6 +265,9 @@ message FlightInfo { * * In other words, an application can use multiple endpoints to * represent partitioned data. + * + * There is no ordering defined on endpoints. Hence, if the returned + * data has an ordering, it should be returned in a single endpoint. */ repeated FlightEndpoint endpoint = 3; From ca0a6ae45ca0313d1d1ea67e182fe4b9ce849191 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 21 Mar 2022 10:47:04 -0400 Subject: [PATCH 8/8] ARROW-15921: [Format][FlightRPC] Clarify redemption/single-use --- format/Flight.proto | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/format/Flight.proto b/format/Flight.proto index f45e15d936e..87e5fda796d 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -287,7 +287,7 @@ message FlightEndpoint { Ticket ticket = 1; /* - * A list of URIs where this ticket can be redeemed. + * A list of URIs where this ticket can be redeemed via DoGet(). * * If the list is empty, the expectation is that the ticket can only * be redeemed on the current service where the ticket was @@ -317,8 +317,8 @@ message Location { * An opaque identifier that the service can use to retrieve a particular * portion of a stream. * - * Tickets are meant to be single use. It is an error/undefined behavior - * to reuse a ticket. + * Tickets are meant to be single use. It is an error/application-defined + * behavior to reuse a ticket. */ message Ticket { bytes ticket = 1;