diff --git a/cpp/src/arrow/flight/flight_internals_test.cc b/cpp/src/arrow/flight/flight_internals_test.cc index 3fef31b0ea4..187f1207bfe 100644 --- a/cpp/src/arrow/flight/flight_internals_test.cc +++ b/cpp/src/arrow/flight/flight_internals_test.cc @@ -213,26 +213,27 @@ TEST(FlightTypes, FlightInfo) { auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}}; auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}}; std::vector values = { - MakeFlightInfo(schema1, desc1, {}, -1, -1), - MakeFlightInfo(schema1, desc2, {}, -1, -1), - MakeFlightInfo(schema2, desc1, {}, -1, -1), - MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42), - MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1), + MakeFlightInfo(schema1, desc1, {}, -1, -1, false), + MakeFlightInfo(schema1, desc2, {}, -1, -1, true), + MakeFlightInfo(schema2, desc1, {}, -1, -1, false), + MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42, true), + MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1, false), }; std::vector reprs = { " " - "endpoints=[] total_records=-1 total_bytes=-1>", + "endpoints=[] total_records=-1 total_bytes=-1 ordered=false>", " " - "endpoints=[] total_records=-1 total_bytes=-1>", + "endpoints=[] total_records=-1 total_bytes=-1 ordered=true>", " " - "endpoints=[] total_records=-1 total_bytes=-1>", + "endpoints=[] total_records=-1 total_bytes=-1 ordered=false>", " " "endpoints=[ locations=[]>] " - "total_records=-1 total_bytes=42>", + "total_records=-1 total_bytes=42 ordered=true>", " " "endpoints=[ locations=[]>, " " locations=" - "[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1>", + "[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1 " + "ordered=false>", }; ASSERT_NO_FATAL_FAILURE(TestRoundtrip(values, reprs)); diff --git a/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc b/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc index e29a281f327..2e5fefb853b 100644 --- a/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc +++ b/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc @@ -53,6 +53,8 @@ TEST(FlightIntegration, AuthBasicProto) { ASSERT_OK(RunScenario("auth:basic_prot TEST(FlightIntegration, Middleware) { ASSERT_OK(RunScenario("middleware")); } +TEST(FlightIntegration, Ordered) { ASSERT_OK(RunScenario("ordered")); } + TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); } TEST(FlightIntegration, FlightSqlExtension) { diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc b/cpp/src/arrow/flight/integration_tests/test_integration.cc index f6af1429785..f4628842edc 100644 --- a/cpp/src/arrow/flight/integration_tests/test_integration.cc +++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc @@ -27,6 +27,7 @@ #include "arrow/array/array_binary.h" #include "arrow/array/array_nested.h" #include "arrow/array/array_primitive.h" +#include "arrow/array/builder_primitive.h" #include "arrow/flight/client_middleware.h" #include "arrow/flight/server_middleware.h" #include "arrow/flight/sql/client.h" @@ -37,6 +38,8 @@ #include "arrow/flight/types.h" #include "arrow/ipc/dictionary.h" #include "arrow/status.h" +#include "arrow/table.h" +#include "arrow/table_builder.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/checked_cast.h" @@ -210,8 +213,8 @@ class MiddlewareServer : public FlightServerBase { // Return a fake location - the test doesn't read it ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 10010)); std::vector endpoints{FlightEndpoint{{"foo"}, {location}}}; - ARROW_ASSIGN_OR_RAISE(auto info, - FlightInfo::Make(*schema, descriptor, endpoints, -1, -1)); + ARROW_ASSIGN_OR_RAISE( + auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false)); *result = std::make_unique(info); return Status::OK(); } @@ -271,6 +274,142 @@ class MiddlewareScenario : public Scenario { std::shared_ptr client_middleware_; }; +/// \brief The server used for testing FlightInfo.ordered. +/// +/// If the given command is "ordered", the server sets +/// FlightInfo.ordered. The client that supports FlightInfo.ordered +/// must read data from endpoints from front to back. The client that +/// doesn't support FlightInfo.ordered may read data from endpoints in +/// random order. +/// +/// This scenario is passed only when the client supports +/// FlightInfo.ordered. +class OrderedServer : public FlightServerBase { + Status GetFlightInfo(const ServerCallContext& context, + const FlightDescriptor& descriptor, + std::unique_ptr* result) override { + const auto ordered = (descriptor.type == FlightDescriptor::DescriptorType::CMD && + descriptor.cmd == "ordered"); + auto schema = BuildSchema(); + std::vector endpoints; + if (ordered) { + endpoints.push_back(FlightEndpoint{{"1"}, {}}); + endpoints.push_back(FlightEndpoint{{"2"}, {}}); + endpoints.push_back(FlightEndpoint{{"3"}, {}}); + } else { + endpoints.push_back(FlightEndpoint{{"1"}, {}}); + endpoints.push_back(FlightEndpoint{{"3"}, {}}); + endpoints.push_back(FlightEndpoint{{"2"}, {}}); + } + ARROW_ASSIGN_OR_RAISE( + auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, ordered)); + *result = std::make_unique(info); + return Status::OK(); + } + + Status DoGet(const ServerCallContext& context, const Ticket& request, + std::unique_ptr* stream) override { + ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make( + BuildSchema(), arrow::default_memory_pool())); + auto number_builder = builder->GetFieldAs(0); + if (request.ticket == "1") { + ARROW_RETURN_NOT_OK(number_builder->Append(1)); + ARROW_RETURN_NOT_OK(number_builder->Append(2)); + ARROW_RETURN_NOT_OK(number_builder->Append(3)); + } else if (request.ticket == "2") { + ARROW_RETURN_NOT_OK(number_builder->Append(10)); + ARROW_RETURN_NOT_OK(number_builder->Append(20)); + ARROW_RETURN_NOT_OK(number_builder->Append(30)); + } else if (request.ticket == "3") { + ARROW_RETURN_NOT_OK(number_builder->Append(100)); + ARROW_RETURN_NOT_OK(number_builder->Append(200)); + ARROW_RETURN_NOT_OK(number_builder->Append(300)); + } else { + return Status::KeyError("Could not find flight: ", request.ticket); + } + ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush()); + std::vector> record_batches{record_batch}; + ARROW_ASSIGN_OR_RAISE(auto record_batch_reader, + RecordBatchReader::Make(record_batches)); + *stream = std::make_unique(record_batch_reader); + return Status::OK(); + } + + private: + std::shared_ptr BuildSchema() { + return arrow::schema({arrow::field("number", arrow::int32(), false)}); + } +}; + +/// \brief The ordered scenario. +/// +/// This tests that the server and client get expected header values. +class OrderedScenario : public Scenario { + Status MakeServer(std::unique_ptr* server, + FlightServerOptions* options) override { + server->reset(new OrderedServer()); + return Status::OK(); + } + + Status MakeClient(FlightClientOptions* options) override { return Status::OK(); } + + Status RunClient(std::unique_ptr client) override { + ARROW_ASSIGN_OR_RAISE(auto info, + client->GetFlightInfo(FlightDescriptor::Command("ordered"))); + if (!info->ordered()) { + return Status::Invalid("Server must return FlightInfo.ordered = true"); + } + std::vector> tables; + for (const auto& endpoint : info->endpoints()) { + if (!endpoint.locations.empty()) { + std::stringstream ss; + ss << "["; + for (const auto& location : endpoint.locations) { + if (ss.str().size() != 1) { + ss << ", "; + } + ss << location.ToString(); + } + ss << "]"; + return Status::Invalid( + "Expected to receive empty locations to use the original service: ", + ss.str()); + } + ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket)); + ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable()); + tables.push_back(table); + } + ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(tables)); + + // Build expected table + auto schema = arrow::schema({arrow::field("number", arrow::int32(), false)}); + ARROW_ASSIGN_OR_RAISE(auto builder, + RecordBatchBuilder::Make(schema, arrow::default_memory_pool())); + auto number_builder = builder->GetFieldAs(0); + ARROW_RETURN_NOT_OK(number_builder->Append(1)); + ARROW_RETURN_NOT_OK(number_builder->Append(2)); + ARROW_RETURN_NOT_OK(number_builder->Append(3)); + ARROW_RETURN_NOT_OK(number_builder->Append(10)); + ARROW_RETURN_NOT_OK(number_builder->Append(20)); + ARROW_RETURN_NOT_OK(number_builder->Append(30)); + ARROW_RETURN_NOT_OK(number_builder->Append(100)); + ARROW_RETURN_NOT_OK(number_builder->Append(200)); + ARROW_RETURN_NOT_OK(number_builder->Append(300)); + ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush()); + std::vector> expected_record_batches{ + expected_record_batch}; + ARROW_ASSIGN_OR_RAISE(auto expected_table, + Table::FromRecordBatches(expected_record_batches)); + + // Check read data + if (!table->Equals(*expected_table)) { + return Status::Invalid("Read data isn't expected\n", "Expected:\n", + expected_table->ToString(), "Actual:\n", table->ToString()); + } + return Status::OK(); + } +}; + /// \brief Schema to be returned for mocking the statement/prepared statement results. /// /// Must be the same across all languages. @@ -382,8 +521,8 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase { } ARROW_ASSIGN_OR_RAISE(auto handle, sql::CreateStatementQueryTicket(ticket)); std::vector endpoints{FlightEndpoint{{handle}, {}}}; - ARROW_ASSIGN_OR_RAISE(auto result, - FlightInfo::Make(*schema, descriptor, endpoints, -1, -1)); + ARROW_ASSIGN_OR_RAISE( + auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false)); return std::make_unique(result); } @@ -407,8 +546,8 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase { } ARROW_ASSIGN_OR_RAISE(auto handle, sql::CreateStatementQueryTicket(ticket)); std::vector endpoints{FlightEndpoint{{handle}, {}}}; - ARROW_ASSIGN_OR_RAISE(auto result, - FlightInfo::Make(*schema, descriptor, endpoints, -1, -1)); + ARROW_ASSIGN_OR_RAISE( + auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false)); return std::make_unique(result); } @@ -851,7 +990,7 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase { const FlightDescriptor& descriptor, const std::shared_ptr& schema) { std::vector endpoints{FlightEndpoint{{descriptor.cmd}, {}}}; ARROW_ASSIGN_OR_RAISE(auto result, - FlightInfo::Make(*schema, descriptor, endpoints, -1, -1)) + FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false)) return std::make_unique(result); } @@ -1330,6 +1469,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr* } else if (scenario_name == "middleware") { *out = std::make_shared(); return Status::OK(); + } else if (scenario_name == "ordered") { + *out = std::make_shared(); + return Status::OK(); } else if (scenario_name == "flight_sql") { *out = std::make_shared(); return Status::OK(); diff --git a/cpp/src/arrow/flight/perf_server.cc b/cpp/src/arrow/flight/perf_server.cc index db3e8b150e0..7e7882955e8 100644 --- a/cpp/src/arrow/flight/perf_server.cc +++ b/cpp/src/arrow/flight/perf_server.cc @@ -196,7 +196,7 @@ class FlightPerfServer : public FlightServerBase { perf_request.stream_count() * perf_request.records_per_stream(); *info = std::make_unique( - MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1)); + MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1, false)); return Status::OK(); } diff --git a/cpp/src/arrow/flight/serialization_internal.cc b/cpp/src/arrow/flight/serialization_internal.cc index fa21a934bd1..dae547c8938 100644 --- a/cpp/src/arrow/flight/serialization_internal.cc +++ b/cpp/src/arrow/flight/serialization_internal.cc @@ -196,6 +196,7 @@ Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info) { info->total_records = pb_info.total_records(); info->total_bytes = pb_info.total_bytes(); + info->ordered = pb_info.ordered(); return Status::OK(); } @@ -236,6 +237,7 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) { pb_info->set_total_records(info.total_records()); pb_info->set_total_bytes(info.total_bytes()); + pb_info->set_ordered(info.ordered()); return Status::OK(); } diff --git a/cpp/src/arrow/flight/sql/example/acero_server.cc b/cpp/src/arrow/flight/sql/example/acero_server.cc index 2f1e48b0bee..7c1d9f867ac 100644 --- a/cpp/src/arrow/flight/sql/example/acero_server.cc +++ b/cpp/src/arrow/flight/sql/example/acero_server.cc @@ -167,9 +167,10 @@ class AceroFlightSqlServer : public FlightSqlServerBase { ARROW_ASSIGN_OR_RAISE(auto ticket, CreateStatementQueryTicket(plan)); std::vector endpoints{ FlightEndpoint{Ticket{std::move(ticket)}, /*locations=*/{}}}; - ARROW_ASSIGN_OR_RAISE(auto info, - FlightInfo::Make(schema, descriptor, std::move(endpoints), - /*total_records=*/-1, /*total_bytes=*/-1)); + ARROW_ASSIGN_OR_RAISE( + auto info, + FlightInfo::Make(schema, descriptor, std::move(endpoints), + /*total_records=*/-1, /*total_bytes=*/-1, /*ordered=*/false)); return std::make_unique(std::move(info)); } diff --git a/cpp/src/arrow/flight/sql/example/sqlite_server.cc b/cpp/src/arrow/flight/sql/example/sqlite_server.cc index a02f825a9e6..896f5bf0041 100644 --- a/cpp/src/arrow/flight/sql/example/sqlite_server.cc +++ b/cpp/src/arrow/flight/sql/example/sqlite_server.cc @@ -127,7 +127,7 @@ arrow::Result> GetFlightInfoForCommand( const FlightDescriptor& descriptor, const std::shared_ptr& schema) { std::vector endpoints{FlightEndpoint{{descriptor.cmd}, {}}}; ARROW_ASSIGN_OR_RAISE(auto result, - FlightInfo::Make(*schema, descriptor, endpoints, -1, -1)) + FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false)) return std::make_unique(result); } @@ -304,8 +304,11 @@ class SQLiteFlightSqlServer::Impl { ARROW_ASSIGN_OR_RAISE(auto ticket, EncodeTransactionQuery(query, command.transaction_id)); std::vector endpoints{FlightEndpoint{std::move(ticket), {}}}; - ARROW_ASSIGN_OR_RAISE(auto result, - FlightInfo::Make(*schema, descriptor, endpoints, -1, -1)) + // TODO: Set true only when "ORDER BY" is used in a main "SELECT" + // in the given query. + const bool ordered = false; + ARROW_ASSIGN_OR_RAISE( + auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, ordered)); return std::make_unique(result); } @@ -392,7 +395,7 @@ class SQLiteFlightSqlServer::Impl { auto result, FlightInfo::Make(include_schema ? *SqlSchema::GetTablesSchemaWithIncludedSchema() : *SqlSchema::GetTablesSchema(), - descriptor, endpoints, -1, -1)) + descriptor, endpoints, -1, -1, false)) return std::make_unique(std::move(result)); } diff --git a/cpp/src/arrow/flight/sql/server.cc b/cpp/src/arrow/flight/sql/server.cc index 7f6d9b75a88..7621711308c 100644 --- a/cpp/src/arrow/flight/sql/server.cc +++ b/cpp/src/arrow/flight/sql/server.cc @@ -890,8 +890,9 @@ arrow::Result> FlightSqlServerBase::GetFlightInfoSql } std::vector endpoints{FlightEndpoint{{descriptor.cmd}, {}}}; - ARROW_ASSIGN_OR_RAISE(auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(), - descriptor, endpoints, -1, -1)) + ARROW_ASSIGN_OR_RAISE( + auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(), descriptor, endpoints, + -1, -1, false)) return std::make_unique(result); } diff --git a/cpp/src/arrow/flight/test_util.cc b/cpp/src/arrow/flight/test_util.cc index 0d6c28b2968..18048670e6c 100644 --- a/cpp/src/arrow/flight/test_util.cc +++ b/cpp/src/arrow/flight/test_util.cc @@ -512,9 +512,9 @@ std::unique_ptr ExampleTestServer() { FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor, const std::vector& endpoints, - int64_t total_records, int64_t total_bytes) { + int64_t total_records, int64_t total_bytes, bool ordered) { EXPECT_OK_AND_ASSIGN(auto info, FlightInfo::Make(schema, descriptor, endpoints, - total_records, total_bytes)); + total_records, total_bytes, ordered)); return info; } @@ -600,10 +600,10 @@ std::vector ExampleFlightInfo() { auto schema4 = ExampleFloatSchema(); return { - MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000), - MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000), - MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1), - MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000), + MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, false), + MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, false), + MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1, false), + MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000, false), }; } diff --git a/cpp/src/arrow/flight/test_util.h b/cpp/src/arrow/flight/test_util.h index 679e04fa1b1..9f3ef51826f 100644 --- a/cpp/src/arrow/flight/test_util.h +++ b/cpp/src/arrow/flight/test_util.h @@ -192,7 +192,7 @@ std::vector ExampleActionTypes(); ARROW_FLIGHT_EXPORT FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor, const std::vector& endpoints, - int64_t total_records, int64_t total_bytes); + int64_t total_records, int64_t total_bytes, bool ordered); // ---------------------------------------------------------------------- // A pair of authentication handlers that check for a predefined password diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index b051ec7081a..806e616fdae 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -275,12 +275,14 @@ Status Ticket::Deserialize(const std::string& serialized, Ticket* out) { arrow::Result FlightInfo::Make(const Schema& schema, const FlightDescriptor& descriptor, const std::vector& endpoints, - int64_t total_records, int64_t total_bytes) { + int64_t total_records, int64_t total_bytes, + bool ordered) { FlightInfo::Data data; data.descriptor = descriptor; data.endpoints = endpoints; data.total_records = total_records; data.total_bytes = total_bytes; + data.ordered = ordered; RETURN_NOT_OK(internal::SchemaToString(schema, &data.schema)); return FlightInfo(data); } @@ -355,6 +357,7 @@ std::string FlightInfo::ToString() const { } ss << "] total_records=" << data_.total_records; ss << " total_bytes=" << data_.total_bytes; + ss << " ordered=" << (data_.ordered ? "true" : "false"); ss << '>'; return ss.str(); } @@ -364,7 +367,8 @@ bool FlightInfo::Equals(const FlightInfo& other) const { data_.descriptor == other.data_.descriptor && data_.endpoints == other.data_.endpoints && data_.total_records == other.data_.total_records && - data_.total_bytes == other.data_.total_bytes; + data_.total_bytes == other.data_.total_bytes && + data_.ordered == other.data_.ordered; } Location::Location() { uri_ = std::make_shared(); } diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h index 39353bcb997..74a80ae2e17 100644 --- a/cpp/src/arrow/flight/types.h +++ b/cpp/src/arrow/flight/types.h @@ -516,6 +516,7 @@ class ARROW_FLIGHT_EXPORT FlightInfo { std::vector endpoints; int64_t total_records; int64_t total_bytes; + bool ordered; }; explicit FlightInfo(Data data) : data_(std::move(data)), reconstructed_schema_(false) {} @@ -524,7 +525,8 @@ class ARROW_FLIGHT_EXPORT FlightInfo { static arrow::Result Make(const Schema& schema, const FlightDescriptor& descriptor, const std::vector& endpoints, - int64_t total_records, int64_t total_bytes); + int64_t total_records, int64_t total_bytes, + bool ordered = false); /// \brief Deserialize the Arrow schema of the dataset. Populate any /// dictionary encoded fields into a DictionaryMemo for @@ -554,6 +556,9 @@ class ARROW_FLIGHT_EXPORT FlightInfo { /// The total number of bytes in the dataset. If unknown, set to -1 int64_t total_bytes() const { return data_.total_bytes; } + /// Whether endpoints are in the same order as the data. + bool ordered() const { return data_.ordered; } + /// \brief Get the wire-format representation of this type. /// /// Useful when interoperating with non-Flight systems (e.g. REST diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 7d26b7a51c5..b733d329165 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -430,6 +430,11 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True, "middleware", description="Ensure headers are propagated via middleware.", ), + Scenario( + "ordered", + description="Ensure FlightInfo.ordered is supported.", + skip={"JS", "C#", "Rust"}, + ), Scenario( "flight_sql", description="Ensure Flight SQL protocol is working as expected.", diff --git a/docs/source/format/Flight.rst b/docs/source/format/Flight.rst index c7cfcea2779..c21f13b1f93 100644 --- a/docs/source/format/Flight.rst +++ b/docs/source/format/Flight.rst @@ -90,9 +90,21 @@ A client that wishes to download the data would: An endpoint contains a list of locations (server addresses) where this data can be retrieved from, and a ``Ticket``, an opaque binary token that the server will use to identify the data being - requested. There is no ordering defined on endpoints or the data - within, so if the dataset is sorted, applications should return - data in a single endpoint. + requested. + + If ``FlightInfo.ordered`` is true, this signals there is some order + between data from different endpoints. Clients should produce the + same results as if the data returned from each of the endpoints was + concatenated, in order, from front to back. + + If ``FlightInfo.ordered`` is false, the client may return data + from any of the endpoints in arbitrary order. Data from any + specific endpoint must be returned in order, but the data from + different endpoints may be interleaved to allow parallel fetches. + + Note that since some clients may ignore ``FlightInfo.ordered``, if + ordering is important and client support can not be ensured, + servers should return a single endpoint. The response also contains other metadata, like the schema, and optionally an estimate of the dataset size. @@ -117,7 +129,9 @@ A client that wishes to download the data would: The client must consume all endpoints to retrieve the complete data set. The client can consume endpoints in any order, or even in parallel, or distribute the endpoints among multiple machines for - consumption; this is up to the application to implement. + consumption; this is up to the application to implement. The client + can also use ``FlightInfo.ordered``. See the previous item for + details of ``FlightInfo.ordered``. Uploading Data -------------- @@ -216,7 +230,7 @@ Flight is primarily defined in terms of its Protobuf and gRPC specification below, but Arrow implementations may also support alternative transports (see :ref:`status-flight-rpc`). In that case, implementations should use the following URI schemes for the given -transport implemenatations: +transport implementations: +----------------------------+----------------------------+ | Transport | URI Scheme | diff --git a/format/Flight.proto b/format/Flight.proto index 635b1793d2b..8d1187976c4 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -266,14 +266,32 @@ message FlightInfo { * In other words, an application can use multiple endpoints to * represent partitioned data. * - * There is no ordering defined on endpoints. Hence, if the returned - * data has an ordering, it should be returned in a single endpoint. + * If the returned data has an ordering, an application can use + * "FlightInfo.ordered = true" or should return the all data in a + * single endpoint. Otherwise, there is no ordering defined on + * endpoints or the data within. + * + * A client can read ordered data by reading data from returned + * endpoints, in order, from front to back. + * + * Note that a client may ignore "FlightInfo.ordered = true". If an + * ordering is important for an application, an application must + * choose one of them: + * + * * An application requires that all clients must read data in + * returned endpoints order. + * * An application must return the all data in a single endpoint. */ repeated FlightEndpoint endpoint = 3; // Set these to -1 if unknown. int64 total_records = 4; int64 total_bytes = 5; + + /* + * FlightEndpoints are in the same order as the data. + */ + bool ordered = 6; } /* diff --git a/go/arrow/flight/internal/flight/Flight.pb.go b/go/arrow/flight/internal/flight/Flight.pb.go index b7be492acd4..c0f4ad9e85e 100644 --- a/go/arrow/flight/internal/flight/Flight.pb.go +++ b/go/arrow/flight/internal/flight/Flight.pb.go @@ -18,7 +18,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.12.4 +// protoc v3.21.12 // source: Flight.proto package flight @@ -37,19 +37,17 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// // Describes what type of descriptor is defined. type FlightDescriptor_DescriptorType int32 const ( // Protobuf pattern, not used. FlightDescriptor_UNKNOWN FlightDescriptor_DescriptorType = 0 - // // A named path that identifies a dataset. A path is composed of a string // or list of strings describing a particular dataset. This is conceptually - // similar to a path inside a filesystem. - FlightDescriptor_PATH FlightDescriptor_DescriptorType = 1 // + // similar to a path inside a filesystem. + FlightDescriptor_PATH FlightDescriptor_DescriptorType = 1 // An opaque command to generate a dataset. FlightDescriptor_CMD FlightDescriptor_DescriptorType = 2 ) @@ -95,17 +93,14 @@ func (FlightDescriptor_DescriptorType) EnumDescriptor() ([]byte, []int) { return file_Flight_proto_rawDescGZIP(), []int{9, 0} } -// // The request that a client provides to a server on handshake. type HandshakeRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // // A defined protocol version ProtocolVersion uint64 `protobuf:"varint,1,opt,name=protocol_version,json=protocolVersion,proto3" json:"protocol_version,omitempty"` - // // Arbitrary auth/handshake info. Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` } @@ -161,10 +156,8 @@ type HandshakeResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // // A defined protocol version ProtocolVersion uint64 `protobuf:"varint,1,opt,name=protocol_version,json=protocolVersion,proto3" json:"protocol_version,omitempty"` - // // Arbitrary auth/handshake info. Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` } @@ -215,7 +208,6 @@ func (x *HandshakeResponse) GetPayload() []byte { return nil } -// // A message for doing simple auth. type BasicAuth struct { state protoimpl.MessageState @@ -310,7 +302,6 @@ func (*Empty) Descriptor() ([]byte, []int) { return file_Flight_proto_rawDescGZIP(), []int{3} } -// // Describes an available action, including both the name used for execution // along with a short description of the purpose of the action. type ActionType struct { @@ -368,7 +359,6 @@ func (x *ActionType) GetDescription() string { return "" } -// // A service specific expression that can be used to return a limited set // of available Arrow Flight streams. type Criteria struct { @@ -418,7 +408,6 @@ func (x *Criteria) GetExpression() []byte { return nil } -// // An opaque action specific for the service. type Action struct { state protoimpl.MessageState @@ -475,7 +464,6 @@ func (x *Action) GetBody() []byte { return nil } -// // An opaque result returned after executing an action. type Result struct { state protoimpl.MessageState @@ -524,7 +512,6 @@ func (x *Result) GetBody() []byte { return nil } -// // Wrap the result of a getSchema call type SchemaResult struct { state protoimpl.MessageState @@ -532,9 +519,10 @@ type SchemaResult struct { unknownFields protoimpl.UnknownFields // The schema of the dataset in its IPC form: - // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix - // 4 bytes - the byte length of the payload - // a flatbuffer Message whose header is the Schema + // + // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix + // 4 bytes - the byte length of the payload + // a flatbuffer Message whose header is the Schema Schema []byte `protobuf:"bytes,1,opt,name=schema,proto3" json:"schema,omitempty"` } @@ -577,7 +565,6 @@ func (x *SchemaResult) GetSchema() []byte { return nil } -// // The name or tag for a Flight. May be used as a way to retrieve or generate // a flight or be used to expose a set of previously defined flights. type FlightDescriptor struct { @@ -586,11 +573,9 @@ type FlightDescriptor struct { unknownFields protoimpl.UnknownFields Type FlightDescriptor_DescriptorType `protobuf:"varint,1,opt,name=type,proto3,enum=arrow.flight.protocol.FlightDescriptor_DescriptorType" json:"type,omitempty"` - // // Opaque value used to express a command. Should only be defined when // type = CMD. Cmd []byte `protobuf:"bytes,2,opt,name=cmd,proto3" json:"cmd,omitempty"` - // // List of strings identifying a particular dataset. Should only be defined // when type = PATH. Path []string `protobuf:"bytes,3,rep,name=path,proto3" json:"path,omitempty"` @@ -649,7 +634,6 @@ func (x *FlightDescriptor) GetPath() []string { return nil } -// // The access coordinates for retrieval of a dataset. With a FlightInfo, a // consumer is able to determine how to retrieve a dataset. type FlightInfo struct { @@ -658,14 +642,13 @@ type FlightInfo struct { unknownFields protoimpl.UnknownFields // The schema of the dataset in its IPC form: - // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix - // 4 bytes - the byte length of the payload - // a flatbuffer Message whose header is the Schema - Schema []byte `protobuf:"bytes,1,opt,name=schema,proto3" json:"schema,omitempty"` // + // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix + // 4 bytes - the byte length of the payload + // a flatbuffer Message whose header is the Schema + Schema []byte `protobuf:"bytes,1,opt,name=schema,proto3" json:"schema,omitempty"` // The descriptor associated with this info. FlightDescriptor *FlightDescriptor `protobuf:"bytes,2,opt,name=flight_descriptor,json=flightDescriptor,proto3" json:"flight_descriptor,omitempty"` - // // A list of endpoints associated with the flight. To consume the // whole flight, all endpoints (and hence all Tickets) must be // consumed. Endpoints can be consumed in any order. @@ -673,12 +656,27 @@ type FlightInfo struct { // In other words, an application can use multiple endpoints to // represent partitioned data. // - // There is no ordering defined on endpoints. Hence, if the returned - // data has an ordering, it should be returned in a single endpoint. + // If the returned data has an ordering, an application can use + // "FlightInfo.ordered = true" or should return the all data in a + // single endpoint. Otherwise, there is no ordering defined on + // endpoints or the data within. + // + // A client can read ordered data by reading data from returned + // endpoints in order from front to back. + // + // Note that a client may ignore "FlightInfo.ordered = true". If an + // ordering is important for an application, an application must + // choose one of them: + // + // - An application requires that all clients must read data in + // returned endpoints order. + // - An application must return the all data in a single endpoint. Endpoint []*FlightEndpoint `protobuf:"bytes,3,rep,name=endpoint,proto3" json:"endpoint,omitempty"` // Set these to -1 if unknown. TotalRecords int64 `protobuf:"varint,4,opt,name=total_records,json=totalRecords,proto3" json:"total_records,omitempty"` TotalBytes int64 `protobuf:"varint,5,opt,name=total_bytes,json=totalBytes,proto3" json:"total_bytes,omitempty"` + // FlightEndpoints are in the same order as the data. + Ordered bool `protobuf:"varint,6,opt,name=ordered,proto3" json:"ordered,omitempty"` } func (x *FlightInfo) Reset() { @@ -748,17 +746,21 @@ func (x *FlightInfo) GetTotalBytes() int64 { return 0 } -// +func (x *FlightInfo) GetOrdered() bool { + if x != nil { + return x.Ordered + } + return false +} + // A particular stream or split associated with a flight. type FlightEndpoint struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // // Token used to retrieve this stream. Ticket *Ticket `protobuf:"bytes,1,opt,name=ticket,proto3" json:"ticket,omitempty"` - // // A list of URIs where this ticket can be redeemed via DoGet(). // // If the list is empty, the expectation is that the ticket can only @@ -822,7 +824,6 @@ func (x *FlightEndpoint) GetLocation() []*Location { return nil } -// // A location where a Flight service will accept retrieval of a particular // stream given a ticket. type Location struct { @@ -872,7 +873,6 @@ func (x *Location) GetUri() string { return "" } -// // An opaque identifier that the service can use to retrieve a particular // portion of a stream. // @@ -925,24 +925,19 @@ func (x *Ticket) GetTicket() []byte { return nil } -// // A batch of Arrow data as part of a stream of batches. type FlightData struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // // The descriptor of the data. This is only relevant when a client is // starting a new DoPut stream. FlightDescriptor *FlightDescriptor `protobuf:"bytes,1,opt,name=flight_descriptor,json=flightDescriptor,proto3" json:"flight_descriptor,omitempty"` - // // Header for message data as described in Message.fbs::Message. DataHeader []byte `protobuf:"bytes,2,opt,name=data_header,json=dataHeader,proto3" json:"data_header,omitempty"` - // // Application-defined metadata. AppMetadata []byte `protobuf:"bytes,3,opt,name=app_metadata,json=appMetadata,proto3" json:"app_metadata,omitempty"` - // // The actual batch of Arrow data. Preferably handled with minimal-copies // coming last in the definition to help with sidecar patterns (it is // expected that some implementations will fetch this field off the wire @@ -1010,7 +1005,7 @@ func (x *FlightData) GetDataBody() []byte { return nil } -//* +// * // The response message associated with the submission of a DoPut. type PutResult struct { state protoimpl.MessageState @@ -1106,7 +1101,7 @@ var file_Flight_proto_rawDesc = []byte{ 0x04, 0x70, 0x61, 0x74, 0x68, 0x22, 0x30, 0x0a, 0x0e, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x41, 0x54, 0x48, 0x10, 0x01, 0x12, 0x07, - 0x0a, 0x03, 0x43, 0x4d, 0x44, 0x10, 0x02, 0x22, 0x83, 0x02, 0x0a, 0x0a, 0x46, 0x6c, 0x69, 0x67, + 0x0a, 0x03, 0x43, 0x4d, 0x44, 0x10, 0x02, 0x22, 0x9d, 0x02, 0x0a, 0x0a, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x54, 0x0a, 0x11, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, @@ -1122,94 +1117,95 @@ var file_Flight_proto_rawDesc = []byte{ 0x5f, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, 0x84, 0x01, - 0x0a, 0x0e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, - 0x12, 0x35, 0x0a, 0x06, 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x52, - 0x06, 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x3b, 0x0a, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x61, 0x72, 0x72, 0x6f, - 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, - 0x6c, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1c, 0x0a, 0x08, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x69, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, - 0x72, 0x69, 0x22, 0x20, 0x0a, 0x06, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, - 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x69, - 0x63, 0x6b, 0x65, 0x74, 0x22, 0xc4, 0x01, 0x0a, 0x0a, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, - 0x61, 0x74, 0x61, 0x12, 0x54, 0x0a, 0x11, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x5f, 0x64, 0x65, - 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, - 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, - 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x52, 0x10, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, - 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x61, 0x74, - 0x61, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, - 0x64, 0x61, 0x74, 0x61, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x70, - 0x70, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x0b, 0x61, 0x70, 0x70, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, - 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x62, 0x6f, 0x64, 0x79, 0x18, 0xe8, 0x07, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x42, 0x6f, 0x64, 0x79, 0x22, 0x2e, 0x0a, 0x09, 0x50, - 0x75, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x70, 0x70, 0x5f, - 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, - 0x61, 0x70, 0x70, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x32, 0xa7, 0x06, 0x0a, 0x0d, - 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x64, 0x0a, - 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72, + 0x03, 0x52, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x18, 0x0a, + 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, + 0x6f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x64, 0x22, 0x84, 0x01, 0x0a, 0x0e, 0x46, 0x6c, 0x69, 0x67, + 0x68, 0x74, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x35, 0x0a, 0x06, 0x74, 0x69, + 0x63, 0x6b, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, - 0x6f, 0x6c, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, - 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x48, 0x61, 0x6e, 0x64, - 0x73, 0x68, 0x61, 0x6b, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, - 0x01, 0x30, 0x01, 0x12, 0x55, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x6c, 0x69, 0x67, 0x68, - 0x74, 0x73, 0x12, 0x1f, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, - 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x43, 0x72, 0x69, 0x74, 0x65, - 0x72, 0x69, 0x61, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, - 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, - 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x30, 0x01, 0x12, 0x5d, 0x0a, 0x0d, 0x47, 0x65, - 0x74, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x27, 0x2e, 0x61, 0x72, + 0x6f, 0x6c, 0x2e, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x52, 0x06, 0x74, 0x69, 0x63, 0x6b, 0x65, + 0x74, 0x12, 0x3b, 0x0a, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, + 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x4c, 0x6f, 0x63, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1c, + 0x0a, 0x08, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, + 0x69, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x69, 0x22, 0x20, 0x0a, 0x06, + 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x22, 0xc4, + 0x01, 0x0a, 0x0a, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x12, 0x54, 0x0a, + 0x11, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, + 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, + 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, + 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, + 0x72, 0x52, 0x10, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, + 0x74, 0x6f, 0x72, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x68, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x48, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x70, 0x70, 0x5f, 0x6d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x61, 0x70, 0x70, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, + 0x62, 0x6f, 0x64, 0x79, 0x18, 0xe8, 0x07, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x64, 0x61, 0x74, + 0x61, 0x42, 0x6f, 0x64, 0x79, 0x22, 0x2e, 0x0a, 0x09, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x70, 0x70, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x61, 0x70, 0x70, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x32, 0xa7, 0x06, 0x0a, 0x0d, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x64, 0x0a, 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, + 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x48, 0x61, 0x6e, + 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, + 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x55, 0x0a, + 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x73, 0x12, 0x1f, 0x2e, 0x61, + 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x43, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x1a, 0x21, 0x2e, + 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, + 0x22, 0x00, 0x30, 0x01, 0x12, 0x5d, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x46, 0x6c, 0x69, 0x67, 0x68, + 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, + 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, + 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x1a, 0x21, + 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x49, 0x6e, 0x66, + 0x6f, 0x22, 0x00, 0x12, 0x5b, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, + 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x1a, 0x23, 0x2e, 0x61, 0x72, 0x72, 0x6f, + 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, + 0x6c, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x00, + 0x12, 0x4d, 0x0a, 0x05, 0x44, 0x6f, 0x47, 0x65, 0x74, 0x12, 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, + 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, + 0x6c, 0x2e, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, + 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, + 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x22, 0x00, 0x30, 0x01, 0x12, + 0x52, 0x0a, 0x05, 0x44, 0x6f, 0x50, 0x75, 0x74, 0x12, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, + 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, + 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x20, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, - 0x70, 0x74, 0x6f, 0x72, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, + 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x00, 0x28, + 0x01, 0x30, 0x01, 0x12, 0x58, 0x0a, 0x0a, 0x44, 0x6f, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x12, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, + 0x44, 0x61, 0x74, 0x61, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, - 0x67, 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x5b, 0x0a, 0x09, 0x47, 0x65, 0x74, - 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, - 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, - 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x1a, - 0x23, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, - 0x73, 0x75, 0x6c, 0x74, 0x22, 0x00, 0x12, 0x4d, 0x0a, 0x05, 0x44, 0x6f, 0x47, 0x65, 0x74, 0x12, - 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x21, - 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, - 0x61, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x05, 0x44, 0x6f, 0x50, 0x75, 0x74, 0x12, 0x21, - 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, - 0x61, 0x1a, 0x20, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x58, 0x0a, 0x0a, 0x44, 0x6f, 0x45, - 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, - 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, - 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, + 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4c, 0x0a, + 0x08, 0x44, 0x6f, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, + 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, + 0x6c, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, + 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, + 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0b, 0x4c, + 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, - 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x22, 0x00, 0x28, - 0x01, 0x30, 0x01, 0x12, 0x4c, 0x0a, 0x08, 0x44, 0x6f, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, - 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x1d, - 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x00, 0x30, - 0x01, 0x12, 0x52, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x12, 0x1c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, - 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, - 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x76, 0x0a, 0x1c, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, - 0x63, 0x68, 0x65, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, - 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2f, 0x67, - 0x6f, 0x2f, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2f, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2f, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0xaa, 0x02, - 0x1c, 0x41, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x46, 0x6c, - 0x69, 0x67, 0x68, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6c, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, + 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, + 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, + 0x76, 0x0a, 0x1c, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x61, 0x72, + 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x5a, + 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, + 0x68, 0x65, 0x2f, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2f, 0x67, 0x6f, 0x2f, 0x61, 0x72, 0x72, 0x6f, + 0x77, 0x2f, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2f, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0xaa, 0x02, 0x1c, 0x41, 0x70, 0x61, 0x63, 0x68, + 0x65, 0x2e, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/arrow/flight/internal/flight/Flight_grpc.pb.go b/go/arrow/flight/internal/flight/Flight_grpc.pb.go index 96131144487..10fd285a5c1 100644 --- a/go/arrow/flight/internal/flight/Flight_grpc.pb.go +++ b/go/arrow/flight/internal/flight/Flight_grpc.pb.go @@ -1,8 +1,4 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v3.12.4 -// source: Flight.proto package flight @@ -15,20 +11,17 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 // FlightServiceClient is the client API for FlightService service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type FlightServiceClient interface { - // // Handshake between client and server. Depending on the server, the // handshake may be required to determine the token that should be used for // future operations. Both request and response are streams to allow multiple // round-trips depending on auth mechanism. Handshake(ctx context.Context, opts ...grpc.CallOption) (FlightService_HandshakeClient, error) - // // Get a list of available streams given a particular criteria. Most flight // services will expose one or more streams that are readily available for // retrieval. This api allows listing the streams available for @@ -36,7 +29,6 @@ type FlightServiceClient interface { // the subset of streams that can be listed via this interface. Each flight // service allows its own definition of how to consume criteria. ListFlights(ctx context.Context, in *Criteria, opts ...grpc.CallOption) (FlightService_ListFlightsClient, error) - // // For a given FlightDescriptor, get information about how the flight can be // consumed. This is a useful interface if the consumer of the interface // already can identify the specific flight to consume. This interface can @@ -48,19 +40,16 @@ type FlightServiceClient interface { // available for consumption for the duration defined by the specific flight // service. GetFlightInfo(ctx context.Context, in *FlightDescriptor, opts ...grpc.CallOption) (*FlightInfo, error) - // // For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema // This is used when a consumer needs the Schema of flight stream. Similar to // GetFlightInfo this interface may generate a new flight that was not previously // available in ListFlights. GetSchema(ctx context.Context, in *FlightDescriptor, opts ...grpc.CallOption) (*SchemaResult, error) - // // Retrieve a single stream associated with a particular descriptor // associated with the referenced ticket. A Flight can be composed of one or // more streams where each stream can be retrieved using a separate opaque // ticket that the flight service uses for managing a collection of streams. DoGet(ctx context.Context, in *Ticket, opts ...grpc.CallOption) (FlightService_DoGetClient, error) - // // Push a stream to the flight service associated with a particular // flight stream. This allows a client of a flight service to upload a stream // of data. Depending on the particular flight service, a client consumer @@ -68,14 +57,12 @@ type FlightServiceClient interface { // number. In the latter, the service might implement a 'seal' action that // can be applied to a descriptor once all streams are uploaded. DoPut(ctx context.Context, opts ...grpc.CallOption) (FlightService_DoPutClient, error) - // // Open a bidirectional data channel for a given descriptor. This // allows clients to send and receive arbitrary Arrow data and // application-specific metadata in a single logical stream. In // contrast to DoGet/DoPut, this is more suited for clients // offloading computation (rather than storage) to a Flight service. DoExchange(ctx context.Context, opts ...grpc.CallOption) (FlightService_DoExchangeClient, error) - // // Flight services can support an arbitrary number of simple actions in // addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut // operations that are potentially available. DoAction allows a flight client @@ -83,7 +70,6 @@ type FlightServiceClient interface { // opaque request and response objects that are specific to the type action // being undertaken. DoAction(ctx context.Context, in *Action, opts ...grpc.CallOption) (FlightService_DoActionClient, error) - // // A flight service exposes all of the available action types that it has // along with descriptions. This allows different flight consumers to // understand the capabilities of the flight service. @@ -99,7 +85,7 @@ func NewFlightServiceClient(cc grpc.ClientConnInterface) FlightServiceClient { } func (c *flightServiceClient) Handshake(ctx context.Context, opts ...grpc.CallOption) (FlightService_HandshakeClient, error) { - stream, err := c.cc.NewStream(ctx, &FlightService_ServiceDesc.Streams[0], "/arrow.flight.protocol.FlightService/Handshake", opts...) + stream, err := c.cc.NewStream(ctx, &_FlightService_serviceDesc.Streams[0], "/arrow.flight.protocol.FlightService/Handshake", opts...) if err != nil { return nil, err } @@ -130,7 +116,7 @@ func (x *flightServiceHandshakeClient) Recv() (*HandshakeResponse, error) { } func (c *flightServiceClient) ListFlights(ctx context.Context, in *Criteria, opts ...grpc.CallOption) (FlightService_ListFlightsClient, error) { - stream, err := c.cc.NewStream(ctx, &FlightService_ServiceDesc.Streams[1], "/arrow.flight.protocol.FlightService/ListFlights", opts...) + stream, err := c.cc.NewStream(ctx, &_FlightService_serviceDesc.Streams[1], "/arrow.flight.protocol.FlightService/ListFlights", opts...) if err != nil { return nil, err } @@ -180,7 +166,7 @@ func (c *flightServiceClient) GetSchema(ctx context.Context, in *FlightDescripto } func (c *flightServiceClient) DoGet(ctx context.Context, in *Ticket, opts ...grpc.CallOption) (FlightService_DoGetClient, error) { - stream, err := c.cc.NewStream(ctx, &FlightService_ServiceDesc.Streams[2], "/arrow.flight.protocol.FlightService/DoGet", opts...) + stream, err := c.cc.NewStream(ctx, &_FlightService_serviceDesc.Streams[2], "/arrow.flight.protocol.FlightService/DoGet", opts...) if err != nil { return nil, err } @@ -212,7 +198,7 @@ func (x *flightServiceDoGetClient) Recv() (*FlightData, error) { } func (c *flightServiceClient) DoPut(ctx context.Context, opts ...grpc.CallOption) (FlightService_DoPutClient, error) { - stream, err := c.cc.NewStream(ctx, &FlightService_ServiceDesc.Streams[3], "/arrow.flight.protocol.FlightService/DoPut", opts...) + stream, err := c.cc.NewStream(ctx, &_FlightService_serviceDesc.Streams[3], "/arrow.flight.protocol.FlightService/DoPut", opts...) if err != nil { return nil, err } @@ -243,7 +229,7 @@ func (x *flightServiceDoPutClient) Recv() (*PutResult, error) { } func (c *flightServiceClient) DoExchange(ctx context.Context, opts ...grpc.CallOption) (FlightService_DoExchangeClient, error) { - stream, err := c.cc.NewStream(ctx, &FlightService_ServiceDesc.Streams[4], "/arrow.flight.protocol.FlightService/DoExchange", opts...) + stream, err := c.cc.NewStream(ctx, &_FlightService_serviceDesc.Streams[4], "/arrow.flight.protocol.FlightService/DoExchange", opts...) if err != nil { return nil, err } @@ -274,7 +260,7 @@ func (x *flightServiceDoExchangeClient) Recv() (*FlightData, error) { } func (c *flightServiceClient) DoAction(ctx context.Context, in *Action, opts ...grpc.CallOption) (FlightService_DoActionClient, error) { - stream, err := c.cc.NewStream(ctx, &FlightService_ServiceDesc.Streams[5], "/arrow.flight.protocol.FlightService/DoAction", opts...) + stream, err := c.cc.NewStream(ctx, &_FlightService_serviceDesc.Streams[5], "/arrow.flight.protocol.FlightService/DoAction", opts...) if err != nil { return nil, err } @@ -306,7 +292,7 @@ func (x *flightServiceDoActionClient) Recv() (*Result, error) { } func (c *flightServiceClient) ListActions(ctx context.Context, in *Empty, opts ...grpc.CallOption) (FlightService_ListActionsClient, error) { - stream, err := c.cc.NewStream(ctx, &FlightService_ServiceDesc.Streams[6], "/arrow.flight.protocol.FlightService/ListActions", opts...) + stream, err := c.cc.NewStream(ctx, &_FlightService_serviceDesc.Streams[6], "/arrow.flight.protocol.FlightService/ListActions", opts...) if err != nil { return nil, err } @@ -341,13 +327,11 @@ func (x *flightServiceListActionsClient) Recv() (*ActionType, error) { // All implementations must embed UnimplementedFlightServiceServer // for forward compatibility type FlightServiceServer interface { - // // Handshake between client and server. Depending on the server, the // handshake may be required to determine the token that should be used for // future operations. Both request and response are streams to allow multiple // round-trips depending on auth mechanism. Handshake(FlightService_HandshakeServer) error - // // Get a list of available streams given a particular criteria. Most flight // services will expose one or more streams that are readily available for // retrieval. This api allows listing the streams available for @@ -355,7 +339,6 @@ type FlightServiceServer interface { // the subset of streams that can be listed via this interface. Each flight // service allows its own definition of how to consume criteria. ListFlights(*Criteria, FlightService_ListFlightsServer) error - // // For a given FlightDescriptor, get information about how the flight can be // consumed. This is a useful interface if the consumer of the interface // already can identify the specific flight to consume. This interface can @@ -367,19 +350,16 @@ type FlightServiceServer interface { // available for consumption for the duration defined by the specific flight // service. GetFlightInfo(context.Context, *FlightDescriptor) (*FlightInfo, error) - // // For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema // This is used when a consumer needs the Schema of flight stream. Similar to // GetFlightInfo this interface may generate a new flight that was not previously // available in ListFlights. GetSchema(context.Context, *FlightDescriptor) (*SchemaResult, error) - // // Retrieve a single stream associated with a particular descriptor // associated with the referenced ticket. A Flight can be composed of one or // more streams where each stream can be retrieved using a separate opaque // ticket that the flight service uses for managing a collection of streams. DoGet(*Ticket, FlightService_DoGetServer) error - // // Push a stream to the flight service associated with a particular // flight stream. This allows a client of a flight service to upload a stream // of data. Depending on the particular flight service, a client consumer @@ -387,14 +367,12 @@ type FlightServiceServer interface { // number. In the latter, the service might implement a 'seal' action that // can be applied to a descriptor once all streams are uploaded. DoPut(FlightService_DoPutServer) error - // // Open a bidirectional data channel for a given descriptor. This // allows clients to send and receive arbitrary Arrow data and // application-specific metadata in a single logical stream. In // contrast to DoGet/DoPut, this is more suited for clients // offloading computation (rather than storage) to a Flight service. DoExchange(FlightService_DoExchangeServer) error - // // Flight services can support an arbitrary number of simple actions in // addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut // operations that are potentially available. DoAction allows a flight client @@ -402,7 +380,6 @@ type FlightServiceServer interface { // opaque request and response objects that are specific to the type action // being undertaken. DoAction(*Action, FlightService_DoActionServer) error - // // A flight service exposes all of the available action types that it has // along with descriptions. This allows different flight consumers to // understand the capabilities of the flight service. @@ -450,8 +427,8 @@ type UnsafeFlightServiceServer interface { mustEmbedUnimplementedFlightServiceServer() } -func RegisterFlightServiceServer(s grpc.ServiceRegistrar, srv FlightServiceServer) { - s.RegisterService(&FlightService_ServiceDesc, srv) +func RegisterFlightServiceServer(s *grpc.Server, srv FlightServiceServer) { + s.RegisterService(&_FlightService_serviceDesc, srv) } func _FlightService_Handshake_Handler(srv interface{}, stream grpc.ServerStream) error { @@ -652,10 +629,7 @@ func (x *flightServiceListActionsServer) Send(m *ActionType) error { return x.ServerStream.SendMsg(m) } -// FlightService_ServiceDesc is the grpc.ServiceDesc for FlightService service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var FlightService_ServiceDesc = grpc.ServiceDesc{ +var _FlightService_serviceDesc = grpc.ServiceDesc{ ServiceName: "arrow.flight.protocol.FlightService", HandlerType: (*FlightServiceServer)(nil), Methods: []grpc.MethodDesc{ diff --git a/go/arrow/flight/server.go b/go/arrow/flight/server.go index 860dc64d8e1..b1f7f18db25 100644 --- a/go/arrow/flight/server.go +++ b/go/arrow/flight/server.go @@ -52,14 +52,9 @@ type ( Empty = flight.Empty ) -// FlightService_ServiceDesc is the grpc.ServiceDesc for the FlightService -// server. It should only be used for direct call of grpc.RegisterService, -// and not introspected or modified (even as a copy). -var FlightService_ServiceDesc = flight.FlightService_ServiceDesc - // RegisterFlightServiceServer registers an existing flight server onto an // existing grpc server, or anything that is a grpc service registrar. -func RegisterFlightServiceServer(s grpc.ServiceRegistrar, srv FlightServer) { +func RegisterFlightServiceServer(s *grpc.Server, srv FlightServer) { flight.RegisterFlightServiceServer(s, srv) } diff --git a/go/arrow/internal/flight_integration/scenario.go b/go/arrow/internal/flight_integration/scenario.go index b8b214ac7d5..c52e264806d 100644 --- a/go/arrow/internal/flight_integration/scenario.go +++ b/go/arrow/internal/flight_integration/scenario.go @@ -55,6 +55,8 @@ func GetScenario(name string, args ...string) Scenario { return &authBasicProtoTester{} case "middleware": return &middlewareScenarioTester{} + case "ordered": + return &orderedScenarioTester{} case "flight_sql": return &flightSqlScenarioTester{} case "flight_sql:extension": @@ -526,6 +528,168 @@ func (m *middlewareScenarioTester) GetFlightInfo(ctx context.Context, desc *flig }, nil } +type orderedScenarioTester struct { + flight.BaseFlightServer +} + +func (m *orderedScenarioTester) RunClient(addr string, opts ...grpc.DialOption) error { + client, err := flight.NewClientWithMiddleware(addr, nil, nil, opts...) + if err != nil { + return err + } + defer client.Close() + + ctx := context.Background() + info, err := client.GetFlightInfo(ctx, &flight.FlightDescriptor{Type: flight.DescriptorCMD, Cmd: []byte("ordered")}) + if err != nil { + return err + } + + if !info.GetOrdered() { + return fmt.Errorf("expected to server return FlightInfo.ordered = true") + } + + recs := make([]arrow.Record, len(info.Endpoint)) + for i, ep := range info.Endpoint { + if len(ep.Location) != 0 { + return fmt.Errorf("expected to receive empty locations to use the original service: %s", + ep.Location) + } + + stream, err := client.DoGet(ctx, ep.Ticket) + if err != nil { + return err + } + + rdr, err := flight.NewRecordReader(stream) + if err != nil { + return err + } + defer rdr.Release() + + for rdr.Next() { + record := rdr.Record() + record.Retain() + defer record.Release() + recs[i] = record + } + if rdr.Err() != nil { + return rdr.Err() + } + } + + // Build expected records + mem := memory.DefaultAllocator + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "number", Type: arrow.PrimitiveTypes.Int32}, + }, + nil, + ) + expected_table, _ := array.TableFromJSON(mem, schema, []string{ + `[ + {"number": 1}, + {"number": 2}, + {"number": 3} + ]`, + `[ + {"number": 10}, + {"number": 20}, + {"number": 30} + ]`, + `[ + {"number": 100}, + {"number": 200}, + {"number": 300} + ]`, + }) + defer expected_table.Release() + + table := array.NewTableFromRecords(schema, recs) + defer table.Release() + if !array.TableEqual(table, expected_table) { + return fmt.Errorf("read data isn't expected\n" + + "Expected:\n" + + "%s\n" + + "num-rows: %d\n" + + "num-cols: %d\n" + + "Actual:\n" + + "%s\n" + + "num-rows: %d\n" + + "num-cols: %d", + expected_table.Schema(), + expected_table.NumRows(), + expected_table.NumCols(), + table.Schema(), + table.NumRows(), + table.NumCols()) + } + + return nil +} + +func (m *orderedScenarioTester) MakeServer(port int) flight.Server { + srv := flight.NewServerWithMiddleware(nil) + srv.RegisterFlightService(m) + initServer(port, srv) + return srv +} + +func (m *orderedScenarioTester) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + ordered := desc.Type == flight.DescriptorCMD && string(desc.Cmd) == "ordered" + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "number", Type: arrow.PrimitiveTypes.Int32}, + }, + nil, + ) + return &flight.FlightInfo{ + Schema: flight.SerializeSchema(schema, memory.DefaultAllocator), + FlightDescriptor: desc, + Endpoint: []*flight.FlightEndpoint{ + { + Ticket: &flight.Ticket{Ticket: []byte("1")}, + Location: []*flight.Location{}, + }, + { + Ticket: &flight.Ticket{Ticket: []byte("2")}, + Location: []*flight.Location{}, + }, + { + Ticket: &flight.Ticket{Ticket: []byte("3")}, + Location: []*flight.Location{}, + }, + }, + TotalRecords: -1, + TotalBytes: -1, + Ordered: ordered, + }, nil +} + +func (m *orderedScenarioTester) DoGet(tkt *flight.Ticket, fs flight.FlightService_DoGetServer) error { + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "number", Type: arrow.PrimitiveTypes.Int32}, + }, + nil, + ) + b := array.NewRecordBuilder(memory.DefaultAllocator, schema) + defer b.Release() + if string(tkt.GetTicket()) == "1" { + b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3}, nil) + } else if string(tkt.GetTicket()) == "2" { + b.Field(0).(*array.Int32Builder).AppendValues([]int32{10, 20, 30}, nil) + } else if string(tkt.GetTicket()) == "3" { + b.Field(0).(*array.Int32Builder).AppendValues([]int32{100, 200, 300}, nil) + } + w := flight.NewRecordWriter(fs, ipc.WithSchema(schema)) + rec := b.NewRecord() + defer rec.Release() + w.Write(rec) + + return nil +} + const ( updateStatementExpectedRows int64 = 10000 updateStatementWithTransactionExpectedRows int64 = 15000 diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java index e57b311c2e5..888c7293ea2 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java @@ -48,6 +48,7 @@ public class FlightInfo { private final List endpoints; private final long bytes; private final long records; + private final boolean ordered; private final IpcOption option; /** @@ -61,7 +62,7 @@ public class FlightInfo { */ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, long records) { - this(schema, descriptor, endpoints, bytes, records, IpcOption.DEFAULT); + this(schema, descriptor, endpoints, bytes, records, /*ordered*/ false, IpcOption.DEFAULT); } /** @@ -76,6 +77,22 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, long records, IpcOption option) { + this(schema, descriptor, endpoints, bytes, records, /*ordered*/ false, option); + } + + /** + * Constructs a new instance. + * + * @param schema The schema of the Flight + * @param descriptor An identifier for the Flight. + * @param endpoints A list of endpoints that have the flight available. + * @param bytes The number of bytes in the flight + * @param records The number of records in the flight. + * @param ordered Whether the endpoints in this flight are ordered. + * @param option IPC write options. + */ + public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, + long records, boolean ordered, IpcOption option) { Objects.requireNonNull(schema); Objects.requireNonNull(descriptor); Objects.requireNonNull(endpoints); @@ -85,6 +102,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List getEndpoints() { return endpoints; } + public boolean getOrdered() { + return ordered; + } + /** * Converts to the protocol buffer representation. */ @@ -148,6 +171,7 @@ Flight.FlightInfo toProtocol() { .setFlightDescriptor(descriptor.toProtocol()) .setTotalBytes(FlightInfo.this.bytes) .setTotalRecords(records) + .setOrdered(ordered) .build(); } @@ -187,12 +211,13 @@ public boolean equals(Object o) { records == that.records && schema.equals(that.schema) && descriptor.equals(that.descriptor) && - endpoints.equals(that.endpoints); + endpoints.equals(that.endpoints) && + ordered == that.ordered; } @Override public int hashCode() { - return Objects.hash(schema, descriptor, endpoints, bytes, records); + return Objects.hash(schema, descriptor, endpoints, bytes, records, ordered); } @Override @@ -203,6 +228,7 @@ public String toString() { ", endpoints=" + endpoints + ", bytes=" + bytes + ", records=" + records + + ", ordered=" + ordered + '}'; } } diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java index f9caeca22e3..40337b2de5a 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java @@ -119,10 +119,25 @@ public void roundTripInfo() throws Exception { new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock"), forGrpcInsecure("localhost", 50051)) ), 200, 500); + final FlightInfo info4 = new FlightInfo(schema, FlightDescriptor.path("a", "b"), + Arrays.asList(new FlightEndpoint( + new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock")), + new FlightEndpoint( + new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock"), + forGrpcInsecure("localhost", 50051)) + ), 200, 500, /*ordered*/ true, IpcOption.DEFAULT); Assertions.assertEquals(info1, FlightInfo.deserialize(info1.serialize())); Assertions.assertEquals(info2, FlightInfo.deserialize(info2.serialize())); Assertions.assertEquals(info3, FlightInfo.deserialize(info3.serialize())); + Assertions.assertEquals(info4, FlightInfo.deserialize(info4.serialize())); + + Assertions.assertNotEquals(info3, info4); + + Assertions.assertFalse(info1.getOrdered()); + Assertions.assertFalse(info2.getOrdered()); + Assertions.assertFalse(info3.getOrdered()); + Assertions.assertTrue(info4.getOrdered()); } @Test diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/OrderedScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/OrderedScenario.java new file mode 100644 index 00000000000..b8aa46fb567 --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/OrderedScenario.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.integration.tests; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.message.IpcOption; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +/** Test the 'ordered' flag in FlightInfo. */ +public class OrderedScenario implements Scenario { + private static final Schema SCHEMA = + new Schema( + Collections.singletonList(Field.notNullable("number", Types.MinorType.INT.getType()))); + private static final byte[] ORDERED_COMMAND = "ordered".getBytes(StandardCharsets.UTF_8); + + @Override + public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception { + return new OrderedProducer(allocator); + } + + @Override + public void buildServer(FlightServer.Builder builder) throws Exception {} + + @Override + public void client(BufferAllocator allocator, Location location, FlightClient client) + throws Exception { + final FlightInfo info = client.getInfo(FlightDescriptor.command(ORDERED_COMMAND)); + IntegrationAssertions.assertTrue("ordered must be true", info.getOrdered()); + IntegrationAssertions.assertEquals(3, info.getEndpoints().size()); + + int offset = 0; + for (int multiplier : Arrays.asList(1, 10, 100)) { + FlightEndpoint endpoint = info.getEndpoints().get(offset); + + IntegrationAssertions.assertTrue( + "locations must be empty", endpoint.getLocations().isEmpty()); + + try (final FlightStream stream = client.getStream(endpoint.getTicket())) { + IntegrationAssertions.assertEquals(SCHEMA, stream.getSchema()); + IntegrationAssertions.assertTrue("stream must have a batch", stream.next()); + + IntVector number = (IntVector) stream.getRoot().getVector(0); + IntegrationAssertions.assertEquals(3, stream.getRoot().getRowCount()); + + IntegrationAssertions.assertFalse("value must be non-null", number.isNull(0)); + IntegrationAssertions.assertFalse("value must be non-null", number.isNull(1)); + IntegrationAssertions.assertFalse("value must be non-null", number.isNull(2)); + IntegrationAssertions.assertEquals(multiplier, number.get(0)); + IntegrationAssertions.assertEquals(2 * multiplier, number.get(1)); + IntegrationAssertions.assertEquals(3 * multiplier, number.get(2)); + + IntegrationAssertions.assertFalse("stream must have one batch", stream.next()); + } + + offset++; + } + } + + private static class OrderedProducer extends NoOpFlightProducer { + private static final byte[] TICKET_1 = "1".getBytes(StandardCharsets.UTF_8); + private static final byte[] TICKET_2 = "2".getBytes(StandardCharsets.UTF_8); + private static final byte[] TICKET_3 = "3".getBytes(StandardCharsets.UTF_8); + + private final BufferAllocator allocator; + + OrderedProducer(BufferAllocator allocator) { + this.allocator = Objects.requireNonNull(allocator); + } + + @Override + public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) { + try (final VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) { + IntVector number = (IntVector) root.getVector(0); + + if (Arrays.equals(ticket.getBytes(), TICKET_1)) { + number.setSafe(0, 1); + number.setSafe(1, 2); + number.setSafe(2, 3); + } else if (Arrays.equals(ticket.getBytes(), TICKET_2)) { + number.setSafe(0, 10); + number.setSafe(1, 20); + number.setSafe(2, 30); + } else if (Arrays.equals(ticket.getBytes(), TICKET_3)) { + number.setSafe(0, 100); + number.setSafe(1, 200); + number.setSafe(2, 300); + } else { + listener.error( + CallStatus.INVALID_ARGUMENT + .withDescription( + "Could not find flight: " + new String(ticket.getBytes(), StandardCharsets.UTF_8)) + .toRuntimeException()); + return; + } + + root.setRowCount(3); + + listener.start(root); + listener.putNext(); + listener.completed(); + } + } + + @Override + public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { + final boolean ordered = + descriptor.isCommand() && Arrays.equals(descriptor.getCommand(), ORDERED_COMMAND); + List endpoints; + if (ordered) { + endpoints = + Arrays.asList( + new FlightEndpoint(new Ticket(TICKET_1)), + new FlightEndpoint(new Ticket(TICKET_2)), + new FlightEndpoint(new Ticket(TICKET_3))); + } else { + endpoints = + Arrays.asList( + new FlightEndpoint(new Ticket(TICKET_1)), + new FlightEndpoint(new Ticket(TICKET_3)), + new FlightEndpoint(new Ticket(TICKET_2))); + } + return new FlightInfo( + SCHEMA, descriptor, endpoints, /*bytes*/ -1, /*records*/ -1, ordered, IpcOption.DEFAULT); + } + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java index 77f7ab0006d..c2e10fcf47e 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java @@ -41,6 +41,7 @@ private Scenarios() { scenarios = new TreeMap<>(); scenarios.put("auth:basic_proto", AuthBasicProtoScenario::new); scenarios.put("middleware", MiddlewareScenario::new); + scenarios.put("ordered", OrderedScenario::new); scenarios.put("flight_sql", FlightSqlScenario::new); scenarios.put("flight_sql:extension", FlightSqlExtensionScenario::new); } diff --git a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java index 0751e1d7a89..4507dfb1292 100644 --- a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java +++ b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java @@ -38,6 +38,11 @@ void middleware() throws Exception { testScenario("middleware"); } + @Test + void ordered() throws Exception { + testScenario("ordered"); + } + @Test void flightSql() throws Exception { testScenario("flight_sql");