Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,26 +213,27 @@ TEST(FlightTypes, FlightInfo) {
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}};
auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}};
std::vector<FlightInfo> values = {
MakeFlightInfo(schema1, desc1, {}, -1, -1),
MakeFlightInfo(schema1, desc2, {}, -1, -1),
MakeFlightInfo(schema2, desc1, {}, -1, -1),
MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42),
MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1),
MakeFlightInfo(schema1, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc2, {}, -1, -1, true),
MakeFlightInfo(schema2, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42, true),
MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1, false),
};
std::vector<std::string> reprs = {
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[] total_records=-1 total_bytes=-1>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>] "
"total_records=-1 total_bytes=42>",
"total_records=-1 total_bytes=42 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>, "
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1>",
"[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1 "
"ordered=false>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ TEST(FlightIntegration, AuthBasicProto) { ASSERT_OK(RunScenario("auth:basic_prot

TEST(FlightIntegration, Middleware) { ASSERT_OK(RunScenario("middleware")); }

TEST(FlightIntegration, Ordered) { ASSERT_OK(RunScenario("ordered")); }

TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }

TEST(FlightIntegration, FlightSqlExtension) {
Expand Down
156 changes: 149 additions & 7 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "arrow/array/array_binary.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/flight/client_middleware.h"
#include "arrow/flight/server_middleware.h"
#include "arrow/flight/sql/client.h"
Expand All @@ -37,6 +38,8 @@
#include "arrow/flight/types.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/table_builder.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/checked_cast.h"

Expand Down Expand Up @@ -210,8 +213,8 @@ class MiddlewareServer : public FlightServerBase {
// Return a fake location - the test doesn't read it
ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 10010));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{"foo"}, {location}}};
ARROW_ASSIGN_OR_RAISE(auto info,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
ARROW_ASSIGN_OR_RAISE(
auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
*result = std::make_unique<FlightInfo>(info);
return Status::OK();
}
Expand Down Expand Up @@ -271,6 +274,142 @@ class MiddlewareScenario : public Scenario {
std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
};

/// \brief The server used for testing FlightInfo.ordered.
///
/// If the given command is "ordered", the server sets
/// FlightInfo.ordered. The client that supports FlightInfo.ordered
/// must read data from endpoints from front to back. The client that
/// doesn't support FlightInfo.ordered may read data from endpoints in
/// random order.
///
/// This scenario is passed only when the client supports
/// FlightInfo.ordered.
class OrderedServer : public FlightServerBase {
Status GetFlightInfo(const ServerCallContext& context,
const FlightDescriptor& descriptor,
std::unique_ptr<FlightInfo>* result) override {
const auto ordered = (descriptor.type == FlightDescriptor::DescriptorType::CMD &&
descriptor.cmd == "ordered");
auto schema = BuildSchema();
std::vector<FlightEndpoint> endpoints;
if (ordered) {
endpoints.push_back(FlightEndpoint{{"1"}, {}});
endpoints.push_back(FlightEndpoint{{"2"}, {}});
endpoints.push_back(FlightEndpoint{{"3"}, {}});
} else {
endpoints.push_back(FlightEndpoint{{"1"}, {}});
endpoints.push_back(FlightEndpoint{{"3"}, {}});
endpoints.push_back(FlightEndpoint{{"2"}, {}});
}
ARROW_ASSIGN_OR_RAISE(
auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, ordered));
*result = std::make_unique<FlightInfo>(info);
return Status::OK();
}

Status DoGet(const ServerCallContext& context, const Ticket& request,
std::unique_ptr<FlightDataStream>* stream) override {
ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
BuildSchema(), arrow::default_memory_pool()));
auto number_builder = builder->GetFieldAs<Int32Builder>(0);
if (request.ticket == "1") {
ARROW_RETURN_NOT_OK(number_builder->Append(1));
ARROW_RETURN_NOT_OK(number_builder->Append(2));
ARROW_RETURN_NOT_OK(number_builder->Append(3));
} else if (request.ticket == "2") {
ARROW_RETURN_NOT_OK(number_builder->Append(10));
ARROW_RETURN_NOT_OK(number_builder->Append(20));
ARROW_RETURN_NOT_OK(number_builder->Append(30));
} else if (request.ticket == "3") {
ARROW_RETURN_NOT_OK(number_builder->Append(100));
ARROW_RETURN_NOT_OK(number_builder->Append(200));
ARROW_RETURN_NOT_OK(number_builder->Append(300));
} else {
return Status::KeyError("Could not find flight: ", request.ticket);
}
ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
RecordBatchReader::Make(record_batches));
*stream = std::make_unique<RecordBatchStream>(record_batch_reader);
return Status::OK();
}

private:
std::shared_ptr<Schema> BuildSchema() {
return arrow::schema({arrow::field("number", arrow::int32(), false)});
}
};

/// \brief The ordered scenario.
///
/// This tests that the server and client get expected header values.
class OrderedScenario : public Scenario {
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
server->reset(new OrderedServer());
return Status::OK();
}

Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

Status RunClient(std::unique_ptr<FlightClient> client) override {
ARROW_ASSIGN_OR_RAISE(auto info,
client->GetFlightInfo(FlightDescriptor::Command("ordered")));
if (!info->ordered()) {
return Status::Invalid("Server must return FlightInfo.ordered = true");
}
std::vector<std::shared_ptr<arrow::Table>> tables;
for (const auto& endpoint : info->endpoints()) {
if (!endpoint.locations.empty()) {
std::stringstream ss;
ss << "[";
for (const auto& location : endpoint.locations) {
if (ss.str().size() != 1) {
ss << ", ";
}
ss << location.ToString();
}
ss << "]";
return Status::Invalid(
"Expected to receive empty locations to use the original service: ",
ss.str());
}
ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
tables.push_back(table);
}
ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(tables));

// Build expected table
auto schema = arrow::schema({arrow::field("number", arrow::int32(), false)});
ARROW_ASSIGN_OR_RAISE(auto builder,
RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
auto number_builder = builder->GetFieldAs<Int32Builder>(0);
ARROW_RETURN_NOT_OK(number_builder->Append(1));
ARROW_RETURN_NOT_OK(number_builder->Append(2));
ARROW_RETURN_NOT_OK(number_builder->Append(3));
ARROW_RETURN_NOT_OK(number_builder->Append(10));
ARROW_RETURN_NOT_OK(number_builder->Append(20));
ARROW_RETURN_NOT_OK(number_builder->Append(30));
ARROW_RETURN_NOT_OK(number_builder->Append(100));
ARROW_RETURN_NOT_OK(number_builder->Append(200));
ARROW_RETURN_NOT_OK(number_builder->Append(300));
ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
expected_record_batch};
ARROW_ASSIGN_OR_RAISE(auto expected_table,
Table::FromRecordBatches(expected_record_batches));

// Check read data
if (!table->Equals(*expected_table)) {
return Status::Invalid("Read data isn't expected\n", "Expected:\n",
expected_table->ToString(), "Actual:\n", table->ToString());
}
return Status::OK();
}
};

/// \brief Schema to be returned for mocking the statement/prepared statement results.
///
/// Must be the same across all languages.
Expand Down Expand Up @@ -382,8 +521,8 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
}
ARROW_ASSIGN_OR_RAISE(auto handle, sql::CreateStatementQueryTicket(ticket));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
return std::make_unique<FlightInfo>(result);
}

Expand All @@ -407,8 +546,8 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
}
ARROW_ASSIGN_OR_RAISE(auto handle, sql::CreateStatementQueryTicket(ticket));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
return std::make_unique<FlightInfo>(result);
}

Expand Down Expand Up @@ -851,7 +990,7 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down Expand Up @@ -1330,6 +1469,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "middleware") {
*out = std::make_shared<MiddlewareScenario>();
return Status::OK();
} else if (scenario_name == "ordered") {
*out = std::make_shared<OrderedScenario>();
return Status::OK();
} else if (scenario_name == "flight_sql") {
*out = std::make_shared<FlightSqlScenario>();
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/perf_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class FlightPerfServer : public FlightServerBase {
perf_request.stream_count() * perf_request.records_per_stream();

*info = std::make_unique<FlightInfo>(
MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1));
MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1, false));
return Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info) {

info->total_records = pb_info.total_records();
info->total_bytes = pb_info.total_bytes();
info->ordered = pb_info.ordered();
return Status::OK();
}

Expand Down Expand Up @@ -236,6 +237,7 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {

pb_info->set_total_records(info.total_records());
pb_info->set_total_bytes(info.total_bytes());
pb_info->set_ordered(info.ordered());
return Status::OK();
}

Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/flight/sql/example/acero_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ class AceroFlightSqlServer : public FlightSqlServerBase {
ARROW_ASSIGN_OR_RAISE(auto ticket, CreateStatementQueryTicket(plan));
std::vector<FlightEndpoint> endpoints{
FlightEndpoint{Ticket{std::move(ticket)}, /*locations=*/{}}};
ARROW_ASSIGN_OR_RAISE(auto info,
FlightInfo::Make(schema, descriptor, std::move(endpoints),
/*total_records=*/-1, /*total_bytes=*/-1));
ARROW_ASSIGN_OR_RAISE(
auto info,
FlightInfo::Make(schema, descriptor, std::move(endpoints),
/*total_records=*/-1, /*total_bytes=*/-1, /*ordered=*/false));
return std::make_unique<FlightInfo>(std::move(info));
}

Expand Down
11 changes: 7 additions & 4 deletions cpp/src/arrow/flight/sql/example/sqlite_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoForCommand(
const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down Expand Up @@ -304,8 +304,11 @@ class SQLiteFlightSqlServer::Impl {
ARROW_ASSIGN_OR_RAISE(auto ticket,
EncodeTransactionQuery(query, command.transaction_id));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{std::move(ticket), {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
// TODO: Set true only when "ORDER BY" is used in a main "SELECT"
// in the given query.
const bool ordered = false;
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, ordered));

return std::make_unique<FlightInfo>(result);
}
Expand Down Expand Up @@ -392,7 +395,7 @@ class SQLiteFlightSqlServer::Impl {
auto result,
FlightInfo::Make(include_schema ? *SqlSchema::GetTablesSchemaWithIncludedSchema()
: *SqlSchema::GetTablesSchema(),
descriptor, endpoints, -1, -1))
descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(std::move(result));
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,8 +890,9 @@ arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlServerBase::GetFlightInfoSql
}

std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(),
descriptor, endpoints, -1, -1))
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(), descriptor, endpoints,
-1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,9 @@ std::unique_ptr<FlightServerBase> ExampleTestServer() {

FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes) {
int64_t total_records, int64_t total_bytes, bool ordered) {
EXPECT_OK_AND_ASSIGN(auto info, FlightInfo::Make(schema, descriptor, endpoints,
total_records, total_bytes));
total_records, total_bytes, ordered));
return info;
}

Expand Down Expand Up @@ -600,10 +600,10 @@ std::vector<FlightInfo> ExampleFlightInfo() {
auto schema4 = ExampleFloatSchema();

return {
MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000),
MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000),
MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1),
MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000),
MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, false),
MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, false),
MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1, false),
MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000, false),
};
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ std::vector<ActionType> ExampleActionTypes();
ARROW_FLIGHT_EXPORT
FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes);
int64_t total_records, int64_t total_bytes, bool ordered);

// ----------------------------------------------------------------------
// A pair of authentication handlers that check for a predefined password
Expand Down
Loading