From 327b36884650a84b1182b8f02458bdc151da474c Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Wed, 30 Mar 2016 09:52:32 +0200 Subject: [PATCH 1/4] ARROW-86: [Python] Implement zero-copy Arrow-to-Pandas conversion We can create zero-copy NumPy arrays for floats and ints if we have no nulls. Each numpy-arrow-view has a reference to the underlying column to ensure that the Arrow structure lives at least as long as the newly created NumPy array. --- python/pyarrow/array.pyx | 1 - python/pyarrow/includes/pyarrow.pxd | 2 +- python/src/pyarrow/adapters/pandas.cc | 67 ++++++++++++++++++++------- python/src/pyarrow/adapters/pandas.h | 3 +- 4 files changed, 52 insertions(+), 21 deletions(-) diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx index 456bf6d1da8..a80b3ce8398 100644 --- a/python/pyarrow/array.pyx +++ b/python/pyarrow/array.pyx @@ -288,4 +288,3 @@ cdef class RowBatch: def __getitem__(self, i): return self.arrays[i] - diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd index 1066b8034be..fbaed4c957d 100644 --- a/python/pyarrow/includes/pyarrow.pxd +++ b/python/pyarrow/includes/pyarrow.pxd @@ -46,6 +46,6 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil: Status PandasMaskedToArrow(MemoryPool* pool, object ao, object mo, shared_ptr[CArray]* out) - Status ArrowToPandas(const shared_ptr[CColumn]& arr, PyObject** out) + Status ArrowToPandas(const shared_ptr[CColumn]& arr, PyObject** out, object py_ref) MemoryPool* GetMemoryPool() diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 22f1d7575f8..26915f328b7 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -520,8 +520,8 @@ static inline PyObject* make_pystring(const uint8_t* data, int32_t length) { template class ArrowDeserializer { public: - ArrowDeserializer(const std::shared_ptr& col) : - col_(col) {} + ArrowDeserializer(const std::shared_ptr& col, PyObject* 
py_ref) : + col_(col), py_ref_(py_ref) {} Status Convert(PyObject** out) { const std::shared_ptr data = col_->data(); @@ -548,6 +548,31 @@ class ArrowDeserializer { return Status::OK(); } + Status OutputFromData(int type, void* data) { + // Zero-Copy. We can pass the data pointer directly to NumPy. + Py_INCREF(py_ref_); + npy_intp dims[1] = {col_->length()}; + out_ = reinterpret_cast(PyArray_SimpleNewFromData(1, dims, + type, data)); + + if (out_ == NULL) { + // Error occurred, trust that SimpleNew set the error state + Py_DECREF(py_ref_); + return Status::OK(); + } + + if (PyArray_SetBaseObject(out_, py_ref_) == -1) { + // Error occurred, trust that SetBaseObject set the error state + Py_DECREF(py_ref_); + return Status::OK(); + } + + // Arrow data is immutable. + PyArray_CLEARFLAGS(out_, NPY_ARRAY_WRITEABLE); + + return Status::OK(); + } + template inline typename std::enable_if< arrow_traits::is_floating, Status>::type @@ -556,18 +581,20 @@ class ArrowDeserializer { arrow::PrimitiveArray* prim_arr = static_cast( arr.get()); - - RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); + const T* in_values = reinterpret_cast(prim_arr->data()->data()); if (arr->null_count() > 0) { + RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); + T* out_values = reinterpret_cast(PyArray_DATA(out_)); - const T* in_values = reinterpret_cast(prim_arr->data()->data()); for (int64_t i = 0; i < arr->length(); ++i) { out_values[i] = arr->IsNull(i) ? NAN : in_values[i]; } } else { - memcpy(PyArray_DATA(out_), prim_arr->data()->data(), - arr->length() * arr->type()->value_size()); + // Zero-Copy. We can pass the data pointer directly to NumPy. + void* data = const_cast(in_values); + int type = arrow_traits::npy_type; + RETURN_NOT_OK(OutputFromData(type, data)); } return Status::OK(); @@ -594,10 +621,10 @@ class ArrowDeserializer { out_values[i] = prim_arr->IsNull(i) ? 
NAN : in_values[i]; } } else { - RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); - - memcpy(PyArray_DATA(out_), in_values, - arr->length() * arr->type()->value_size()); + // Zero-Copy. We can pass the data pointer directly to NumPy. + void* data = const_cast(in_values); + int type = arrow_traits::npy_type; + RETURN_NOT_OK(OutputFromData(type, data)); } return Status::OK(); @@ -680,18 +707,20 @@ class ArrowDeserializer { } private: std::shared_ptr col_; + PyObject* py_ref_; PyArrayObject* out_; }; -#define FROM_ARROW_CASE(TYPE) \ - case arrow::Type::TYPE: \ - { \ - ArrowDeserializer converter(col); \ - return converter.Convert(out); \ - } \ +#define FROM_ARROW_CASE(TYPE) \ + case arrow::Type::TYPE: \ + { \ + ArrowDeserializer converter(col, py_ref); \ + return converter.Convert(out); \ + } \ break; -Status ArrowToPandas(const std::shared_ptr& col, PyObject** out) { +Status ArrowToPandas(const std::shared_ptr& col, PyObject** out, + PyObject* py_ref) { switch(col->type()->type) { FROM_ARROW_CASE(BOOL); FROM_ARROW_CASE(INT8); @@ -706,8 +735,10 @@ Status ArrowToPandas(const std::shared_ptr& col, PyObject** out) { FROM_ARROW_CASE(DOUBLE); FROM_ARROW_CASE(STRING); default: + Py_DECREF(py_ref); return Status::NotImplemented("Arrow type reading not implemented"); } + Py_DECREF(py_ref); return Status::OK(); } diff --git a/python/src/pyarrow/adapters/pandas.h b/python/src/pyarrow/adapters/pandas.h index 58eb3ca61cd..6c44107c41f 100644 --- a/python/src/pyarrow/adapters/pandas.h +++ b/python/src/pyarrow/adapters/pandas.h @@ -36,7 +36,8 @@ namespace pyarrow { class Status; -Status ArrowToPandas(const std::shared_ptr& col, PyObject** out); +Status ArrowToPandas(const std::shared_ptr& col, PyObject** out, + PyObject* py_ref); Status PandasMaskedToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo, std::shared_ptr* out); From 9d35528bc77de036a4db77a0af04db42c2421b09 Mon Sep 17 00:00:00 2001 From: "Uwe L. 
Korn" Date: Sun, 3 Apr 2016 18:05:25 +0200 Subject: [PATCH 2/4] Handle reference counting with OwnedRef --- python/pyarrow/includes/pyarrow.pxd | 2 +- python/pyarrow/table.pyx | 6 ++++-- python/src/pyarrow/adapters/pandas.cc | 14 ++++++++------ python/src/pyarrow/adapters/pandas.h | 4 ++-- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd index fbaed4c957d..92c814706fd 100644 --- a/python/pyarrow/includes/pyarrow.pxd +++ b/python/pyarrow/includes/pyarrow.pxd @@ -46,6 +46,6 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil: Status PandasMaskedToArrow(MemoryPool* pool, object ao, object mo, shared_ptr[CArray]* out) - Status ArrowToPandas(const shared_ptr[CColumn]& arr, PyObject** out, object py_ref) + Status ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref, PyObject** out) MemoryPool* GetMemoryPool() diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 4c4816f0c7e..f02d36f520b 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -96,7 +96,7 @@ cdef class Column: import pandas as pd - check_status(pyarrow.ArrowToPandas(self.sp_column, &arr)) + check_status(pyarrow.ArrowToPandas(self.sp_column, self, &arr)) return pd.Series(arr, name=self.name) cdef _check_nullptr(self): @@ -205,6 +205,7 @@ cdef class Table: cdef: PyObject* arr shared_ptr[CColumn] col + Column column import pandas as pd @@ -212,7 +213,8 @@ cdef class Table: data = [] for i in range(self.table.num_columns()): col = self.table.column(i) - check_status(pyarrow.ArrowToPandas(col, &arr)) + column = self.column(i) + check_status(pyarrow.ArrowToPandas(col, column, &arr)) names.append(frombytes(col.get().name())) data.append( arr) diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 26915f328b7..88b3b8b76f7 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -551,20 +551,22 @@ 
class ArrowDeserializer { Status OutputFromData(int type, void* data) { // Zero-Copy. We can pass the data pointer directly to NumPy. Py_INCREF(py_ref_); + OwnedRef py_ref(py_ref_); npy_intp dims[1] = {col_->length()}; out_ = reinterpret_cast(PyArray_SimpleNewFromData(1, dims, type, data)); if (out_ == NULL) { // Error occurred, trust that SimpleNew set the error state - Py_DECREF(py_ref_); return Status::OK(); } if (PyArray_SetBaseObject(out_, py_ref_) == -1) { // Error occurred, trust that SetBaseObject set the error state - Py_DECREF(py_ref_); return Status::OK(); + } else { + // PyArray_SetBaseObject steals our reference to py_ref_ + py_ref.reset(nullptr); } // Arrow data is immutable. @@ -719,8 +721,10 @@ class ArrowDeserializer { } \ break; -Status ArrowToPandas(const std::shared_ptr& col, PyObject** out, - PyObject* py_ref) { +Status ArrowToPandas(const std::shared_ptr& col, PyObject* py_ref, + PyObject** out) { + std::shared_ptr shared_py_ref = std::make_shared(py_ref); + switch(col->type()->type) { FROM_ARROW_CASE(BOOL); FROM_ARROW_CASE(INT8); @@ -735,10 +739,8 @@ Status ArrowToPandas(const std::shared_ptr& col, PyObject** out, FROM_ARROW_CASE(DOUBLE); FROM_ARROW_CASE(STRING); default: - Py_DECREF(py_ref); return Status::NotImplemented("Arrow type reading not implemented"); } - Py_DECREF(py_ref); return Status::OK(); } diff --git a/python/src/pyarrow/adapters/pandas.h b/python/src/pyarrow/adapters/pandas.h index 6c44107c41f..17922349de6 100644 --- a/python/src/pyarrow/adapters/pandas.h +++ b/python/src/pyarrow/adapters/pandas.h @@ -36,8 +36,8 @@ namespace pyarrow { class Status; -Status ArrowToPandas(const std::shared_ptr& col, PyObject** out, - PyObject* py_ref); +Status ArrowToPandas(const std::shared_ptr& col, PyObject* py_ref, + PyObject** out); Status PandasMaskedToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo, std::shared_ptr* out); From 2cb4c7dd5778d248734d070abe64560129c8c296 Mon Sep 17 00:00:00 2001 From: "Uwe L. 
Korn" Date: Sun, 3 Apr 2016 18:30:28 +0200 Subject: [PATCH 3/4] Release instead of reset reference --- python/src/pyarrow/adapters/pandas.cc | 2 +- python/src/pyarrow/common.h | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 88b3b8b76f7..c1ae2958b6f 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -566,7 +566,7 @@ class ArrowDeserializer { return Status::OK(); } else { // PyArray_SetBaseObject steals our reference to py_ref_ - py_ref.reset(nullptr); + py_ref.release(); } // Arrow data is immutable. diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h index cc9ad9ec5bb..0211e8948f2 100644 --- a/python/src/pyarrow/common.h +++ b/python/src/pyarrow/common.h @@ -53,6 +53,10 @@ class OwnedRef { obj_ = obj; } + void release() { + obj_ = nullptr; + } + PyObject* obj() const{ return obj_; } From ee29e90c8f779323315ae3a86764b26ae558da32 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 3 Apr 2016 20:22:45 +0200 Subject: [PATCH 4/4] Remove duplicate ref counting --- python/src/pyarrow/adapters/pandas.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index c1ae2958b6f..b39fde92034 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -723,8 +723,6 @@ class ArrowDeserializer { Status ArrowToPandas(const std::shared_ptr& col, PyObject* py_ref, PyObject** out) { - std::shared_ptr shared_py_ref = std::make_shared(py_ref); - switch(col->type()->type) { FROM_ARROW_CASE(BOOL); FROM_ARROW_CASE(INT8);