From c6b02aa94f99ddf7549e2f19a268de501a3479bb Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 30 Jan 2019 11:29:30 -0500 Subject: [PATCH 1/5] Add basic Python bindings for Flight Add simple demo Flight client in Python Add basic docstrings for Python Flight client Move RecordBatchReader/Writer wrappers to lib.pxd Link to PROTOBUF_LIBRARY, not protobuf_static Update for FlightPutWriter Start binding Flight server to Python Allow Flight DoPut to be implemented in Python Revert unnecessary build process changes Fix failing pyarrow unit tests Lint Python flight code Lint Cython code Delete unnecessary Flight pxd Use Arrow utilities for interfacing with Python in Flight Simplify API of Python Flight bindings Add Python enum to mirror DescriptorType in Flight Replace static factory methods with constructors --- cpp/cmake_modules/FindArrow.cmake | 14 + cpp/src/arrow/flight/CMakeLists.txt | 2 +- cpp/src/arrow/python/CMakeLists.txt | 61 ++- cpp/src/arrow/python/flight.cc | 86 ++++ cpp/src/arrow/python/flight.h | 79 ++++ python/CMakeLists.txt | 67 ++- python/examples/flight/client.py | 139 ++++++ python/examples/flight/server.py | 56 +++ python/pyarrow/_flight.pyx | 486 ++++++++++++++++++++ python/pyarrow/flight.py | 20 + python/pyarrow/includes/common.pxd | 8 +- python/pyarrow/includes/libarrow_flight.pxd | 142 ++++++ python/pyarrow/ipc.pxi | 88 ++-- python/pyarrow/lib.pxd | 10 + python/setup.py | 13 +- 15 files changed, 1196 insertions(+), 75 deletions(-) create mode 100644 cpp/src/arrow/python/flight.cc create mode 100644 cpp/src/arrow/python/flight.h create mode 100644 python/examples/flight/client.py create mode 100644 python/examples/flight/server.py create mode 100644 python/pyarrow/_flight.pyx create mode 100644 python/pyarrow/flight.py create mode 100644 python/pyarrow/includes/libarrow_flight.pxd diff --git a/cpp/cmake_modules/FindArrow.cmake b/cpp/cmake_modules/FindArrow.cmake index f4b0a81afb8..3d5fd8ed70f 100644 --- a/cpp/cmake_modules/FindArrow.cmake +++ b/cpp/cmake_modules/FindArrow.cmake @@ -79,6 +79,14 @@ find_library(ARROW_PYTHON_LIB_PATH NAMES arrow_python NO_DEFAULT_PATH) get_filename_component(ARROW_PYTHON_LIBS ${ARROW_PYTHON_LIB_PATH} DIRECTORY) +if (PYARROW_BUILD_FLIGHT) + find_library(ARROW_FLIGHT_LIB_PATH NAMES arrow_flight + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) + get_filename_component(ARROW_FLIGHT_LIBS ${ARROW_FLIGHT_LIB_PATH} DIRECTORY) +endif() + if (MSVC) SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll") @@ -101,19 +109,25 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIBS) set(ARROW_FOUND TRUE) set(ARROW_LIB_NAME arrow) set(ARROW_PYTHON_LIB_NAME arrow_python) + set(ARROW_FLIGHT_LIB_NAME arrow_flight) if (MSVC) set(ARROW_STATIC_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX}) set(ARROW_PYTHON_STATIC_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX}) + set(ARROW_FLIGHT_STATIC_LIB ${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX}) set(ARROW_SHARED_LIB ${ARROW_SHARED_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) set(ARROW_PYTHON_SHARED_LIB ${ARROW_PYTHON_SHARED_LIBS}/${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + set(ARROW_FLIGHT_SHARED_LIB ${ARROW_FLIGHT_SHARED_LIBS}/${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) set(ARROW_SHARED_IMP_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}.lib) set(ARROW_PYTHON_SHARED_IMP_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}.lib) + set(ARROW_FLIGHT_SHARED_IMP_LIB ${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}.lib) else() set(ARROW_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}.a) set(ARROW_PYTHON_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}.a) + set(ARROW_FLIGHT_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}.a) set(ARROW_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) set(ARROW_PYTHON_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + set(ARROW_FLIGHT_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) endif() endif() diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt index a32a5fa03c0..4650969eca2 100644 --- a/cpp/src/arrow/flight/CMakeLists.txt +++ b/cpp/src/arrow/flight/CMakeLists.txt @@ -21,7 +21,7 @@ add_custom_target(arrow_flight) arrow_install_all_headers("arrow/flight") set(ARROW_FLIGHT_STATIC_LINK_LIBS - protobuf_static + ${PROTOBUF_LIBRARY} grpc_grpcpp_static grpc_grpc_static grpc_gpr_static diff --git a/cpp/src/arrow/python/CMakeLists.txt b/cpp/src/arrow/python/CMakeLists.txt index 93dbd668655..df8f67a6276 100644 --- a/cpp/src/arrow/python/CMakeLists.txt +++ b/cpp/src/arrow/python/CMakeLists.txt @@ -28,30 +28,53 @@ add_custom_target(arrow_python-tests) add_dependencies(arrow_python-all arrow_python arrow_python-tests) set(ARROW_PYTHON_SRCS - arrow_to_pandas.cc - benchmark.cc - common.cc - config.cc - decimal.cc - deserialize.cc - helpers.cc - inference.cc - init.cc - io.cc - numpy_convert.cc - numpy_to_arrow.cc - python_to_arrow.cc - pyarrow.cc - serialize.cc) + arrow_to_pandas.cc + benchmark.cc + common.cc + config.cc + decimal.cc + deserialize.cc + helpers.cc + inference.cc + init.cc + io.cc + numpy_convert.cc + numpy_to_arrow.cc + python_to_arrow.cc + pyarrow.cc + serialize.cc +) + +if (ARROW_FLIGHT) + set(ARROW_PYTHON_SRCS ${ARROW_PYTHON_SRCS} flight.cc) +endif() -if("${COMPILER_FAMILY}" STREQUAL "clang") - set_property(SOURCE pyarrow.cc APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-cast-qual ") +if ("${COMPILER_FAMILY}" STREQUAL "clang") + set_property(SOURCE pyarrow.cc + APPEND_STRING + PROPERTY COMPILE_FLAGS + " -Wno-cast-qual ") endif() set(ARROW_PYTHON_SHARED_LINK_LIBS arrow_shared ${PYTHON_OTHER_LIBS}) -if(WIN32) - set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS} ${PYTHON_LIBRARIES}) +if (ARROW_FLIGHT) + # Must link shared: we don't want to link more than one copy of gRPC + # into the eventual Cython shared object, otherwise gRPC calls fail + # with weird errors due to multiple copies of global static state + # (The other solution is to link gRPC shared everywhere instead of + # statically only in Flight) + set(ARROW_PYTHON_SHARED_LINK_LIBS + ${ARROW_PYTHON_SHARED_LINK_LIBS} + arrow_flight_shared + ) +endif() + +if (WIN32) + set(ARROW_PYTHON_SHARED_LINK_LIBS + ${ARROW_PYTHON_SHARED_LINK_LIBS} + ${PYTHON_LIBRARIES} + ) endif() set(ARROW_PYTHON_INCLUDES ${NUMPY_INCLUDE_DIRS} ${PYTHON_INCLUDE_DIRS}) diff --git a/cpp/src/arrow/python/flight.cc b/cpp/src/arrow/python/flight.cc new file mode 100644 index 00000000000..555033305cf --- /dev/null +++ b/cpp/src/arrow/python/flight.cc @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "arrow/flight/internal.h" +#include "arrow/python/flight.h" + +namespace arrow { +namespace py { +namespace flight { + +PyFlightServer::PyFlightServer(PyObject* server, PyFlightServerVtable vtable) + : vtable_(vtable) { + Py_INCREF(server); + server_.reset(server); +} + +Status PyFlightServer::ListFlights( + const arrow::flight::Criteria* criteria, + std::unique_ptr* listings) { + return Status::NotImplemented("NYI"); +} + +Status PyFlightServer::GetFlightInfo(const arrow::flight::FlightDescriptor& request, + std::unique_ptr* info) { + PyAcquireGIL lock; + vtable_.get_flight_info(server_.obj(), request, info); + return CheckPyError(); +} + +Status PyFlightServer::DoGet(const arrow::flight::Ticket& request, + std::unique_ptr* stream) { + PyAcquireGIL lock; + vtable_.do_get(server_.obj(), request, stream); + return CheckPyError(); +} + +Status PyFlightServer::DoPut(std::unique_ptr reader) { + PyAcquireGIL lock; + vtable_.do_put(server_.obj(), std::move(reader)); + return CheckPyError(); +} + +Status PyFlightServer::DoAction(const arrow::flight::Action& action, + std::unique_ptr* result) { + return Status::NotImplemented("NYI"); +} + +Status PyFlightServer::ListActions(std::vector* actions) { + return Status::NotImplemented("NYI"); +} + +Status CreateFlightInfo(const std::shared_ptr& schema, + const arrow::flight::FlightDescriptor& descriptor, + const std::vector& endpoints, + uint64_t total_records, uint64_t total_bytes, + std::unique_ptr* out) { + arrow::flight::FlightInfo::Data flight_data; + RETURN_NOT_OK(arrow::flight::internal::SchemaToString(*schema, &flight_data.schema)); + flight_data.descriptor = descriptor; + flight_data.endpoints = endpoints; + flight_data.total_records = total_records; + flight_data.total_bytes = total_bytes; + arrow::flight::FlightInfo value(flight_data); + *out = std::unique_ptr(new arrow::flight::FlightInfo(value)); + return Status::OK(); +} + +} // namespace flight +} // namespace py +} // namespace arrow diff --git a/cpp/src/arrow/python/flight.h b/cpp/src/arrow/python/flight.h new file mode 100644 index 00000000000..774cf700041 --- /dev/null +++ b/cpp/src/arrow/python/flight.h @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PYARROW_FLIGHT_H +#define PYARROW_FLIGHT_H + +#include +#include + +#include "arrow/flight/api.h" +#include "arrow/python/common.h" +#include "arrow/python/config.h" + +namespace arrow { + +namespace py { + +namespace flight { + +/// \brief A table of function pointers for calling from C++ into +/// Python. +class ARROW_PYTHON_EXPORT PyFlightServerVtable { + public: + std::function*)> + get_flight_info; + std::function)> + do_put; + std::function*)> + do_get; +}; + +class ARROW_PYTHON_EXPORT PyFlightServer : public arrow::flight::FlightServerBase { + public: + explicit PyFlightServer(PyObject* server, PyFlightServerVtable vtable); + + Status ListFlights(const arrow::flight::Criteria* criteria, + std::unique_ptr* listings) override; + Status GetFlightInfo(const arrow::flight::FlightDescriptor& request, + std::unique_ptr* info) override; + Status DoGet(const arrow::flight::Ticket& request, + std::unique_ptr* stream) override; + Status DoPut(std::unique_ptr reader) override; + Status DoAction(const arrow::flight::Action& action, + std::unique_ptr* result) override; + Status ListActions(std::vector* actions) override; + + private: + OwnedRefNoGIL server_; + PyFlightServerVtable vtable_; +}; + +ARROW_PYTHON_EXPORT +Status CreateFlightInfo(const std::shared_ptr& schema, + const arrow::flight::FlightDescriptor& descriptor, + const std::vector& endpoints, + uint64_t total_records, uint64_t total_bytes, + std::unique_ptr* out); + +} // namespace flight +} // namespace py +} // namespace arrow + +#endif // PYARROW_FLIGHT_H diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 0559261d5bc..fb6eecab396 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -58,10 +58,21 @@ endif() # Top level cmake dir if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") - option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF) - option(PYARROW_BUILD_GANDIVA "Build the PyArrow Gandiva integration" OFF) - option(PYARROW_BUILD_PARQUET "Build the PyArrow Parquet integration" OFF) - option(PYARROW_PARQUET_USE_SHARED "Rely on parquet shared libraries where relevant" ON) + option(PYARROW_BUILD_CUDA + "Build the PyArrow CUDA support" + OFF) + option(PYARROW_BUILD_FLIGHT + "Build the PyArrow Flight integration" + OFF) + option(PYARROW_BUILD_GANDIVA + "Build the PyArrow Gandiva integration" + OFF) + option(PYARROW_BUILD_PARQUET + "Build the PyArrow Parquet integration" + OFF) + option(PYARROW_PARQUET_USE_SHARED + "Rely on parquet shared libraries where relevant" + ON) option(PYARROW_BOOST_USE_SHARED "Rely on boost shared libraries on linking static parquet" ON) option(PYARROW_BUILD_PLASMA "Build the PyArrow Plasma integration" OFF) @@ -191,7 +202,11 @@ include_directories(SYSTEM ${NUMPY_INCLUDE_DIRS} ${PYTHON_INCLUDE_DIRS} src) # Dependencies # -# Arrow +if(PYARROW_BUILD_FLIGHT) + set(ARROW_FLIGHT TRUE) +endif() + +## Arrow find_package(Arrow REQUIRED) include_directories(SYSTEM ${ARROW_INCLUDE_DIR}) @@ -349,12 +364,24 @@ if(PYARROW_BUNDLE_ARROW_CPP) endif() endif() -if(MSVC) - add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_IMP_LIB}) - add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB}) +if (MSVC) + ADD_THIRDPARTY_LIB(arrow + SHARED_LIB ${ARROW_SHARED_IMP_LIB}) + ADD_THIRDPARTY_LIB(arrow_python + SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB}) + if (PYARROW_BUILD_FLIGHT) + ADD_THIRDPARTY_LIB(arrow_flight + SHARED_LIB ${ARROW_FLIGHT_SHARED_IMP_LIB}) + endif() else() - add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_LIB}) - add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_LIB}) + ADD_THIRDPARTY_LIB(arrow + SHARED_LIB ${ARROW_SHARED_LIB}) + ADD_THIRDPARTY_LIB(arrow_python + SHARED_LIB ${ARROW_PYTHON_SHARED_LIB}) + if (PYARROW_BUILD_FLIGHT) + ADD_THIRDPARTY_LIB(arrow_flight + SHARED_LIB ${ARROW_FLIGHT_SHARED_LIB}) + endif() endif() # @@ -474,8 +501,24 @@ if(PYARROW_BUILD_ORC) set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _orc) endif() -# Gandiva -if(PYARROW_BUILD_GANDIVA) +## Flight +if (PYARROW_BUILD_FLIGHT) + if (PYARROW_BUNDLE_ARROW_CPP) + # TODO: + message(FATAL_ERROR "Not yet implemented: bundling arrow-flight in pyarrow") + endif() + # We do NOT want to link gRPC or any other Flight dependency + # here. Linking more than one copy leads to odd runtime errors due + # to multiple copies of static global state. Thus we also need to + # link Flight as a shared object. + set(LINK_LIBS ${LINK_LIBS} + arrow_flight_shared + ) + set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight) +endif() + +## Gandiva +if (PYARROW_BUILD_GANDIVA) find_package(Gandiva) if(NOT GANDIVA_FOUND) diff --git a/python/examples/flight/client.py b/python/examples/flight/client.py new file mode 100644 index 00000000000..df9acc1cb89 --- /dev/null +++ b/python/examples/flight/client.py @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""An example Flight CLI client.""" + +import argparse +import sys + +import pyarrow +import pyarrow.flight + + +def list_flights(args, client): + print('Flights\n=======') + for flight in client.list_flights(): + descriptor = flight.descriptor + if descriptor.descriptor_type == pyarrow.flight.DescriptorType.PATH: + print("Path:", descriptor.path) + elif descriptor.descriptor_type == pyarrow.flight.DescriptorType.CMD: + print("Command:", descriptor.command) + else: + print("Unknown descriptor type") + + print("Total records:", end=" ") + if flight.total_records >= 0: + print(flight.total_records) + else: + print("Unknown") + + print("Total bytes:", end=" ") + if flight.total_bytes >= 0: + print(flight.total_bytes) + else: + print("Unknown") + + print("Number of endpoints:", len(flight.endpoints)) + + if args.list: + print(flight.schema) + + print('---') + + print('\nActions\n=======') + for action in client.list_actions(): + print("Type:", action.type) + print("Description:", action.description) + print('---') + + +def do_action(args, client): + try: + buf = pyarrow.allocate_buffer(0) + action = pyarrow.flight.Action(args.action_type, buf) + print('Running action', args.action_type) + for result in client.do_action(action): + print("Got result", result.body.to_pybytes()) + except pyarrow.lib.ArrowIOError as e: + print("Error calling action:", e) + + +def get_flight(args, client): + if args.path: + descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path) + else: + descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command) + + info = client.get_flight_info(descriptor) + for endpoint in info.endpoints: + print('Ticket:', endpoint.ticket) + for location in endpoint.locations: + print(location) + get_client = pyarrow.flight.FlightClient.connect(location) + reader = get_client.do_get(endpoint.ticket, info.schema) + df = reader.read_pandas() + print(df) + + +def _add_common_arguments(parser): + parser.add_argument('host', type=str, + help="The host to connect to.") + + +def main(): + parser = argparse.ArgumentParser() + subcommands = parser.add_subparsers() + + cmd_list = subcommands.add_parser('list') + cmd_list.set_defaults(action='list') + _add_common_arguments(cmd_list) + cmd_list.add_argument('-l', '--list', action='store_true', + help="Print more details.") + + cmd_do = subcommands.add_parser('do') + cmd_do.set_defaults(action='do') + _add_common_arguments(cmd_do) + cmd_do.add_argument('action_type', type=str, + help="The action type to run.") + + cmd_get = subcommands.add_parser('get') + cmd_get.set_defaults(action='get') + _add_common_arguments(cmd_get) + cmd_get_descriptor = cmd_get.add_mutually_exclusive_group(required=True) + cmd_get_descriptor.add_argument('-p', '--path', type=str, action='append', + help="The path for the descriptor.") + cmd_get_descriptor.add_argument('-c', '--command', type=str, + help="The command for the descriptor.") + + args = parser.parse_args() + if not hasattr(args, 'action'): + parser.print_help() + sys.exit(1) + + commands = { + 'list': list_flights, + 'do': do_action, + 'get': get_flight, + } + host, port = args.host.split(':') + port = int(port) + client = pyarrow.flight.FlightClient.connect(host, port) + commands[args.action](args, client) + + +if __name__ == '__main__': + main() diff --git a/python/examples/flight/server.py b/python/examples/flight/server.py new file mode 100644 index 00000000000..cf8668a58ec --- /dev/null +++ b/python/examples/flight/server.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""An example Flight Python server.""" + +import pyarrow +import pyarrow.flight + + +class FlightServer(pyarrow.flight.FlightServerBase): + def __init__(self): + super(FlightServer, self).__init__() + self.flights = {} + + @classmethod + def descriptor_to_key(self, descriptor): + return (descriptor.descriptor_type, descriptor.command, + tuple(descriptor.path or tuple())) + + def get_flight_info(self, descriptor): + key = FlightServer.descriptor_to_key(descriptor) + if key in self.flights: + table = self.flights[key] + return pyarrow.flight.FlightInfo(table.schema, + descriptor, [], + table.num_rows, 0) + raise KeyError('Flight not found.') + + def do_put(self, descriptor, reader): + key = FlightServer.descriptor_to_key(descriptor) + print(key) + self.flights[key] = reader.read_all() + print(self.flights[key]) + + +def main(): + server = FlightServer() + server.run(5005) + + +if __name__ == '__main__': + main() diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx new file mode 100644 index 00000000000..6e8b8fbe55b --- /dev/null +++ b/python/pyarrow/_flight.pyx @@ -0,0 +1,486 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: language_level = 3 + +import collections +import enum + +from cython.operator cimport dereference as deref + +from pyarrow.compat import frombytes, tobytes +from pyarrow.lib cimport * +from pyarrow.lib import as_buffer +from pyarrow.includes.libarrow_flight cimport * +from pyarrow.ipc import _ReadPandasOption +import pyarrow.lib as lib + + +cdef class Action: + """An action executable on a Flight service.""" + cdef: + CAction action + + def __init__(self, action_type, buf): + self.action.type = tobytes(action_type) + self.action.body = pyarrow_unwrap_buffer(as_buffer(buf)) + + @property + def type(self): + return frombytes(self.action.type) + + def body(self): + return pyarrow_wrap_buffer(self.action.body) + + +cdef class ActionType: + """A type of action executable on a Flight service.""" + cdef: + CActionType action_type + + @property + def type(self): + return frombytes(self.action_type.type) + + @property + def description(self): + return frombytes(self.action_type.description) + + def make_action(self, buf): + """Create an Action with this type.""" + return Action(self.type, buf) + + def __repr__(self): + return ''.format( + self.type, self.description) + + +cdef class Result: + """A result from executing an Action.""" + cdef: + unique_ptr[CResult] result + + @property + def body(self): + """Get the Buffer containing the result.""" + return pyarrow_wrap_buffer(self.result.get().body) + + +class DescriptorType(enum.Enum): + UNKNOWN = 0 + PATH = 1 + CMD = 2 + + +cdef class FlightDescriptor: + """A description of a data stream available from a Flight service.""" + cdef: + CFlightDescriptor descriptor + + def __init__(self): + raise TypeError("Do not call {}'s constructor directly, use " + "`pyarrow.flight.FlightDescriptor.for_{path,command}` " + "function instead." + .format(self.__class__.__name__)) + + @staticmethod + def for_path(*path): + """Create a FlightDescriptor for a resource path.""" + cdef FlightDescriptor result = \ + FlightDescriptor.__new__(FlightDescriptor) + result.descriptor.type = CDescriptorTypePath + result.descriptor.path = [tobytes(p) for p in path] + return result + + @staticmethod + def for_command(command): + """Create a FlightDescriptor for an opaque command.""" + cdef FlightDescriptor result = \ + FlightDescriptor.__new__(FlightDescriptor) + result.descriptor.type = CDescriptorTypeCmd + result.descriptor.cmd = tobytes(command) + return result + + @property + def descriptor_type(self): + if self.descriptor.type == CDescriptorTypeUnknown: + return DescriptorType.UNKNOWN + elif self.descriptor.type == CDescriptorTypePath: + return DescriptorType.PATH + elif self.descriptor.type == CDescriptorTypeCmd: + return DescriptorType.CMD + raise RuntimeError("Invalid descriptor type!") + + @property + def command(self): + """Get the command for this descriptor.""" + if self.descriptor_type != DescriptorType.CMD: + return None + return self.descriptor.cmd + + @property + def path(self): + """Get the path for this descriptor.""" + if self.descriptor_type != DescriptorType.PATH: + return None + return self.descriptor.path + + def __repr__(self): + return "".format(self.descriptor_type()) + + +class Ticket: + """A ticket for requesting a Flight stream.""" + def __init__(self, ticket): + self.ticket = ticket + + def __repr__(self): + return ''.format(self.ticket) + + +class Location(collections.namedtuple('Location', ['host', 'port'])): + """A location where a Flight stream is available.""" + + +cdef class FlightEndpoint: + """A Flight stream, along with the ticket and locations to access it.""" + cdef: + CFlightEndpoint endpoint + + def __init__(self, ticket, locations): + """Create a FlightEndpoint from a ticket and list of locations. + + Parameters + ---------- + ticket : Ticket or bytes + the ticket needed to access this flight + locations : list of Location or tuples of (host, port) + locations where this flight is available + """ + cdef: + CLocation c_location = CLocation() + + if isinstance(ticket, Ticket): + self.endpoint.ticket.ticket = ticket.ticket + else: + self.endpoint.ticket.ticket = ticket + + for location in locations: + # Accepts Location namedtuple or tuple + c_location.host = tobytes(location[0]) + c_location.port = location[1] + self.endpoint.locations.push_back(c_location) + + @property + def ticket(self): + return Ticket(self.endpoint.ticket.ticket) + + @property + def locations(self): + return [Location(frombytes(location.host), location.port) + for location in self.endpoint.locations] + + +cdef class FlightInfo: + """A description of a Flight stream.""" + cdef: + unique_ptr[CFlightInfo] info + + def __init__(self, Schema schema, FlightDescriptor descriptor, endpoints, + total_records, total_bytes): + """Create a FlightInfo object from a schema, descriptor, and endpoints. + + Parameters + ---------- + schema : Schema + the schema of the data in this flight. + descriptor : FlightDescriptor + the descriptor for this flight. + endpoints : list of FlightEndpoint + a list of endpoints where this flight is available. + total_records : int + the total records in this flight, or -1 if unknown + total_bytes : int + the total bytes in this flight, or -1 if unknown + """ + cdef: + shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema) + vector[CFlightEndpoint] c_endpoints + + for endpoint in endpoints: + if isinstance(endpoint, FlightEndpoint): + c_endpoints.push_back(( endpoint).endpoint) + else: + raise TypeError('Endpoint {} is not instance of' + ' FlightEndpoint'.format(endpoint)) + + check_status(CreateFlightInfo(c_schema, + descriptor.descriptor, + c_endpoints, + total_records, + total_bytes, &self.info)) + + @property + def total_records(self): + """The total record count of this flight, or -1 if unknown.""" + return self.info.get().total_records() + + @property + def total_bytes(self): + """The size in bytes of the data in this flight, or -1 if unknown.""" + return self.info.get().total_bytes() + + @property + def schema(self): + """The schema of the data in this flight.""" + cdef: + shared_ptr[CSchema] schema + check_status(self.info.get().GetSchema(&schema)) + return pyarrow_wrap_schema(schema) + + @property + def descriptor(self): + """The descriptor of the data in this flight.""" + cdef FlightDescriptor result = \ + FlightDescriptor.__new__(FlightDescriptor) + result.descriptor = self.info.get().descriptor() + return result + + @property + def endpoints(self): + """The endpoints where this flight is available.""" + # TODO: get Cython to iterate over reference directly + cdef: + vector[CFlightEndpoint] endpoints = self.info.get().endpoints() + FlightEndpoint py_endpoint + + result = [] + for endpoint in endpoints: + py_endpoint = FlightEndpoint.__new__() + py_endpoint.endpoint = endpoint + result.append(py_endpoint) + return result + + +cdef class FlightRecordBatchReader(_CRecordBatchReader, _ReadPandasOption): + cdef dict __dict__ + + +cdef class FlightRecordBatchWriter(_CRecordBatchWriter): + pass + + +cdef class FlightClient: + """A client to a Flight service.""" + cdef: + unique_ptr[CFlightClient] client + + def __init__(self): + raise TypeError("Do not call {}'s constructor directly, use " + "`pyarrow.flight.FlightClient.connect` instead." + .format(self.__class__.__name__)) + + @staticmethod + def connect(*args): + """Connect to a Flight service on the given host and port.""" + cdef: + FlightClient result = FlightClient.__new__(FlightClient) + int c_port = 0 + c_string c_host + + if len(args) == 1: + # Accept namedtuple or plain tuple + c_host = tobytes(args[0][0]) + c_port = args[0][1] + elif len(args) == 2: + # Accept separate host, port + c_host = tobytes(args[0]) + c_port = args[1] + else: + raise TypeError("FlightClient.connect() takes 1 " + "or 2 arguments ({} given)".format(len(args))) + + with nogil: + check_status(CFlightClient.Connect(c_host, c_port, &result.client)) + + return result + + def list_actions(self): + """List the actions available on a service.""" + cdef: + vector[CActionType] results + + with nogil: + check_status(self.client.get().ListActions(&results)) + + result = [] + for action_type in results: + py_action = ActionType() + py_action.action_type = action_type + result.append(py_action) + + return result + + def do_action(self, action: Action): + """Execute an action on a service.""" + cdef: + unique_ptr[CResultStream] results + with nogil: + check_status(self.client.get().DoAction(action.action, &results)) + + while True: + result = Result() + with nogil: + check_status(results.get().Next(&result.result)) + if result.result == NULL: + break + yield result + + def list_flights(self): + """List the flights available on a service.""" + cdef: + unique_ptr[CFlightListing] listing + FlightInfo result + + with nogil: + check_status(self.client.get().ListFlights(&listing)) + + while True: + result = FlightInfo.__new__(FlightInfo) + with nogil: + check_status(listing.get().Next(&result.info)) + if result.info == NULL: + break + yield result + + def get_flight_info(self, descriptor: FlightDescriptor): + """Request information about an available flight.""" + cdef: + FlightInfo result = FlightInfo.__new__(FlightInfo) + + with nogil: + check_status(self.client.get().GetFlightInfo( + descriptor.descriptor, &result.info)) + + return result + + def do_get(self, ticket: Ticket, schema: Schema): + """Request the data for a flight.""" + cdef: + # TODO: introduce unwrap + CTicket c_ticket + shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema) + unique_ptr[CRecordBatchReader] reader + + c_ticket.ticket = ticket.ticket + with nogil: + check_status(self.client.get().DoGet(c_ticket, c_schema, &reader)) + result = FlightRecordBatchReader() + result.reader.reset(reader.release()) + return result + + def do_put(self, descriptor: FlightDescriptor, schema: Schema): + """Upload data to a flight.""" + cdef: + shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema) + unique_ptr[CRecordBatchWriter] writer + + with nogil: + check_status(self.client.get().DoPut( + descriptor.descriptor, c_schema, &writer)) + result = FlightRecordBatchWriter() + result.writer.reset(writer.release()) + return result + + +cdef class FlightDataStream: + cdef: + unique_ptr[CFlightDataStream] stream + + +cdef class RecordBatchStream(FlightDataStream): + def __init__(self, reader): + # TODO: we don't really expose the readers in Python. + # self.stream.reset(None) + pass + + +cdef void _get_flight_info(void* self, CFlightDescriptor c_descriptor, + unique_ptr[CFlightInfo]* info): + """Callback for implementing Flight servers in Python.""" + cdef FlightDescriptor descriptor = \ + FlightDescriptor.__new__(FlightDescriptor) + descriptor.descriptor = c_descriptor + result = ( self).get_flight_info(descriptor) + if not result: + info.reset(NULL) + # TODO: + else: + # TODO: + pass + + +cdef void _do_put(void* self, unique_ptr[CFlightMessageReader] reader): + """Callback for implementing Flight servers in Python.""" + cdef: + FlightRecordBatchReader py_reader = FlightRecordBatchReader() + FlightDescriptor descriptor = \ + FlightDescriptor.__new__(FlightDescriptor) + + descriptor.descriptor = reader.get().descriptor() + py_reader.reader.reset(reader.release()) + ( self).do_put(descriptor, py_reader) + + +cdef void _do_get(void* self, CTicket ticket, + unique_ptr[CFlightDataStream]* stream): + """Callback for implementing Flight servers in Python.""" + py_ticket = Ticket() + py_ticket.ticket = ticket.ticket + result = ( self).do_get(py_ticket) + if not isinstance(result, FlightDataStream): + raise TypeError("FlightServerBase.do_get must return " + "a FlightDataStream") + stream[0] = move(( result).stream) + + +cdef class FlightServerBase: + """A Flight service definition.""" + + cdef: + unique_ptr[PyFlightServer] server + + def run(self, port): + cdef: + PyFlightServerVtable vtable = PyFlightServerVtable() + int c_port = port + vtable.get_flight_info = &_get_flight_info + vtable.do_put = &_do_put + vtable.do_get = &_do_get + self.server.reset(new PyFlightServer(self, vtable)) + with nogil: + self.server.get().Run(c_port) + + def get_flight_info(self, descriptor): + raise NotImplementedError + + def do_put(self, descriptor, reader): + pass + + def shutdown(self): + # TODO: null check + self.server.get().Shutdown() diff --git a/python/pyarrow/flight.py b/python/pyarrow/flight.py new file mode 100644 index 00000000000..d238c3715af --- /dev/null +++ b/python/pyarrow/flight.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyarrow._flight import (FlightClient, Action, ActionType, # noqa + FlightDescriptor, FlightInfo, Ticket, Location, + FlightServerBase, DescriptorType) diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index 1b13ff0c127..97e23f98504 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -41,8 +41,12 @@ cdef extern from "numpy/halffloat.h": cdef extern from "arrow/api.h" namespace "arrow" nogil: # We can later add more of the common status factory methods as needed - cdef CStatus CStatus_OK "Status::OK"() - cdef CStatus CStatus_Invalid "Status::Invalid"() + cdef CStatus CStatus_OK "arrow::Status::OK"() + cdef CStatus CStatus_Invalid "arrow::Status::Invalid"() + cdef CStatus CStatus_NotImplemented \ + "arrow::Status::NotImplemented"(const c_string& msg) + cdef CStatus CStatus_UnknownError \ + "arrow::Status::UnknownError"(const c_string& msg) cdef cppclass CStatus "arrow::Status": CStatus() diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd new file mode 100644 index 00000000000..4d64f521e11 --- /dev/null +++ b/python/pyarrow/includes/libarrow_flight.pxd @@ -0,0 +1,142 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language = c++ + +from libcpp.functional cimport function + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * + + +cdef extern from "arrow/flight/api.h" namespace "arrow" nogil: + cdef cppclass CActionType" arrow::flight::ActionType": + c_string type + c_string description + + cdef cppclass CAction" arrow::flight::Action": + c_string type + shared_ptr[CBuffer] body + + cdef cppclass CResult" arrow::flight::Result": + shared_ptr[CBuffer] body + + cdef cppclass CResultStream" arrow::flight::ResultStream": + CStatus Next(unique_ptr[CResult]* result) + + cdef cppclass CDescriptorType \ + " arrow::flight::FlightDescriptor::DescriptorType": + bint operator==(CDescriptorType) + + CDescriptorType CDescriptorTypeUnknown\ + " arrow::flight::FlightDescriptor::UNKNOWN" + CDescriptorType CDescriptorTypePath\ + " arrow::flight::FlightDescriptor::PATH" + CDescriptorType CDescriptorTypeCmd\ + " arrow::flight::FlightDescriptor::CMD" + + cdef cppclass CFlightDescriptor" arrow::flight::FlightDescriptor": + CDescriptorType type + c_string cmd + vector[c_string] path + + cdef cppclass CTicket" arrow::flight::Ticket": + CTicket() + c_string ticket + + cdef cppclass CLocation" arrow::flight::Location": + CLocation() + + c_string host + int32_t port + + cdef cppclass CFlightEndpoint" arrow::flight::FlightEndpoint": + CFlightEndpoint() + + CTicket ticket + vector[CLocation] locations + + cdef cppclass CFlightInfo" arrow::flight::FlightInfo": + uint64_t total_records() + uint64_t total_bytes() + CStatus GetSchema(shared_ptr[CSchema]* out) + CFlightDescriptor& descriptor() + const vector[CFlightEndpoint]& endpoints() + + cdef cppclass CFlightListing" arrow::flight::FlightListing": + CStatus Next(unique_ptr[CFlightInfo]* info) + + cdef cppclass CFlightMessageReader \ + " arrow::flight::FlightMessageReader"(CRecordBatchReader): + CFlightDescriptor& descriptor() + + cdef cppclass CFlightDataStream" arrow::flight::FlightDataStream": + pass + + cdef cppclass CRecordBatchStream \ + " arrow::flight::RecordBatchStream"(CFlightDataStream): + CRecordBatchStream(shared_ptr[CRecordBatchReader]& reader) + + cdef cppclass CFlightClient" arrow::flight::FlightClient": + @staticmethod + CStatus Connect(const c_string& host, int port, + unique_ptr[CFlightClient]* client) + + CStatus DoAction(CAction& action, unique_ptr[CResultStream]* results) + CStatus ListActions(vector[CActionType]* actions) + + CStatus ListFlights(unique_ptr[CFlightListing]* listing) + CStatus GetFlightInfo(CFlightDescriptor& descriptor, + unique_ptr[CFlightInfo]* info) + + CStatus DoGet(CTicket& ticket, shared_ptr[CSchema]& schema, + unique_ptr[CRecordBatchReader]* stream) + CStatus DoPut(CFlightDescriptor& descriptor, + shared_ptr[CSchema]& schema, + unique_ptr[CRecordBatchWriter]* stream) + + +# Callbacks for implementing Flight servers +# Use typedef to emulate syntax for std::function +ctypedef void cb_get_flight_info(object, const CFlightDescriptor&, + unique_ptr[CFlightInfo]*) +ctypedef void cb_do_put(object, unique_ptr[CFlightMessageReader]) +ctypedef void cb_do_get(object, const CTicket&, + unique_ptr[CFlightDataStream]*) + +cdef extern from "arrow/python/flight.h" namespace "arrow::py::flight" nogil: + cdef cppclass PyFlightServerVtable: + PyFlightServerVtable() + function[cb_get_flight_info] get_flight_info + function[cb_do_put] do_put + function[cb_do_get] do_get + + cdef cppclass PyFlightServer: + PyFlightServer(object server, PyFlightServerVtable vtable) + void Run(int port) + void Shutdown() + + cdef CStatus CreateFlightInfo" arrow::py::flight::CreateFlightInfo"( + shared_ptr[CSchema] schema, + CFlightDescriptor& descriptor, + vector[CFlightEndpoint] endpoints, + uint64_t total_records, + uint64_t total_bytes, + unique_ptr[CFlightInfo]* out) + +cdef extern from "" namespace "std": + unique_ptr[CFlightDataStream] move(unique_ptr[CFlightDataStream]) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 137d5261d24..c396bb9efac 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -148,27 +148,7 @@ cdef class MessageReader: # ---------------------------------------------------------------------- # File and stream readers and writers -cdef class _RecordBatchWriter: - cdef: - shared_ptr[CRecordBatchWriter] writer - shared_ptr[OutputStream] sink - bint closed - - def __cinit__(self): - pass - - def __dealloc__(self): - pass - - def _open(self, sink, Schema schema): - get_writer(sink, &self.sink) - - with nogil: - check_status( - CRecordBatchStreamWriter.Open(self.sink.get(), - schema.sp_schema, - &self.writer)) - +cdef class _CRecordBatchWriter: def write(self, table_or_batch): """ Write RecordBatch or Table to stream @@ -222,6 +202,33 @@ cdef class _RecordBatchWriter: with nogil: check_status(self.writer.get().Close()) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + +cdef class _RecordBatchWriter(_CRecordBatchWriter): + cdef: + shared_ptr[OutputStream] sink + bint closed + + def __cinit__(self): + pass + + def __dealloc__(self): + pass + + def _open(self, sink, Schema schema): + get_writer(sink, &self.sink) + + with nogil: + check_status( + CRecordBatchStreamWriter.Open(self.sink.get(), + schema.sp_schema, + &self.writer)) + cdef _get_input_stream(object source, shared_ptr[InputStream]* out): cdef: @@ -237,25 +244,7 @@ cdef _get_input_stream(object source, shared_ptr[InputStream]* out): out[0] = file_handle -cdef class _RecordBatchReader: - cdef: - shared_ptr[CRecordBatchReader] reader - shared_ptr[InputStream] in_stream - - cdef readonly: - Schema schema - - def __cinit__(self): - pass - - def _open(self, source): - _get_input_stream(source, &self.in_stream) - with nogil: - check_status(CRecordBatchStreamReader.Open( - self.in_stream.get(), &self.reader)) - - self.schema = pyarrow_wrap_schema(self.reader.get().schema()) - +cdef class _CRecordBatchReader: def __iter__(self): while True: yield self.read_next_batch() @@ -291,6 +280,25 @@ cdef class _RecordBatchReader: return pyarrow_wrap_table(table) +cdef class _RecordBatchReader(_CRecordBatchReader): + cdef: + shared_ptr[InputStream] in_stream + + cdef readonly: + Schema schema + + def __cinit__(self): + pass + + def _open(self, source): + _get_input_stream(source, &self.in_stream) + with nogil: + check_status(CRecordBatchStreamReader.Open( + self.in_stream.get(), &self.reader)) + + self.schema = pyarrow_wrap_schema(self.reader.get().schema()) + + cdef class _RecordBatchFileWriter(_RecordBatchWriter): def _open(self, sink, Schema schema): diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 8cd8f401a27..6f14e767ee6 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -383,6 +383,16 @@ cdef class NativeFile: cdef shared_ptr[OutputStream] get_output_stream(self) except * +cdef class _CRecordBatchWriter: + cdef: + shared_ptr[CRecordBatchWriter] writer + + +cdef class _CRecordBatchReader: + cdef: + shared_ptr[CRecordBatchReader] reader + + cdef get_input_stream(object source, c_bool use_memory_map, shared_ptr[InputStream]* reader) cdef get_reader(object source, c_bool use_memory_map, diff --git a/python/setup.py b/python/setup.py index e4a793686d0..0fc89c0f7c6 100755 --- a/python/setup.py +++ b/python/setup.py @@ -99,6 +99,7 @@ def run(self): ('boost-namespace=', None, 'namespace of boost (default: boost)'), ('with-cuda', None, 'build the Cuda extension'), + ('with-flight', None, 'build the Flight extension'), ('with-parquet', None, 'build the Parquet extension'), ('with-static-parquet', None, 'link parquet statically'), ('with-static-boost', None, 'link boost statically'), @@ -136,6 +137,8 @@ def initialize_options(self): self.with_cuda = strtobool( os.environ.get('PYARROW_WITH_CUDA', '0')) + self.with_flight = strtobool( + os.environ.get('PYARROW_WITH_FLIGHT', '0')) self.with_parquet = strtobool( os.environ.get('PYARROW_WITH_PARQUET', '0')) self.with_static_parquet = strtobool( @@ -161,6 +164,7 @@ def initialize_options(self): 'lib', '_csv', '_cuda', + '_flight', '_parquet', '_orc', '_plasma', @@ -200,6 +204,8 @@ def _run_cmake(self): cmake_options += ['-G', self.cmake_generator] if self.with_cuda: cmake_options.append('-DPYARROW_BUILD_CUDA=on') + if self.with_flight: + cmake_options.append('-DPYARROW_BUILD_FLIGHT=on') if self.with_parquet: cmake_options.append('-DPYARROW_BUILD_PARQUET=on') if self.with_static_parquet: @@ -344,6 +350,8 @@ def _run_cmake(self): move_shared_libs(build_prefix, build_lib, "arrow_python") if self.with_cuda: move_shared_libs(build_prefix, build_lib, "arrow_gpu") + if self.with_flight: + move_shared_libs(build_prefix, build_lib, "arrow_flight") if self.with_plasma: move_shared_libs(build_prefix, build_lib, "plasma") if self.with_gandiva: @@ -384,6 +392,8 @@ def _failure_permitted(self, name): return True if name == '_orc' and not self.with_orc: return True + if name == '_flight' and not self.with_flight: + return True if name == '_cuda' and not self.with_cuda: return True if name == 'gandiva' and not self.with_gandiva: @@ -530,7 +540,8 @@ def has_ext_modules(foo): install_requires = ( 'numpy >= 1.14', 'six >= 1.0.0', - 'futures; python_version < "3.2"' + 'futures; python_version < "3.2"', + 'enum34 >= 1.1.6; python_version < "3.4"', ) From 7764444775fc510586e66844570b010a55501b37 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 11 Feb 2019 16:40:38 -0600 Subject: [PATCH 2/5] Reformat cmake --- cpp/src/arrow/python/CMakeLists.txt | 56 +++++++++++---------------- python/CMakeLists.txt | 60 ++++++++++------------------- 2 files changed, 44 insertions(+), 72 deletions(-) diff --git a/cpp/src/arrow/python/CMakeLists.txt b/cpp/src/arrow/python/CMakeLists.txt index df8f67a6276..9cf7eeb630f 100644 --- a/cpp/src/arrow/python/CMakeLists.txt +++ b/cpp/src/arrow/python/CMakeLists.txt @@ -28,53 +28,43 @@ add_custom_target(arrow_python-tests) add_dependencies(arrow_python-all arrow_python arrow_python-tests) set(ARROW_PYTHON_SRCS - arrow_to_pandas.cc - benchmark.cc - common.cc - config.cc - decimal.cc - deserialize.cc - helpers.cc - inference.cc - init.cc - io.cc - numpy_convert.cc - numpy_to_arrow.cc - python_to_arrow.cc - pyarrow.cc - serialize.cc -) - -if (ARROW_FLIGHT) + arrow_to_pandas.cc + benchmark.cc + common.cc + config.cc + decimal.cc + deserialize.cc + helpers.cc + inference.cc + init.cc + io.cc + numpy_convert.cc + numpy_to_arrow.cc + python_to_arrow.cc + pyarrow.cc + serialize.cc) + +if(ARROW_FLIGHT) set(ARROW_PYTHON_SRCS ${ARROW_PYTHON_SRCS} flight.cc) endif() -if ("${COMPILER_FAMILY}" STREQUAL "clang") - set_property(SOURCE pyarrow.cc - APPEND_STRING - PROPERTY COMPILE_FLAGS - " -Wno-cast-qual ") +if("${COMPILER_FAMILY}" STREQUAL "clang") + set_property(SOURCE pyarrow.cc APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-cast-qual ") endif() set(ARROW_PYTHON_SHARED_LINK_LIBS arrow_shared ${PYTHON_OTHER_LIBS}) -if (ARROW_FLIGHT) +if(ARROW_FLIGHT) # Must link shared: we don't want to link more than one copy of gRPC # into the eventual Cython shared object, otherwise gRPC calls fail # with weird errors due to multiple copies of global static state # (The other solution is to link gRPC shared everywhere instead of # statically only in Flight) - set(ARROW_PYTHON_SHARED_LINK_LIBS - ${ARROW_PYTHON_SHARED_LINK_LIBS} - arrow_flight_shared - ) + set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS} arrow_flight_shared) endif() -if (WIN32) - set(ARROW_PYTHON_SHARED_LINK_LIBS - ${ARROW_PYTHON_SHARED_LINK_LIBS} - ${PYTHON_LIBRARIES} - ) +if(WIN32) + set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS} ${PYTHON_LIBRARIES}) endif() set(ARROW_PYTHON_INCLUDES ${NUMPY_INCLUDE_DIRS} ${PYTHON_INCLUDE_DIRS}) diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index fb6eecab396..d27d3c53444 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -58,21 +58,11 @@ endif() # Top level cmake dir if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") - option(PYARROW_BUILD_CUDA - "Build the PyArrow CUDA support" - OFF) - option(PYARROW_BUILD_FLIGHT - "Build the PyArrow Flight integration" - OFF) - option(PYARROW_BUILD_GANDIVA - "Build the PyArrow Gandiva integration" - OFF) - option(PYARROW_BUILD_PARQUET - "Build the PyArrow Parquet integration" - OFF) - option(PYARROW_PARQUET_USE_SHARED - "Rely on parquet shared libraries where relevant" - ON) + option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF) + option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF) + option(PYARROW_BUILD_GANDIVA "Build the PyArrow Gandiva integration" OFF) + option(PYARROW_BUILD_PARQUET "Build the PyArrow Parquet integration" OFF) + option(PYARROW_PARQUET_USE_SHARED "Rely on parquet shared libraries where relevant" ON) option(PYARROW_BOOST_USE_SHARED "Rely on boost shared libraries on linking static parquet" ON) option(PYARROW_BUILD_PLASMA "Build the PyArrow Plasma integration" OFF) @@ -206,7 +196,7 @@ if(PYARROW_BUILD_FLIGHT) set(ARROW_FLIGHT TRUE) endif() -## Arrow +# Arrow find_package(Arrow REQUIRED) include_directories(SYSTEM ${ARROW_INCLUDE_DIR}) @@ -364,23 +354,17 @@ if(PYARROW_BUNDLE_ARROW_CPP) endif() endif() -if (MSVC) - ADD_THIRDPARTY_LIB(arrow - SHARED_LIB ${ARROW_SHARED_IMP_LIB}) - ADD_THIRDPARTY_LIB(arrow_python - SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB}) - if (PYARROW_BUILD_FLIGHT) - ADD_THIRDPARTY_LIB(arrow_flight - SHARED_LIB ${ARROW_FLIGHT_SHARED_IMP_LIB}) +if(MSVC) + ADD_THIRDPARTY_LIB(arrow SHARED_LIB ${ARROW_SHARED_IMP_LIB}) + ADD_THIRDPARTY_LIB(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB}) + if(PYARROW_BUILD_FLIGHT) + ADD_THIRDPARTY_LIB(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_IMP_LIB}) endif() else() - ADD_THIRDPARTY_LIB(arrow - SHARED_LIB ${ARROW_SHARED_LIB}) - ADD_THIRDPARTY_LIB(arrow_python - SHARED_LIB ${ARROW_PYTHON_SHARED_LIB}) - if (PYARROW_BUILD_FLIGHT) - ADD_THIRDPARTY_LIB(arrow_flight - SHARED_LIB ${ARROW_FLIGHT_SHARED_LIB}) + ADD_THIRDPARTY_LIB(arrow SHARED_LIB ${ARROW_SHARED_LIB}) + ADD_THIRDPARTY_LIB(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_LIB}) + if(PYARROW_BUILD_FLIGHT) + ADD_THIRDPARTY_LIB(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_LIB}) endif() endif() @@ -501,9 +485,9 @@ if(PYARROW_BUILD_ORC) set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _orc) endif() -## Flight -if (PYARROW_BUILD_FLIGHT) - if (PYARROW_BUNDLE_ARROW_CPP) +# Flight +if(PYARROW_BUILD_FLIGHT) + if(PYARROW_BUNDLE_ARROW_CPP) # TODO: message(FATAL_ERROR "Not yet implemented: bundling arrow-flight in pyarrow") endif() @@ -511,14 +495,12 @@ if (PYARROW_BUILD_FLIGHT) # here. Linking more than one copy leads to odd runtime errors due # to multiple copies of static global state. Thus we also need to # link Flight as a shared object. - set(LINK_LIBS ${LINK_LIBS} - arrow_flight_shared - ) + set(LINK_LIBS ${LINK_LIBS} arrow_flight_shared) set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight) endif() -## Gandiva -if (PYARROW_BUILD_GANDIVA) +# Gandiva +if(PYARROW_BUILD_GANDIVA) find_package(Gandiva) if(NOT GANDIVA_FOUND) From e1c298ad36898295302920b4d5fac8d341275764 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 12 Feb 2019 09:18:48 -0500 Subject: [PATCH 3/5] Lint CMake files --- python/CMakeLists.txt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index d27d3c53444..63a8cd09cbc 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -355,16 +355,16 @@ if(PYARROW_BUNDLE_ARROW_CPP) endif() if(MSVC) - ADD_THIRDPARTY_LIB(arrow SHARED_LIB ${ARROW_SHARED_IMP_LIB}) - ADD_THIRDPARTY_LIB(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB}) + add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_IMP_LIB}) + add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB}) if(PYARROW_BUILD_FLIGHT) - ADD_THIRDPARTY_LIB(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_IMP_LIB}) + add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_IMP_LIB}) endif() else() - ADD_THIRDPARTY_LIB(arrow SHARED_LIB ${ARROW_SHARED_LIB}) - ADD_THIRDPARTY_LIB(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_LIB}) + add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_LIB}) + add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_LIB}) if(PYARROW_BUILD_FLIGHT) - ADD_THIRDPARTY_LIB(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_LIB}) + add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_LIB}) endif() endif() From 9d5442a0107341eaad6c87288a648e50b831a838 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 12 Feb 2019 15:42:30 -0500 Subject: [PATCH 4/5] Clarify various RecordBatchStream{Reader,Writer} wrappers --- python/pyarrow/ipc.pxi | 22 +++++++++++++++++++--- python/pyarrow/ipc.py | 4 ++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index c396bb9efac..e8573022bb4 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -149,6 +149,14 @@ cdef class MessageReader: # File and stream readers and writers cdef class _CRecordBatchWriter: + """The base RecordBatchWriter wrapper. + + Provides common implementations of convenience methods. Should not + be instantiated directly by user code. + """ + + # cdef block is in lib.pxd + def write(self, table_or_batch): """ Write RecordBatch or Table to stream @@ -209,7 +217,7 @@ cdef class _CRecordBatchWriter: self.close() -cdef class _RecordBatchWriter(_CRecordBatchWriter): +cdef class _RecordBatchStreamWriter(_CRecordBatchWriter): cdef: shared_ptr[OutputStream] sink bint closed @@ -245,6 +253,14 @@ cdef _get_input_stream(object source, shared_ptr[InputStream]* out): cdef class _CRecordBatchReader: + """The base RecordBatchReader wrapper. + + Provides common implementations of convenience methods. Should not + be instantiated directly by user code. + """ + + # cdef block is in lib.pxd + def __iter__(self): while True: yield self.read_next_batch() @@ -280,7 +296,7 @@ cdef class _CRecordBatchReader: return pyarrow_wrap_table(table) -cdef class _RecordBatchReader(_CRecordBatchReader): +cdef class _RecordBatchStreamReader(_CRecordBatchReader): cdef: shared_ptr[InputStream] in_stream @@ -299,7 +315,7 @@ cdef class _RecordBatchReader(_CRecordBatchReader): self.schema = pyarrow_wrap_schema(self.reader.get().schema()) -cdef class _RecordBatchFileWriter(_RecordBatchWriter): +cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter): def _open(self, sink, Schema schema): get_writer(sink, &self.sink) diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index a79cafe2471..78bb347da48 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -45,7 +45,7 @@ def read_pandas(self, **options): return table.to_pandas(**options) -class RecordBatchStreamReader(lib._RecordBatchReader, _ReadPandasOption): +class RecordBatchStreamReader(lib._RecordBatchStreamReader, _ReadPandasOption): """ Reader for the Arrow streaming binary format @@ -58,7 +58,7 @@ def __init__(self, source): self._open(source) -class RecordBatchStreamWriter(lib._RecordBatchWriter): +class RecordBatchStreamWriter(lib._RecordBatchStreamWriter): """ Writer for the Arrow streaming binary format From ac29ab88d2854df74be3ab539617b33616a0f13a Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 13 Feb 2019 11:17:11 -0500 Subject: [PATCH 5/5] Clean up to-be-implemented parts of Flight Python bindings --- python/pyarrow/_flight.pyx | 18 ++++-------------- python/requirements.txt | 1 + 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx index 6e8b8fbe55b..48191710414 100644 --- a/python/pyarrow/_flight.pyx +++ b/python/pyarrow/_flight.pyx @@ -415,23 +415,13 @@ cdef class FlightDataStream: cdef class RecordBatchStream(FlightDataStream): def __init__(self, reader): # TODO: we don't really expose the readers in Python. - # self.stream.reset(None) pass cdef void _get_flight_info(void* self, CFlightDescriptor c_descriptor, unique_ptr[CFlightInfo]* info): """Callback for implementing Flight servers in Python.""" - cdef FlightDescriptor descriptor = \ - FlightDescriptor.__new__(FlightDescriptor) - descriptor.descriptor = c_descriptor - result = ( self).get_flight_info(descriptor) - if not result: - info.reset(NULL) - # TODO: - else: - # TODO: - pass + raise NotImplementedError("GetFlightInfo is not implemented") cdef void _do_put(void* self, unique_ptr[CFlightMessageReader] reader): @@ -479,8 +469,8 @@ cdef class FlightServerBase: raise NotImplementedError def do_put(self, descriptor, reader): - pass + raise NotImplementedError def shutdown(self): - # TODO: null check - self.server.get().Shutdown() + if self.server.get() != NULL: + self.server.get().Shutdown() diff --git a/python/requirements.txt b/python/requirements.txt index 3a23d1dacf8..ba67f6b5802 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,3 +1,4 @@ six>=1.0.0 numpy>=1.14 futures; python_version < "3.2" +enum34 >= 1.1.6; python_version < "3.4"