diff --git a/cpp/cmake_modules/FindArrow.cmake b/cpp/cmake_modules/FindArrow.cmake
index f4b0a81afb8..3d5fd8ed70f 100644
--- a/cpp/cmake_modules/FindArrow.cmake
+++ b/cpp/cmake_modules/FindArrow.cmake
@@ -79,6 +79,14 @@ find_library(ARROW_PYTHON_LIB_PATH NAMES arrow_python
   NO_DEFAULT_PATH)
 get_filename_component(ARROW_PYTHON_LIBS ${ARROW_PYTHON_LIB_PATH} DIRECTORY)
 
+# Only look for the Flight library when the Python Flight bindings are
+# requested; ARROW_FLIGHT_LIBS is the directory that contains it.
+if (PYARROW_BUILD_FLIGHT)
+  find_library(ARROW_FLIGHT_LIB_PATH NAMES arrow_flight
+    PATHS
+    ${ARROW_SEARCH_LIB_PATH}
+    NO_DEFAULT_PATH)
+  get_filename_component(ARROW_FLIGHT_LIBS ${ARROW_FLIGHT_LIB_PATH} DIRECTORY)
+endif()
+
 if (MSVC)
   SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll")
 
@@ -101,19 +109,25 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIBS)
   set(ARROW_FOUND TRUE)
   set(ARROW_LIB_NAME arrow)
   set(ARROW_PYTHON_LIB_NAME arrow_python)
+  # The Flight variables below are set even when PYARROW_BUILD_FLIGHT is
+  # off; in that case ARROW_FLIGHT_LIBS is empty and the MSVC paths are
+  # not meaningful.
+  set(ARROW_FLIGHT_LIB_NAME arrow_flight)
   if (MSVC)
     set(ARROW_STATIC_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
     set(ARROW_PYTHON_STATIC_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
+    set(ARROW_FLIGHT_STATIC_LIB ${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
 
     set(ARROW_SHARED_LIB ${ARROW_SHARED_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
     set(ARROW_PYTHON_SHARED_LIB ${ARROW_PYTHON_SHARED_LIBS}/${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+    # NOTE(review): ARROW_FLIGHT_SHARED_LIBS is not assigned anywhere in
+    # the hunks of this patch -- confirm the MSVC section of this module
+    # defines it, otherwise this path starts at the filesystem root.
+    set(ARROW_FLIGHT_SHARED_LIB ${ARROW_FLIGHT_SHARED_LIBS}/${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
     set(ARROW_SHARED_IMP_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}.lib)
     set(ARROW_PYTHON_SHARED_IMP_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}.lib)
+    set(ARROW_FLIGHT_SHARED_IMP_LIB ${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}.lib)
   else()
     set(ARROW_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}.a)
     set(ARROW_PYTHON_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}.a)
+    # Like arrow_python above, Flight is resolved relative to ARROW_LIBS
+    # here rather than ARROW_FLIGHT_LIBS.
+    set(ARROW_FLIGHT_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}.a)
 
     set(ARROW_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
     set(ARROW_PYTHON_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+    set(ARROW_FLIGHT_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
   endif()
 endif()
diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt
index a32a5fa03c0..4650969eca2 100644
--- a/cpp/src/arrow/flight/CMakeLists.txt
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -21,7 +21,7 @@ add_custom_target(arrow_flight)
 arrow_install_all_headers("arrow/flight")
 
 set(ARROW_FLIGHT_STATIC_LINK_LIBS
-  protobuf_static
+  ${PROTOBUF_LIBRARY}
   grpc_grpcpp_static
   grpc_grpc_static
   grpc_gpr_static
diff --git a/cpp/src/arrow/python/CMakeLists.txt b/cpp/src/arrow/python/CMakeLists.txt
index 93dbd668655..9cf7eeb630f 100644
--- a/cpp/src/arrow/python/CMakeLists.txt
+++ b/cpp/src/arrow/python/CMakeLists.txt
@@ -44,12 +44,25 @@ set(ARROW_PYTHON_SRCS
   pyarrow.cc
   serialize.cc)
 
+# The Flight glue (flight.cc) is compiled into arrow_python only when
+# ARROW_FLIGHT is enabled.
+if(ARROW_FLIGHT)
+  set(ARROW_PYTHON_SRCS ${ARROW_PYTHON_SRCS} flight.cc)
+endif()
+
 if("${COMPILER_FAMILY}" STREQUAL "clang")
   set_property(SOURCE pyarrow.cc APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-cast-qual ")
 endif()
 
 set(ARROW_PYTHON_SHARED_LINK_LIBS arrow_shared ${PYTHON_OTHER_LIBS})
 
+if(ARROW_FLIGHT)
+  # Must link shared: we don't want to link more than one copy of gRPC
+  # into the eventual Cython shared object, otherwise gRPC calls fail
+  # with weird errors due to multiple copies of global static state
+  # (The other solution is to link gRPC shared everywhere instead of
+  # statically only in Flight)
+  set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS} arrow_flight_shared)
+endif()
+
 if(WIN32)
   set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS} ${PYTHON_LIBRARIES})
 endif()
diff --git a/cpp/src/arrow/python/flight.cc b/cpp/src/arrow/python/flight.cc
new file mode 100644
index
00000000000..555033305cf
--- /dev/null
+++ b/cpp/src/arrow/python/flight.cc
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <utility>
+
+#include "arrow/flight/internal.h"
+#include "arrow/python/flight.h"
+
+namespace arrow {
+namespace py {
+namespace flight {
+
+PyFlightServer::PyFlightServer(PyObject* server, PyFlightServerVtable vtable)
+    : vtable_(std::move(vtable)) {
+  // Take our own reference before handing the pointer to server_, so the
+  // Python server object stays alive for as long as this C++ server does.
+  Py_INCREF(server);
+  server_.reset(server);
+}
+
+// Not yet forwarded to Python.
+Status PyFlightServer::ListFlights(
+    const arrow::flight::Criteria* criteria,
+    std::unique_ptr<arrow::flight::FlightListing>* listings) {
+  return Status::NotImplemented("NYI");
+}
+
+// Each forwarded RPC follows the same shape: take the GIL, invoke the
+// Python-side callback from the vtable, then turn any pending Python
+// error into a Status via CheckPyError().
+Status PyFlightServer::GetFlightInfo(const arrow::flight::FlightDescriptor& request,
+                                     std::unique_ptr<arrow::flight::FlightInfo>* info) {
+  PyAcquireGIL lock;
+  vtable_.get_flight_info(server_.obj(), request, info);
+  return CheckPyError();
+}
+
+Status PyFlightServer::DoGet(const arrow::flight::Ticket& request,
+                             std::unique_ptr<arrow::flight::FlightDataStream>* stream) {
+  PyAcquireGIL lock;
+  vtable_.do_get(server_.obj(), request, stream);
+  return CheckPyError();
+}
+
+Status PyFlightServer::DoPut(
+    std::unique_ptr<arrow::flight::FlightMessageReader> reader) {
+  PyAcquireGIL lock;
+  // Ownership of the reader is transferred to the Python callback.
+  vtable_.do_put(server_.obj(), std::move(reader));
+  return CheckPyError();
+}
+
+// Not yet forwarded to Python.
+Status PyFlightServer::DoAction(const arrow::flight::Action& action,
+                                std::unique_ptr<arrow::flight::ResultStream>* result) {
+  return Status::NotImplemented("NYI");
+}
+
+// Not yet forwarded to Python.
+Status PyFlightServer::ListActions(std::vector<arrow::flight::ActionType>* actions) {
+  return Status::NotImplemented("NYI");
+}
+
+Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
+                        const arrow::flight::FlightDescriptor& descriptor,
+                        const std::vector<arrow::flight::FlightEndpoint>& endpoints,
+                        uint64_t total_records, uint64_t total_bytes,
+                        std::unique_ptr<arrow::flight::FlightInfo>* out) {
+  arrow::flight::FlightInfo::Data flight_data;
+  // FlightInfo stores the schema in serialized form.
+  RETURN_NOT_OK(arrow::flight::internal::SchemaToString(*schema, &flight_data.schema));
+  flight_data.descriptor = descriptor;
+  flight_data.endpoints = endpoints;
+  flight_data.total_records = total_records;
+  flight_data.total_bytes = total_bytes;
+  // Build the result directly on the heap instead of constructing a
+  // temporary FlightInfo and copying it.
+  out->reset(new arrow::flight::FlightInfo(flight_data));
+  return Status::OK();
+}
+
+}  // namespace flight
+}  // namespace py
+}  // namespace arrow
diff --git a/cpp/src/arrow/python/flight.h b/cpp/src/arrow/python/flight.h
new file mode 100644
index 00000000000..774cf700041
--- /dev/null
+++ b/cpp/src/arrow/python/flight.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PYARROW_FLIGHT_H
+#define PYARROW_FLIGHT_H
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "arrow/flight/api.h"
+#include "arrow/python/common.h"
+#include "arrow/python/config.h"
+
+namespace arrow {
+
+namespace py {
+
+namespace flight {
+
+/// \brief A table of function pointers for calling from C++ into
+/// Python.
+///
+/// Every callback receives the Python server object as its first
+/// argument. PyFlightServer holds the GIL while invoking a callback and
+/// checks for a pending Python error afterwards.
+class ARROW_PYTHON_EXPORT PyFlightServerVtable {
+ public:
+  std::function<void(PyObject*, const arrow::flight::FlightDescriptor&,
+                     std::unique_ptr<arrow::flight::FlightInfo>*)>
+      get_flight_info;
+  std::function<void(PyObject*, std::unique_ptr<arrow::flight::FlightMessageReader>)>
+      do_put;
+  std::function<void(PyObject*, const arrow::flight::Ticket&,
+                     std::unique_ptr<arrow::flight::FlightDataStream>*)>
+      do_get;
+};
+
+/// \brief A Flight server whose RPC handlers are implemented in Python.
+///
+/// GetFlightInfo, DoGet and DoPut are forwarded through the vtable;
+/// ListFlights, DoAction and ListActions return Status::NotImplemented.
+class ARROW_PYTHON_EXPORT PyFlightServer : public arrow::flight::FlightServerBase {
+ public:
+  /// \param[in] server the Python server object passed to every callback;
+  /// a new reference to it is taken and held by this object
+  /// \param[in] vtable the callbacks implementing the RPC handlers
+  explicit PyFlightServer(PyObject* server, PyFlightServerVtable vtable);
+
+  Status ListFlights(const arrow::flight::Criteria* criteria,
+                     std::unique_ptr<arrow::flight::FlightListing>* listings) override;
+  Status GetFlightInfo(const arrow::flight::FlightDescriptor& request,
+                       std::unique_ptr<arrow::flight::FlightInfo>* info) override;
+  Status DoGet(const arrow::flight::Ticket& request,
+               std::unique_ptr<arrow::flight::FlightDataStream>* stream) override;
+  Status DoPut(std::unique_ptr<arrow::flight::FlightMessageReader> reader) override;
+  Status DoAction(const arrow::flight::Action& action,
+                  std::unique_ptr<arrow::flight::ResultStream>* result) override;
+  Status ListActions(std::vector<arrow::flight::ActionType>* actions) override;
+
+ private:
+  OwnedRefNoGIL server_;
+  PyFlightServerVtable vtable_;
+};
+
+/// \brief Build a FlightInfo from its parts.
+///
+/// Fails if the schema cannot be serialized; otherwise stores the new
+/// FlightInfo in *out.
+ARROW_PYTHON_EXPORT
+Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
+                        const arrow::flight::FlightDescriptor& descriptor,
+                        const std::vector<arrow::flight::FlightEndpoint>& endpoints,
+                        uint64_t total_records, uint64_t total_bytes,
+                        std::unique_ptr<arrow::flight::FlightInfo>* out);
+
+}  // namespace flight
+}  // namespace py
+}  // namespace arrow
+
+#endif  // PYARROW_FLIGHT_H
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 0559261d5bc..63a8cd09cbc 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -59,6 +59,7 @@ endif()
 # Top level cmake dir
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
+  option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
   option(PYARROW_BUILD_GANDIVA "Build the
PyArrow Gandiva integration" OFF)
   option(PYARROW_BUILD_PARQUET "Build the PyArrow Parquet integration" OFF)
   option(PYARROW_PARQUET_USE_SHARED "Rely on parquet shared libraries where relevant" ON)
@@ -191,6 +192,10 @@ include_directories(SYSTEM ${NUMPY_INCLUDE_DIRS} ${PYTHON_INCLUDE_DIRS} src)
 # Dependencies
 #
 
+if(PYARROW_BUILD_FLIGHT)
+  set(ARROW_FLIGHT TRUE)
+endif()
+
 # Arrow
 find_package(Arrow REQUIRED)
 include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
@@ -352,9 +357,15 @@ endif()
 if(MSVC)
   add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_IMP_LIB})
   add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB})
+  if(PYARROW_BUILD_FLIGHT)
+    add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_IMP_LIB})
+  endif()
 else()
   add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_LIB})
   add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_LIB})
+  if(PYARROW_BUILD_FLIGHT)
+    add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_LIB})
+  endif()
 endif()
 
 #
@@ -474,6 +485,20 @@ if(PYARROW_BUILD_ORC)
   set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _orc)
 endif()
 
+# Flight
+if(PYARROW_BUILD_FLIGHT)
+  if(PYARROW_BUNDLE_ARROW_CPP)
+    # TODO:
+    message(FATAL_ERROR "Not yet implemented: bundling arrow-flight in pyarrow")
+  endif()
+  # We do NOT want to link gRPC or any other Flight dependency
+  # here. Linking more than one copy leads to odd runtime errors due
+  # to multiple copies of static global state. Thus we also need to
+  # link Flight as a shared object.
+  set(LINK_LIBS ${LINK_LIBS} arrow_flight_shared)
+  set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight)
+endif()
+
 # Gandiva
 if(PYARROW_BUILD_GANDIVA)
   find_package(Gandiva)
diff --git a/python/examples/flight/client.py b/python/examples/flight/client.py
new file mode 100644
index 00000000000..df9acc1cb89
--- /dev/null
+++ b/python/examples/flight/client.py
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example Flight CLI client."""
+
+import argparse
+import sys
+
+import pyarrow
+import pyarrow.flight
+
+
+def list_flights(args, client):
+    """Print every flight and every action the server advertises.
+
+    With ``args.list`` set, the schema of each flight is printed too.
+    """
+    print('Flights\n=======')
+    for flight in client.list_flights():
+        descriptor = flight.descriptor
+        if descriptor.descriptor_type == pyarrow.flight.DescriptorType.PATH:
+            print("Path:", descriptor.path)
+        elif descriptor.descriptor_type == pyarrow.flight.DescriptorType.CMD:
+            print("Command:", descriptor.command)
+        else:
+            print("Unknown descriptor type")
+
+        print("Total records:", end=" ")
+        if flight.total_records >= 0:
+            print(flight.total_records)
+        else:
+            print("Unknown")
+
+        print("Total bytes:", end=" ")
+        if flight.total_bytes >= 0:
+            print(flight.total_bytes)
+        else:
+            print("Unknown")
+
+        print("Number of endpoints:", len(flight.endpoints))
+
+        if args.list:
+            print(flight.schema)
+
+        print('---')
+
+    print('\nActions\n=======')
+    for action in client.list_actions():
+        print("Type:", action.type)
+        print("Description:", action.description)
+        print('---')
+
+
+def do_action(args, client):
+    """Run the action named ``args.action_type`` with an empty body.
+
+    Each result body is printed; an ArrowIOError from the call is
+    reported instead of propagating.
+    """
+    try:
+        buf = pyarrow.allocate_buffer(0)
+        action = pyarrow.flight.Action(args.action_type, buf)
+        print('Running action', args.action_type)
+        for result in client.do_action(action):
+            print("Got result", result.body.to_pybytes())
+    except pyarrow.lib.ArrowIOError as e:
+        print("Error calling action:", e)
+
+
+def get_flight(args, client):
+    """Look up a flight by path or command and print its data.
+
+    The stream is fetched from every location of every endpoint and
+    printed as a pandas DataFrame.
+    """
+    if args.path:
+        descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path)
+    else:
+        descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command)
+
+    info = client.get_flight_info(descriptor)
+    for endpoint in info.endpoints:
+        print('Ticket:', endpoint.ticket)
+        for location in endpoint.locations:
+            print(location)
+            # The data may live on a different server than the one that
+            # answered get_flight_info, so connect to the location itself.
+            get_client = pyarrow.flight.FlightClient.connect(location)
+            reader = get_client.do_get(endpoint.ticket, info.schema)
+            df = reader.read_pandas()
+            print(df)
+
+
+def _add_common_arguments(parser):
+    """Add the arguments shared by every subcommand."""
+    parser.add_argument('host', type=str,
+                        help="The server to connect to, as HOST:PORT.")
+
+
+def main():
+    """Parse the command line and dispatch to the chosen subcommand."""
+    parser = argparse.ArgumentParser()
+    subcommands = parser.add_subparsers()
+
+    cmd_list = subcommands.add_parser('list')
+    cmd_list.set_defaults(action='list')
+    _add_common_arguments(cmd_list)
+    cmd_list.add_argument('-l', '--list', action='store_true',
+                          help="Print more details.")
+
+    cmd_do = subcommands.add_parser('do')
+    cmd_do.set_defaults(action='do')
+    _add_common_arguments(cmd_do)
+    cmd_do.add_argument('action_type', type=str,
+                        help="The action type to run.")
+
+    cmd_get = subcommands.add_parser('get')
+    cmd_get.set_defaults(action='get')
+    _add_common_arguments(cmd_get)
+    cmd_get_descriptor = cmd_get.add_mutually_exclusive_group(required=True)
+    cmd_get_descriptor.add_argument('-p', '--path', type=str, action='append',
+                                    help="The path for the descriptor.")
+    cmd_get_descriptor.add_argument('-c', '--command', type=str,
+                                    help="The command for the descriptor.")
+
+    args = parser.parse_args()
+    # No subcommand given: argparse sets no 'action' default in that case.
+    if not hasattr(args, 'action'):
+        parser.print_help()
+        sys.exit(1)
+
+    commands = {
+        'list': list_flights,
+        'do': do_action,
+        'get': get_flight,
+    }
+    # Split on the LAST colon so hosts that themselves contain colons
+    # still parse, and report a usage error rather than a traceback when
+    # the port is missing or not a number.
+    host, sep, port = args.host.rpartition(':')
+    if not sep or not host or not port.isdigit():
+        parser.error(
+            "host must be given as HOST:PORT, got {!r}".format(args.host))
+    client = pyarrow.flight.FlightClient.connect(host, int(port))
+    commands[args.action](args, client)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/examples/flight/server.py b/python/examples/flight/server.py
new file mode 100644
index 00000000000..cf8668a58ec
--- /dev/null
+++ b/python/examples/flight/server.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example Flight Python server."""
+
+import pyarrow
+import pyarrow.flight
+
+
+class FlightServer(pyarrow.flight.FlightServerBase):
+    """An in-memory Flight server that stores uploaded tables by descriptor."""
+
+    def __init__(self):
+        super(FlightServer, self).__init__()
+        # Maps descriptor_to_key(descriptor) -> the uploaded pyarrow Table.
+        self.flights = {}
+
+    @classmethod
+    def descriptor_to_key(cls, descriptor):
+        """Turn a FlightDescriptor into a hashable dictionary key."""
+        return (descriptor.descriptor_type, descriptor.command,
+                tuple(descriptor.path or tuple()))
+
+    def get_flight_info(self, descriptor):
+        """Describe a previously uploaded flight.
+
+        Raises KeyError if nothing was uploaded under this descriptor.
+        """
+        key = FlightServer.descriptor_to_key(descriptor)
+        if key in self.flights:
+            table = self.flights[key]
+            # No endpoints are advertised and the byte size is reported as 0.
+            return pyarrow.flight.FlightInfo(table.schema,
+                                             descriptor, [],
+                                             table.num_rows, 0)
+        raise KeyError('Flight not found.')
+
+    def do_put(self, descriptor, reader):
+        """Read the whole uploaded stream and store it under the descriptor."""
+        key = FlightServer.descriptor_to_key(descriptor)
+        print(key)
+        self.flights[key] = reader.read_all()
+        print(self.flights[key])
+
+
+def main():
+    """Serve on port 5005 until interrupted."""
+    server = FlightServer()
+    server.run(5005)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
new
file mode 100644
index 00000000000..48191710414
--- /dev/null
+++ b/python/pyarrow/_flight.pyx
@@ -0,0 +1,476 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import collections
+import enum
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.compat import frombytes, tobytes
+from pyarrow.lib cimport *
+from pyarrow.lib import as_buffer
+from pyarrow.includes.libarrow_flight cimport *
+from pyarrow.ipc import _ReadPandasOption
+import pyarrow.lib as lib
+
+
+cdef class Action:
+    """An action executable on a Flight service."""
+    cdef:
+        CAction action
+
+    def __init__(self, action_type, buf):
+        """Create an action from a type name and a buffer-like body."""
+        self.action.type = tobytes(action_type)
+        self.action.body = pyarrow_unwrap_buffer(as_buffer(buf))
+
+    @property
+    def type(self):
+        """The action type, as a string."""
+        return frombytes(self.action.type)
+
+    # NOTE(review): unlike `type` (and `Result.body`) this is a plain
+    # method, not a property; left as-is so existing `action.body()`
+    # callers keep working.
+    def body(self):
+        """Return the Buffer holding the action body."""
+        return pyarrow_wrap_buffer(self.action.body)
+
+
+cdef class ActionType:
+    """A type of action executable on a Flight service."""
+    cdef:
+        CActionType action_type
+
+    @property
+    def type(self):
+        """The name of the action type, as a string."""
+        return frombytes(self.action_type.type)
+
+    @property
+    def description(self):
+        """The description of the action type, as a string."""
+        return frombytes(self.action_type.description)
+
+    def make_action(self, buf):
+        """Create an Action with this type."""
+        return Action(self.type, buf)
+
+    def __repr__(self):
+        return '<ActionType type={} description={}>'.format(
+            self.type, self.description)
+
+
+cdef class Result:
+    """A result from executing an Action."""
+    cdef:
+        unique_ptr[CResult] result
+
+    @property
+    def body(self):
+        """Get the Buffer containing the result."""
+        return pyarrow_wrap_buffer(self.result.get().body)
+
+
+class DescriptorType(enum.Enum):
+    """The kind of a FlightDescriptor, as reported by `descriptor_type`."""
+    UNKNOWN = 0
+    PATH = 1
+    CMD = 2
+
+
+cdef class FlightDescriptor:
+    """A description of a data stream available from a Flight service."""
+    cdef:
+        CFlightDescriptor descriptor
+
+    def __init__(self):
+        # The literal braces are doubled so that str.format() does not
+        # treat `{path,command}` as a replacement field.
+        raise TypeError("Do not call {}'s constructor directly, use "
+                        "`pyarrow.flight.FlightDescriptor.for_{{path,command}}` "
+                        "function instead."
+                        .format(self.__class__.__name__))
+
+    @staticmethod
+    def for_path(*path):
+        """Create a FlightDescriptor for a resource path."""
+        cdef FlightDescriptor result = \
+            FlightDescriptor.__new__(FlightDescriptor)
+        result.descriptor.type = CDescriptorTypePath
+        result.descriptor.path = [tobytes(p) for p in path]
+        return result
+
+    @staticmethod
+    def for_command(command):
+        """Create a FlightDescriptor for an opaque command."""
+        cdef FlightDescriptor result = \
+            FlightDescriptor.__new__(FlightDescriptor)
+        result.descriptor.type = CDescriptorTypeCmd
+        result.descriptor.cmd = tobytes(command)
+        return result
+
+    @property
+    def descriptor_type(self):
+        """The DescriptorType of this descriptor."""
+        if self.descriptor.type == CDescriptorTypeUnknown:
+            return DescriptorType.UNKNOWN
+        elif self.descriptor.type == CDescriptorTypePath:
+            return DescriptorType.PATH
+        elif self.descriptor.type == CDescriptorTypeCmd:
+            return DescriptorType.CMD
+        raise RuntimeError("Invalid descriptor type!")
+
+    @property
+    def command(self):
+        """Get the command for this descriptor."""
+        if self.descriptor_type != DescriptorType.CMD:
+            return None
+        return self.descriptor.cmd
+
+    @property
+    def path(self):
+        """Get the path for this descriptor."""
+        if self.descriptor_type != DescriptorType.PATH:
+            return None
+        return self.descriptor.path
+
+    def __repr__(self):
+        # descriptor_type is a property, so it must not be called.
+        return "<FlightDescriptor type: {}>".format(self.descriptor_type)
+
+
+class Ticket:
+    """A ticket for requesting a Flight stream."""
+    def __init__(self, ticket):
+        self.ticket = ticket
+
+    def __repr__(self):
+        return '<Ticket {}>'.format(self.ticket)
+
+
+class Location(collections.namedtuple('Location', ['host', 'port'])):
+    """A location where a Flight stream is available."""
+
+
+cdef class FlightEndpoint:
+    """A Flight stream, along with the ticket and locations to access it."""
+    cdef:
+        CFlightEndpoint endpoint
+
+    def __init__(self, ticket, locations):
+        """Create a FlightEndpoint from a ticket and list of locations.
+
+        Parameters
+        ----------
+        ticket : Ticket or bytes
+            the ticket needed to access this flight
+        locations : list of Location or tuples of (host, port)
+            locations where this flight is available
+        """
+        cdef:
+            CLocation c_location = CLocation()
+
+        # tobytes() passes bytes through unchanged and encodes str, so a
+        # ticket given as text is accepted as well.
+        if isinstance(ticket, Ticket):
+            self.endpoint.ticket.ticket = tobytes(ticket.ticket)
+        else:
+            self.endpoint.ticket.ticket = tobytes(ticket)
+
+        for location in locations:
+            # Accepts Location namedtuple or tuple
+            c_location.host = tobytes(location[0])
+            c_location.port = location[1]
+            self.endpoint.locations.push_back(c_location)
+
+    @property
+    def ticket(self):
+        """The Ticket needed to access this flight."""
+        return Ticket(self.endpoint.ticket.ticket)
+
+    @property
+    def locations(self):
+        """The list of Location tuples where this flight is available."""
+        return [Location(frombytes(location.host), location.port)
+                for location in self.endpoint.locations]
+
+
+cdef class FlightInfo:
+    """A description of a Flight stream."""
+    cdef:
+        unique_ptr[CFlightInfo] info
+
+    def __init__(self, Schema schema, FlightDescriptor descriptor, endpoints,
+                 total_records, total_bytes):
+        """Create a FlightInfo object from a schema, descriptor, and endpoints.
+
+        Parameters
+        ----------
+        schema : Schema
+            the schema of the data in this flight.
+        descriptor : FlightDescriptor
+            the descriptor for this flight.
+        endpoints : list of FlightEndpoint
+            a list of endpoints where this flight is available.
+        total_records : int
+            the total records in this flight, or -1 if unknown
+        total_bytes : int
+            the total bytes in this flight, or -1 if unknown
+        """
+        # NOTE(review): CreateFlightInfo is declared with uint64_t counts,
+        # which conflicts with the documented -1 sentinel -- confirm which
+        # of the two is intended.
+        cdef:
+            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+            vector[CFlightEndpoint] c_endpoints
+
+        for endpoint in endpoints:
+            if isinstance(endpoint, FlightEndpoint):
+                c_endpoints.push_back((<FlightEndpoint> endpoint).endpoint)
+            else:
+                raise TypeError('Endpoint {} is not instance of'
+                                ' FlightEndpoint'.format(endpoint))
+
+        check_status(CreateFlightInfo(c_schema,
+                                      descriptor.descriptor,
+                                      c_endpoints,
+                                      total_records,
+                                      total_bytes, &self.info))
+
+    @property
+    def total_records(self):
+        """The total record count of this flight, or -1 if unknown."""
+        return self.info.get().total_records()
+
+    @property
+    def total_bytes(self):
+        """The size in bytes of the data in this flight, or -1 if unknown."""
+        return self.info.get().total_bytes()
+
+    @property
+    def schema(self):
+        """The schema of the data in this flight."""
+        cdef:
+            shared_ptr[CSchema] schema
+        check_status(self.info.get().GetSchema(&schema))
+        return pyarrow_wrap_schema(schema)
+
+    @property
+    def descriptor(self):
+        """The descriptor of the data in this flight."""
+        cdef FlightDescriptor result = \
+            FlightDescriptor.__new__(FlightDescriptor)
+        result.descriptor = self.info.get().descriptor()
+        return result
+
+    @property
+    def endpoints(self):
+        """The endpoints where this flight is available."""
+        # TODO: get Cython to iterate over reference directly
+        cdef:
+            vector[CFlightEndpoint] endpoints = self.info.get().endpoints()
+            FlightEndpoint py_endpoint
+
+        result = []
+        for endpoint in endpoints:
+            # __new__ needs the type to instantiate; __init__ is skipped on
+            # purpose because the C++ endpoint is copied in directly.
+            py_endpoint = FlightEndpoint.__new__(FlightEndpoint)
+            py_endpoint.endpoint = endpoint
+            result.append(py_endpoint)
+        return result
+
+
+cdef class FlightRecordBatchReader(_CRecordBatchReader, _ReadPandasOption):
+    """A reader for a Flight data stream, with read_pandas() support."""
+    cdef dict __dict__
+
+
+cdef class FlightRecordBatchWriter(_CRecordBatchWriter):
+    """A writer for uploading record batches to a Flight service."""
+    pass
+
+
+cdef class FlightClient:
+    """A client to a Flight service."""
+    cdef:
+        unique_ptr[CFlightClient] client
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly, use "
+                        "`pyarrow.flight.FlightClient.connect` instead."
+                        .format(self.__class__.__name__))
+
+    @staticmethod
+    def connect(*args):
+        """Connect to a Flight service on the given host and port."""
+        cdef:
+            FlightClient result = FlightClient.__new__(FlightClient)
+            int c_port = 0
+            c_string c_host
+
+        if len(args) == 1:
+            # Accept namedtuple or plain tuple
+            c_host = tobytes(args[0][0])
+            c_port = args[0][1]
+        elif len(args) == 2:
+            # Accept separate host, port
+            c_host = tobytes(args[0])
+            c_port = args[1]
+        else:
+            raise TypeError("FlightClient.connect() takes 1 "
+                            "or 2 arguments ({} given)".format(len(args)))
+
+        with nogil:
+            check_status(CFlightClient.Connect(c_host, c_port, &result.client))
+
+        return result
+
+    def list_actions(self):
+        """List the actions available on a service."""
+        cdef:
+            vector[CActionType] results
+            ActionType py_action
+
+        with nogil:
+            check_status(self.client.get().ListActions(&results))
+
+        result = []
+        for action_type in results:
+            py_action = ActionType()
+            py_action.action_type = action_type
+            result.append(py_action)
+
+        return result
+
+    def do_action(self, action: Action):
+        """Execute an action on a service."""
+        cdef:
+            unique_ptr[CResultStream] results
+        with nogil:
+            check_status(self.client.get().DoAction(action.action, &results))
+
+        # The stream signals its end by yielding a null result.
+        while True:
+            result = Result()
+            with nogil:
+                check_status(results.get().Next(&result.result))
+                if result.result == NULL:
+                    break
+            yield result
+
+    def list_flights(self):
+        """List the flights available on a service."""
+        cdef:
+            unique_ptr[CFlightListing] listing
+            FlightInfo result
+
+        with nogil:
+            check_status(self.client.get().ListFlights(&listing))
+
+        # The listing signals its end by yielding a null FlightInfo.
+        while True:
+            result = FlightInfo.__new__(FlightInfo)
+            with nogil:
+                check_status(listing.get().Next(&result.info))
+                if result.info == NULL:
+                    break
+            yield result
+
+    def get_flight_info(self, descriptor: FlightDescriptor):
+        """Request information about an available flight."""
+        cdef:
+            FlightInfo result = FlightInfo.__new__(FlightInfo)
+
+        with nogil:
+            check_status(self.client.get().GetFlightInfo(
+                descriptor.descriptor, &result.info))
+
+        return result
+
+    def do_get(self, ticket: Ticket, schema: Schema):
+        """Request the data for a flight."""
+        cdef:
+            # TODO: introduce unwrap
+            CTicket c_ticket
+            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+            unique_ptr[CRecordBatchReader] reader
+
+        # tobytes() leaves bytes untouched and encodes a str ticket.
+        c_ticket.ticket = tobytes(ticket.ticket)
+        with nogil:
+            check_status(self.client.get().DoGet(c_ticket, c_schema, &reader))
+        result = FlightRecordBatchReader()
+        result.reader.reset(reader.release())
+        return result
+
+    def do_put(self, descriptor: FlightDescriptor, schema: Schema):
+        """Upload data to a flight."""
+        cdef:
+            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+            unique_ptr[CRecordBatchWriter] writer
+
+        with nogil:
+            check_status(self.client.get().DoPut(
+                descriptor.descriptor, c_schema, &writer))
+        result = FlightRecordBatchWriter()
+        result.writer.reset(writer.release())
+        return result
+
+
+cdef class FlightDataStream:
+    """A stream of data that a server returns from do_get."""
+    cdef:
+        unique_ptr[CFlightDataStream] stream
+
+
+cdef class RecordBatchStream(FlightDataStream):
+    """A FlightDataStream backed by a record batch reader (not wired up yet)."""
+    def __init__(self, reader):
+        # TODO: we don't really expose the readers in Python.
+        pass
+
+
+# NOTE(review): the three callbacks below are `cdef void` functions with no
+# exception clause, yet they raise Python exceptions and the C++ caller
+# relies on CheckPyError() to see them -- confirm the errors actually
+# propagate.
+
+cdef void _get_flight_info(void* self, CFlightDescriptor c_descriptor,
+                           unique_ptr[CFlightInfo]* info):
+    """Callback for implementing Flight servers in Python."""
+    # NOTE(review): this does not call the server's get_flight_info() yet.
+    raise NotImplementedError("GetFlightInfo is not implemented")
+
+
+cdef void _do_put(void* self, unique_ptr[CFlightMessageReader] reader):
+    """Callback for implementing Flight servers in Python."""
+    cdef:
+        FlightRecordBatchReader py_reader = FlightRecordBatchReader()
+        FlightDescriptor descriptor = \
+            FlightDescriptor.__new__(FlightDescriptor)
+
+    # Copy the descriptor out before the reader's ownership moves to
+    # py_reader.
+    descriptor.descriptor = reader.get().descriptor()
+    py_reader.reader.reset(reader.release())
+    (<object> self).do_put(descriptor, py_reader)
+
+
+cdef void _do_get(void* self, CTicket ticket,
+                  unique_ptr[CFlightDataStream]* stream):
+    """Callback for implementing Flight servers in Python."""
+    # Ticket requires its payload as a constructor argument.
+    py_ticket = Ticket(ticket.ticket)
+    result = (<object> self).do_get(py_ticket)
+    if not isinstance(result, FlightDataStream):
+        raise TypeError("FlightServerBase.do_get must return "
+                        "a FlightDataStream")
+    stream[0] = move((<FlightDataStream> result).stream)
+
+
+cdef class FlightServerBase:
+    """A Flight service definition."""
+
+    cdef:
+        unique_ptr[PyFlightServer] server
+
+    def run(self, port):
+        """Start serving on the given port; blocks inside the C++ server."""
+        cdef:
+            PyFlightServerVtable vtable = PyFlightServerVtable()
+            int c_port = port
+        vtable.get_flight_info = &_get_flight_info
+        vtable.do_put = &_do_put
+        vtable.do_get = &_do_get
+        self.server.reset(new PyFlightServer(self, vtable))
+        # Release the GIL so the callbacks can re-acquire it while serving.
+        with nogil:
+            self.server.get().Run(c_port)
+
+    def get_flight_info(self, descriptor):
+        """Describe the flight for a descriptor; override in subclasses."""
+        raise NotImplementedError
+
+    def do_put(self, descriptor, reader):
+        """Receive an uploaded stream; override in subclasses."""
+        raise NotImplementedError
+
+    def do_get(self, ticket):
+        """Return a FlightDataStream for a ticket; override in subclasses."""
+        raise NotImplementedError
+
+    def shutdown(self):
+        """Stop the server if it has been started."""
+        if self.server.get() != NULL:
+            self.server.get().Shutdown()
diff --git a/python/pyarrow/flight.py b/python/pyarrow/flight.py
new file mode 100644
index 00000000000..d238c3715af
--- /dev/null
+++ b/python/pyarrow/flight.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor
license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Public Python entry point: re-exports the names implemented in the
+# compiled pyarrow._flight extension module.
+from pyarrow._flight import (FlightClient, Action, ActionType, # noqa
+                             FlightDescriptor, FlightInfo, Ticket, Location,
+                             FlightServerBase, DescriptorType)
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index 1b13ff0c127..97e23f98504 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -41,8 +41,12 @@ cdef extern from "numpy/halffloat.h":
 
 cdef extern from "arrow/api.h" namespace "arrow" nogil:
     # We can later add more of the common status factory methods as needed
-    cdef CStatus CStatus_OK "Status::OK"()
-    cdef CStatus CStatus_Invalid "Status::Invalid"()
+    # The C names are now spelled with the full arrow:: qualification.
+    cdef CStatus CStatus_OK "arrow::Status::OK"()
+    cdef CStatus CStatus_Invalid "arrow::Status::Invalid"()
+    cdef CStatus CStatus_NotImplemented \
+        "arrow::Status::NotImplemented"(const c_string& msg)
+    cdef CStatus CStatus_UnknownError \
+        "arrow::Status::UnknownError"(const c_string& msg)
 
     cdef cppclass CStatus "arrow::Status":
         CStatus()
diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd
new file mode 100644
index 00000000000..4d64f521e11
--- /dev/null
+++ b/python/pyarrow/includes/libarrow_flight.pxd
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from libcpp.functional cimport function
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+# Cython declarations for the C++ Flight API. Each C++ type is exposed
+# under a C-prefixed name so it cannot clash with the Python wrapper
+# classes of the same name in _flight.pyx.
+# NOTE(review): every C name below starts with a space before "arrow::";
+# presumably deliberate -- confirm before normalizing.
+cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
+    cdef cppclass CActionType" arrow::flight::ActionType":
+        c_string type
+        c_string description
+
+    cdef cppclass CAction" arrow::flight::Action":
+        c_string type
+        shared_ptr[CBuffer] body
+
+    cdef cppclass CResult" arrow::flight::Result":
+        shared_ptr[CBuffer] body
+
+    cdef cppclass CResultStream" arrow::flight::ResultStream":
+        CStatus Next(unique_ptr[CResult]* result)
+
+    # The descriptor-type enum is declared as an opaque class with
+    # equality, plus one constant per enumerator.
+    cdef cppclass CDescriptorType \
+            " arrow::flight::FlightDescriptor::DescriptorType":
+        bint operator==(CDescriptorType)
+
+    CDescriptorType CDescriptorTypeUnknown\
+        " arrow::flight::FlightDescriptor::UNKNOWN"
+    CDescriptorType CDescriptorTypePath\
+        " arrow::flight::FlightDescriptor::PATH"
+    CDescriptorType CDescriptorTypeCmd\
+        " arrow::flight::FlightDescriptor::CMD"
+
+    cdef cppclass CFlightDescriptor" arrow::flight::FlightDescriptor":
+        CDescriptorType type
+        c_string cmd
+        vector[c_string] path
+
+    cdef cppclass CTicket" arrow::flight::Ticket":
+        CTicket()
+        c_string ticket
+
+    cdef cppclass CLocation" arrow::flight::Location":
+        CLocation()
+
+        c_string host
+        int32_t port
+
+    cdef cppclass CFlightEndpoint" arrow::flight::FlightEndpoint":
+        CFlightEndpoint()
+
+        CTicket ticket
+        vector[CLocation] locations
+
+    cdef cppclass CFlightInfo" arrow::flight::FlightInfo":
+        uint64_t total_records()
+        uint64_t total_bytes()
+        CStatus GetSchema(shared_ptr[CSchema]* out)
+        CFlightDescriptor& descriptor()
+        const vector[CFlightEndpoint]& endpoints()
+
+    cdef cppclass CFlightListing" arrow::flight::FlightListing":
+        CStatus Next(unique_ptr[CFlightInfo]* info)
+
+    cdef cppclass CFlightMessageReader \
+            " arrow::flight::FlightMessageReader"(CRecordBatchReader):
+        CFlightDescriptor& descriptor()
+
+    cdef cppclass CFlightDataStream" arrow::flight::FlightDataStream":
+        pass
+
+    cdef cppclass CRecordBatchStream \
+            " arrow::flight::RecordBatchStream"(CFlightDataStream):
+        CRecordBatchStream(shared_ptr[CRecordBatchReader]& reader)
+
+    cdef cppclass CFlightClient" arrow::flight::FlightClient":
+        @staticmethod
+        CStatus Connect(const c_string& host, int port,
+                        unique_ptr[CFlightClient]* client)
+
+        CStatus DoAction(CAction& action, unique_ptr[CResultStream]* results)
+        CStatus ListActions(vector[CActionType]* actions)
+
+        CStatus ListFlights(unique_ptr[CFlightListing]* listing)
+        CStatus GetFlightInfo(CFlightDescriptor& descriptor,
+                              unique_ptr[CFlightInfo]* info)
+
+        CStatus DoGet(CTicket& ticket, shared_ptr[CSchema]& schema,
+                      unique_ptr[CRecordBatchReader]* stream)
+        CStatus DoPut(CFlightDescriptor& descriptor,
+                      shared_ptr[CSchema]& schema,
+                      unique_ptr[CRecordBatchWriter]* stream)
+
+
+# Callbacks for implementing Flight servers
+# Use typedef to emulate syntax for std::function
+# The first argument of each callback is the Python server object.
+ctypedef void cb_get_flight_info(object, const CFlightDescriptor&,
+                                 unique_ptr[CFlightInfo]*)
+ctypedef void cb_do_put(object, unique_ptr[CFlightMessageReader])
+ctypedef void cb_do_get(object, const CTicket&,
+                        unique_ptr[CFlightDataStream]*)
+
+cdef extern from "arrow/python/flight.h" namespace "arrow::py::flight" nogil:
+    cdef cppclass
PyFlightServerVtable: + PyFlightServerVtable() + function[cb_get_flight_info] get_flight_info + function[cb_do_put] do_put + function[cb_do_get] do_get + + cdef cppclass PyFlightServer: + PyFlightServer(object server, PyFlightServerVtable vtable) + void Run(int port) + void Shutdown() + + cdef CStatus CreateFlightInfo" arrow::py::flight::CreateFlightInfo"( + shared_ptr[CSchema] schema, + CFlightDescriptor& descriptor, + vector[CFlightEndpoint] endpoints, + uint64_t total_records, + uint64_t total_bytes, + unique_ptr[CFlightInfo]* out) + +cdef extern from "" namespace "std": + unique_ptr[CFlightDataStream] move(unique_ptr[CFlightDataStream]) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 137d5261d24..e8573022bb4 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -148,26 +148,14 @@ cdef class MessageReader: # ---------------------------------------------------------------------- # File and stream readers and writers -cdef class _RecordBatchWriter: - cdef: - shared_ptr[CRecordBatchWriter] writer - shared_ptr[OutputStream] sink - bint closed - - def __cinit__(self): - pass - - def __dealloc__(self): - pass +cdef class _CRecordBatchWriter: + """The base RecordBatchWriter wrapper. - def _open(self, sink, Schema schema): - get_writer(sink, &self.sink) + Provides common implementations of convenience methods. Should not + be instantiated directly by user code. 
+ """ - with nogil: - check_status( - CRecordBatchStreamWriter.Open(self.sink.get(), - schema.sp_schema, - &self.writer)) + # cdef block is in lib.pxd def write(self, table_or_batch): """ @@ -222,6 +210,33 @@ cdef class _RecordBatchWriter: with nogil: check_status(self.writer.get().Close()) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + +cdef class _RecordBatchStreamWriter(_CRecordBatchWriter): + cdef: + shared_ptr[OutputStream] sink + bint closed + + def __cinit__(self): + pass + + def __dealloc__(self): + pass + + def _open(self, sink, Schema schema): + get_writer(sink, &self.sink) + + with nogil: + check_status( + CRecordBatchStreamWriter.Open(self.sink.get(), + schema.sp_schema, + &self.writer)) + cdef _get_input_stream(object source, shared_ptr[InputStream]* out): cdef: @@ -237,24 +252,14 @@ cdef _get_input_stream(object source, shared_ptr[InputStream]* out): out[0] = file_handle -cdef class _RecordBatchReader: - cdef: - shared_ptr[CRecordBatchReader] reader - shared_ptr[InputStream] in_stream - - cdef readonly: - Schema schema - - def __cinit__(self): - pass +cdef class _CRecordBatchReader: + """The base RecordBatchReader wrapper. - def _open(self, source): - _get_input_stream(source, &self.in_stream) - with nogil: - check_status(CRecordBatchStreamReader.Open( - self.in_stream.get(), &self.reader)) + Provides common implementations of convenience methods. Should not + be instantiated directly by user code. 
+ """ - self.schema = pyarrow_wrap_schema(self.reader.get().schema()) + # cdef block is in lib.pxd def __iter__(self): while True: @@ -291,7 +296,26 @@ cdef class _RecordBatchReader: return pyarrow_wrap_table(table) -cdef class _RecordBatchFileWriter(_RecordBatchWriter): +cdef class _RecordBatchStreamReader(_CRecordBatchReader): + cdef: + shared_ptr[InputStream] in_stream + + cdef readonly: + Schema schema + + def __cinit__(self): + pass + + def _open(self, source): + _get_input_stream(source, &self.in_stream) + with nogil: + check_status(CRecordBatchStreamReader.Open( + self.in_stream.get(), &self.reader)) + + self.schema = pyarrow_wrap_schema(self.reader.get().schema()) + + +cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter): def _open(self, sink, Schema schema): get_writer(sink, &self.sink) diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index a79cafe2471..78bb347da48 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -45,7 +45,7 @@ def read_pandas(self, **options): return table.to_pandas(**options) -class RecordBatchStreamReader(lib._RecordBatchReader, _ReadPandasOption): +class RecordBatchStreamReader(lib._RecordBatchStreamReader, _ReadPandasOption): """ Reader for the Arrow streaming binary format @@ -58,7 +58,7 @@ def __init__(self, source): self._open(source) -class RecordBatchStreamWriter(lib._RecordBatchWriter): +class RecordBatchStreamWriter(lib._RecordBatchStreamWriter): """ Writer for the Arrow streaming binary format diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 8cd8f401a27..6f14e767ee6 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -383,6 +383,16 @@ cdef class NativeFile: cdef shared_ptr[OutputStream] get_output_stream(self) except * +cdef class _CRecordBatchWriter: + cdef: + shared_ptr[CRecordBatchWriter] writer + + +cdef class _CRecordBatchReader: + cdef: + shared_ptr[CRecordBatchReader] reader + + cdef get_input_stream(object source, c_bool use_memory_map, 
shared_ptr[InputStream]* reader) cdef get_reader(object source, c_bool use_memory_map, diff --git a/python/requirements.txt b/python/requirements.txt index 3a23d1dacf8..ba67f6b5802 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,3 +1,4 @@ six>=1.0.0 numpy>=1.14 futures; python_version < "3.2" +enum34 >= 1.1.6; python_version < "3.4" diff --git a/python/setup.py b/python/setup.py index e4a793686d0..0fc89c0f7c6 100755 --- a/python/setup.py +++ b/python/setup.py @@ -99,6 +99,7 @@ def run(self): ('boost-namespace=', None, 'namespace of boost (default: boost)'), ('with-cuda', None, 'build the Cuda extension'), + ('with-flight', None, 'build the Flight extension'), ('with-parquet', None, 'build the Parquet extension'), ('with-static-parquet', None, 'link parquet statically'), ('with-static-boost', None, 'link boost statically'), @@ -136,6 +137,8 @@ def initialize_options(self): self.with_cuda = strtobool( os.environ.get('PYARROW_WITH_CUDA', '0')) + self.with_flight = strtobool( + os.environ.get('PYARROW_WITH_FLIGHT', '0')) self.with_parquet = strtobool( os.environ.get('PYARROW_WITH_PARQUET', '0')) self.with_static_parquet = strtobool( @@ -161,6 +164,7 @@ def initialize_options(self): 'lib', '_csv', '_cuda', + '_flight', '_parquet', '_orc', '_plasma', @@ -200,6 +204,8 @@ def _run_cmake(self): cmake_options += ['-G', self.cmake_generator] if self.with_cuda: cmake_options.append('-DPYARROW_BUILD_CUDA=on') + if self.with_flight: + cmake_options.append('-DPYARROW_BUILD_FLIGHT=on') if self.with_parquet: cmake_options.append('-DPYARROW_BUILD_PARQUET=on') if self.with_static_parquet: @@ -344,6 +350,8 @@ def _run_cmake(self): move_shared_libs(build_prefix, build_lib, "arrow_python") if self.with_cuda: move_shared_libs(build_prefix, build_lib, "arrow_gpu") + if self.with_flight: + move_shared_libs(build_prefix, build_lib, "arrow_flight") if self.with_plasma: move_shared_libs(build_prefix, build_lib, "plasma") if self.with_gandiva: @@ -384,6 +392,8 @@ def 
_failure_permitted(self, name): return True if name == '_orc' and not self.with_orc: return True + if name == '_flight' and not self.with_flight: + return True if name == '_cuda' and not self.with_cuda: return True if name == 'gandiva' and not self.with_gandiva: @@ -530,7 +540,8 @@ def has_ext_modules(foo): install_requires = ( 'numpy >= 1.14', 'six >= 1.0.0', - 'futures; python_version < "3.2"' + 'futures; python_version < "3.2"', + 'enum34 >= 1.1.6; python_version < "3.4"', )