diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 22ce6e91336..838e3966ee8 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -73,6 +73,7 @@ set(ARROW_SRCS array/builder_dict.cc array/builder_nested.cc array/builder_primitive.cc + array/builder_union.cc buffer.cc compare.cc diff --git a/cpp/src/arrow/array/builder_nested.cc b/cpp/src/arrow/array/builder_nested.cc index 2f600cd9b92..46637713c3e 100644 --- a/cpp/src/arrow/array/builder_nested.cc +++ b/cpp/src/arrow/array/builder_nested.cc @@ -99,6 +99,12 @@ Status ListBuilder::FinishInternal(std::shared_ptr* out) { RETURN_NOT_OK(value_builder_->FinishInternal(&items)); } + // If the type has not been specified in the constructor, infer it + // This is the case if the value_builder contains a DenseUnionBuilder + if (!arrow::internal::checked_cast(*type_).value_type()) { + type_ = std::static_pointer_cast( + std::make_shared(value_builder_->type())); + } std::shared_ptr null_bitmap; RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); *out = ArrayData::Make(type_, length_, {null_bitmap, offsets}, null_count_); @@ -138,17 +144,29 @@ void StructBuilder::Reset() { Status StructBuilder::FinishInternal(std::shared_ptr* out) { std::shared_ptr null_bitmap; RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); - *out = ArrayData::Make(type_, length_, {null_bitmap}, null_count_); - (*out)->child_data.resize(children_.size()); + std::vector> child_data(children_.size()); for (size_t i = 0; i < children_.size(); ++i) { if (length_ == 0) { // Try to make sure the child buffers are initialized RETURN_NOT_OK(children_[i]->Resize(0)); } - RETURN_NOT_OK(children_[i]->FinishInternal(&(*out)->child_data[i])); + RETURN_NOT_OK(children_[i]->FinishInternal(&child_data[i])); } + // If the type has not been specified in the constructor, infer it + // This is the case if one of the children contains a DenseUnionBuilder + if (!type_) { + std::vector> fields; + for (const auto& field_builder : children_) { + fields.push_back(field("", field_builder->type())); + } + type_ = struct_(fields); + } + + *out = ArrayData::Make(type_, length_, {null_bitmap}, null_count_); + (*out)->child_data = std::move(child_data); + capacity_ = length_ = null_count_ = 0; return Status::OK(); } diff --git a/cpp/src/arrow/array/builder_union.cc b/cpp/src/arrow/array/builder_union.cc new file mode 100644 index 00000000000..f51b7d7f020 --- /dev/null +++ b/cpp/src/arrow/array/builder_union.cc @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/array/builder_union.h" + +#include + +#include "arrow/util/logging.h" + +namespace arrow { + +DenseUnionBuilder::DenseUnionBuilder(MemoryPool* pool, + const std::shared_ptr& type) + : ArrayBuilder(type, pool), types_builder_(pool), offsets_builder_(pool) {} + +Status DenseUnionBuilder::FinishInternal(std::shared_ptr* out) { + std::shared_ptr types; + RETURN_NOT_OK(types_builder_.Finish(&types)); + std::shared_ptr offsets; + RETURN_NOT_OK(offsets_builder_.Finish(&offsets)); + + std::shared_ptr null_bitmap; + RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + + std::vector> fields; + std::vector> child_data(children_.size()); + std::vector type_ids; + for (size_t i = 0; i < children_.size(); ++i) { + std::shared_ptr data; + RETURN_NOT_OK(children_[i]->FinishInternal(&data)); + child_data[i] = data; + fields.push_back(field(field_names_[i], children_[i]->type())); + type_ids.push_back(static_cast(i)); + } + + // If the type has not been specified in the constructor, infer it + if (!type_) { + type_ = union_(fields, type_ids, UnionMode::DENSE); + } + + *out = ArrayData::Make(type_, length(), {null_bitmap, types, offsets}, null_count_); + (*out)->child_data = std::move(child_data); + return Status::OK(); +} + +} // namespace arrow diff --git a/cpp/src/arrow/array/builder_union.h b/cpp/src/arrow/array/builder_union.h new file mode 100644 index 00000000000..2ababc7d96e --- /dev/null +++ b/cpp/src/arrow/array/builder_union.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/array/builder_base.h" +#include "arrow/buffer-builder.h" + +namespace arrow { + +/// \class DenseUnionBuilder +/// +/// You need to call AppendChild for each of the children builders you want +/// to use. The function will return an int8_t, which is the type tag +/// associated with that child. You can then call Append with that tag +/// (followed by an append on the child builder) to add elements to +/// the union array. +/// +/// You can either specify the type when the UnionBuilder is constructed +/// or let the UnionBuilder infer the type at runtime (by omitting the +/// type argument from the constructor). +/// +/// This API is EXPERIMENTAL. +class ARROW_EXPORT DenseUnionBuilder : public ArrayBuilder { + public: + /// Use this constructor to incrementally build the union array along + /// with types, offsets, and null bitmap. + explicit DenseUnionBuilder(MemoryPool* pool, + const std::shared_ptr& type = NULLPTR); + + Status AppendNull() { + ARROW_RETURN_NOT_OK(types_builder_.Append(0)); + ARROW_RETURN_NOT_OK(offsets_builder_.Append(0)); + return AppendToBitmap(false); + } + + /// \brief Append an element to the UnionArray. This must be followed + /// by an append to the appropriate child builder. + /// \param[in] type index of the child the value will be appended + /// \param[in] offset offset of the value in that child + Status Append(int8_t type, int32_t offset) { + ARROW_RETURN_NOT_OK(types_builder_.Append(type)); + ARROW_RETURN_NOT_OK(offsets_builder_.Append(offset)); + return AppendToBitmap(true); + } + + Status FinishInternal(std::shared_ptr* out) override; + + /// \brief Make a new child builder available to the UnionArray + /// + /// \param[in] child the child builder + /// \param[in] field_name the name of the field in the union array type + /// if type inference is used + /// \return child index, which is the "type" argument that needs + /// to be passed to the "Append" method to add a new element to + /// the union array. + int8_t AppendChild(const std::shared_ptr& child, + const std::string& field_name = "") { + children_.push_back(child); + field_names_.push_back(field_name); + return static_cast(children_.size() - 1); + } + + private: + TypedBufferBuilder types_builder_; + TypedBufferBuilder offsets_builder_; + std::vector field_names_; +}; + +} // namespace arrow diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc index f1a7eab8fcb..f13070a5883 100644 --- a/cpp/src/arrow/python/deserialize.cc +++ b/cpp/src/arrow/python/deserialize.cc @@ -108,17 +108,16 @@ Status DeserializeArray(int32_t index, PyObject* base, const SerializedPyObject& return Status::OK(); } -Status GetValue(PyObject* context, const UnionArray& parent, const Array& arr, - int64_t index, int32_t type, PyObject* base, - const SerializedPyObject& blobs, PyObject** result) { - switch (arr.type()->id()) { - case Type::BOOL: +Status GetValue(PyObject* context, const Array& arr, int64_t index, int8_t type, + PyObject* base, const SerializedPyObject& blobs, PyObject** result) { + switch (type) { + case PythonType::BOOL: *result = PyBool_FromLong(checked_cast(arr).Value(index)); return Status::OK(); - case Type::INT64: { + case PythonType::PY2INT: + case PythonType::INT: { #if PY_MAJOR_VERSION < 3 - const std::string& child_name = parent.type()->child(type)->name(); - if (child_name == "py2_int") { + if (type == PythonType::PY2INT) { *result = PyInt_FromSsize_t(checked_cast(arr).Value(index)); return Status::OK(); } @@ -126,135 +125,151 @@ Status GetValue(PyObject* context, const UnionArray& parent, const Array& arr, *result = PyLong_FromSsize_t(checked_cast(arr).Value(index)); return Status::OK(); } - case Type::BINARY: { + case PythonType::BYTES: { auto view = checked_cast(arr).GetView(index); *result = PyBytes_FromStringAndSize(view.data(), view.length()); return CheckPyError(); } - case Type::STRING: { + case PythonType::STRING: { auto view = checked_cast(arr).GetView(index); *result = PyUnicode_FromStringAndSize(view.data(), view.length()); return CheckPyError(); } - case Type::HALF_FLOAT: { + case PythonType::HALF_FLOAT: { *result = PyHalf_FromHalf(checked_cast(arr).Value(index)); RETURN_IF_PYERROR(); return Status::OK(); } - case Type::FLOAT: + case PythonType::FLOAT: *result = PyFloat_FromDouble(checked_cast(arr).Value(index)); return Status::OK(); - case Type::DOUBLE: + case PythonType::DOUBLE: *result = PyFloat_FromDouble(checked_cast(arr).Value(index)); return Status::OK(); - case Type::DATE64: { + case PythonType::DATE64: { RETURN_NOT_OK(PyDateTime_from_int( checked_cast(arr).Value(index), TimeUnit::MICRO, result)); RETURN_IF_PYERROR(); return Status::OK(); } - case Type::STRUCT: { - const auto& s = checked_cast(arr); - const auto& l = checked_cast(*s.field(0)); - if (s.type()->child(0)->name() == "list") { - return DeserializeList(context, *l.values(), l.value_offset(index), - l.value_offset(index + 1), base, blobs, result); - } else if (s.type()->child(0)->name() == "tuple") { - return DeserializeTuple(context, *l.values(), l.value_offset(index), - l.value_offset(index + 1), base, blobs, result); - } else if (s.type()->child(0)->name() == "dict") { - return DeserializeDict(context, *l.values(), l.value_offset(index), - l.value_offset(index + 1), base, blobs, result); - } else if (s.type()->child(0)->name() == "set") { - return DeserializeSet(context, *l.values(), l.value_offset(index), + case PythonType::LIST: { + const auto& l = checked_cast(arr); + return DeserializeList(context, *l.values(), l.value_offset(index), + l.value_offset(index + 1), base, blobs, result); + } + case PythonType::DICT: { + const auto& l = checked_cast(arr); + return DeserializeDict(context, *l.values(), l.value_offset(index), + l.value_offset(index + 1), base, blobs, result); + } + case PythonType::TUPLE: { + const auto& l = checked_cast(arr); + return DeserializeTuple(context, *l.values(), l.value_offset(index), l.value_offset(index + 1), base, blobs, result); - } else { - DCHECK(false) << "unexpected StructArray type " << s.type()->child(0)->name(); - } } - default: { - const std::string& child_name = parent.type()->child(type)->name(); - if (child_name == "tensor") { - int32_t ref = checked_cast(arr).Value(index); - *result = wrap_tensor(blobs.tensors[ref]); - return Status::OK(); - } else if (child_name == "buffer") { - int32_t ref = checked_cast(arr).Value(index); - *result = wrap_buffer(blobs.buffers[ref]); - return Status::OK(); - } else if (child_name == "ndarray") { - int32_t ref = checked_cast(arr).Value(index); - return DeserializeArray(ref, base, blobs, result); - } else { - DCHECK(false) << "union tag " << type << " with child name '" << child_name - << "' not recognized"; - } + case PythonType::SET: { + const auto& l = checked_cast(arr); + return DeserializeSet(context, *l.values(), l.value_offset(index), + l.value_offset(index + 1), base, blobs, result); + } + case PythonType::TENSOR: { + int32_t ref = checked_cast(arr).Value(index); + *result = wrap_tensor(blobs.tensors[ref]); + return Status::OK(); + } + case PythonType::NDARRAY: { + int32_t ref = checked_cast(arr).Value(index); + return DeserializeArray(ref, base, blobs, result); + } + case PythonType::BUFFER: { + int32_t ref = checked_cast(arr).Value(index); + *result = wrap_buffer(blobs.buffers[ref]); + return Status::OK(); } + default: { ARROW_CHECK(false) << "union tag " << type << "' not recognized"; } } return Status::OK(); } -#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN) \ - const auto& data = checked_cast(array); \ - OwnedRef result(CREATE_FN(stop_idx - start_idx)); \ - const uint8_t* type_ids = data.raw_type_ids(); \ - const int32_t* value_offsets = data.raw_value_offsets(); \ - for (int64_t i = start_idx; i < stop_idx; ++i) { \ - if (data.IsNull(i)) { \ - Py_INCREF(Py_None); \ - SET_ITEM_FN(result.obj(), i - start_idx, Py_None); \ - } else { \ - int64_t offset = value_offsets[i]; \ - uint8_t type = type_ids[i]; \ - PyObject* value; \ - RETURN_NOT_OK(GetValue(context, data, *data.UnsafeChild(type), offset, type, base, \ - blobs, &value)); \ - SET_ITEM_FN(result.obj(), i - start_idx, value); \ - } \ - } \ - *out = result.detach(); \ - return Status::OK() - -Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx, - int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs, - PyObject** out) { - DESERIALIZE_SEQUENCE(PyList_New, PyList_SET_ITEM); -} - -Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx, - int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs, - PyObject** out) { - DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM); +std::vector GetPythonTypes(const UnionArray& data) { + std::vector result; + auto type = data.type(); + for (int i = 0; i < type->num_children(); ++i) { + // stoi is locale dependent, but should be ok for small integers + result.push_back(static_cast(std::stoi(type->child(i)->name()))); + } + return result; } -Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx, - int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs, - PyObject** out) { +template +Status DeserializeSequence(PyObject* context, const Array& array, int64_t start_idx, + int64_t stop_idx, PyObject* base, + const SerializedPyObject& blobs, + CreateSequenceFn&& create_sequence, SetItemFn&& set_item, + PyObject** out) { const auto& data = checked_cast(array); - OwnedRef result(PySet_New(nullptr)); + OwnedRef result(create_sequence(stop_idx - start_idx)); + RETURN_IF_PYERROR(); const uint8_t* type_ids = data.raw_type_ids(); const int32_t* value_offsets = data.raw_value_offsets(); + auto python_types = GetPythonTypes(data); for (int64_t i = start_idx; i < stop_idx; ++i) { if (data.IsNull(i)) { Py_INCREF(Py_None); - if (PySet_Add(result.obj(), Py_None) < 0) { - RETURN_IF_PYERROR(); - } + RETURN_NOT_OK(set_item(result.obj(), i - start_idx, Py_None)); } else { - int32_t offset = value_offsets[i]; - int8_t type = type_ids[i]; + int64_t offset = value_offsets[i]; + uint8_t type = type_ids[i]; PyObject* value; - RETURN_NOT_OK(GetValue(context, data, *data.UnsafeChild(type), offset, type, base, - blobs, &value)); - if (PySet_Add(result.obj(), value) < 0) { - RETURN_IF_PYERROR(); - } + RETURN_NOT_OK(GetValue(context, *data.UnsafeChild(type), offset, + python_types[type_ids[i]], base, blobs, &value)); + RETURN_NOT_OK(set_item(result.obj(), i - start_idx, value)); } } *out = result.detach(); return Status::OK(); } +Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx, + int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs, + PyObject** out) { + return DeserializeSequence(context, array, start_idx, stop_idx, base, blobs, + [](int64_t size) { return PyList_New(size); }, + [](PyObject* seq, int64_t index, PyObject* item) { + PyList_SET_ITEM(seq, index, item); + return Status::OK(); + }, + out); +} + +Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx, + int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs, + PyObject** out) { + return DeserializeSequence(context, array, start_idx, stop_idx, base, blobs, + [](int64_t size) { return PyTuple_New(size); }, + [](PyObject* seq, int64_t index, PyObject* item) { + PyTuple_SET_ITEM(seq, index, item); + return Status::OK(); + }, + out); +} + +Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx, + int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs, + PyObject** out) { + return DeserializeSequence(context, array, start_idx, stop_idx, base, blobs, + [](int64_t size) { return PySet_New(nullptr); }, + [](PyObject* seq, int64_t index, PyObject* item) { + int err = PySet_Add(seq, item); + Py_DECREF(item); + if (err < 0) { + RETURN_IF_PYERROR(); + } + return Status::OK(); + }, + out); +} + Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) { int64_t bytes_read; int32_t num_tensors; diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc index ad2636af60c..4dd4c04a6cc 100644 --- a/cpp/src/arrow/python/serialize.cc +++ b/cpp/src/arrow/python/serialize.cc @@ -29,6 +29,7 @@ #include #include "arrow/array.h" +#include "arrow/array/builder_union.h" #include "arrow/builder.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" @@ -55,6 +56,12 @@ using internal::checked_cast; namespace py { +class SequenceBuilder; +class DictBuilder; + +Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, + int32_t recursion_depth, SerializedPyObject* blobs_out); + // A Sequence is a heterogeneous collections of elements. It can contain // scalar Python types, lists, tuples, dictionaries and tensors. class SequenceBuilder { @@ -63,241 +70,162 @@ class SequenceBuilder { : pool_(pool), types_(::arrow::int8(), pool), offsets_(::arrow::int32(), pool), - nones_(pool), - bools_(::arrow::boolean(), pool), - ints_(::arrow::int64(), pool), - py2_ints_(::arrow::int64(), pool), - bytes_(::arrow::binary(), pool), - strings_(pool), - half_floats_(::arrow::float16(), pool), - floats_(::arrow::float32(), pool), - doubles_(::arrow::float64(), pool), - date64s_(::arrow::date64(), pool), - tensor_indices_(::arrow::int32(), pool), - ndarray_indices_(::arrow::int32(), pool), - buffer_indices_(::arrow::int32(), pool), - list_offsets_({0}), - tuple_offsets_({0}), - dict_offsets_({0}), - set_offsets_({0}) {} + type_map_(PythonType::NUM_PYTHON_TYPES, -1) { + builder_.reset(new DenseUnionBuilder(pool)); + } // Appending a none to the sequence - Status AppendNone() { - RETURN_NOT_OK(offsets_.Append(0)); - RETURN_NOT_OK(types_.Append(0)); - return nones_.AppendNull(); - } + Status AppendNone() { return builder_->AppendNull(); } - Status Update(int64_t offset, int8_t* tag) { - if (*tag == -1) { - *tag = num_tags_++; - } + template + Status Update(BuilderType* child_builder, int8_t tag) { int32_t offset32 = -1; - RETURN_NOT_OK(internal::CastSize(offset, &offset32)); + RETURN_NOT_OK(internal::CastSize(child_builder->length(), &offset32)); DCHECK_GE(offset32, 0); - RETURN_NOT_OK(offsets_.Append(offset32)); - RETURN_NOT_OK(types_.Append(*tag)); - return nones_.Append(true); + return builder_->Append(tag, offset32); + } + + template + Status CreateAndUpdate(std::shared_ptr* child_builder, int8_t tag, + MakeBuilderFn make_builder) { + if (!*child_builder) { + child_builder->reset(make_builder()); + // std::to_string is locale dependent, but should be ok for small integers + type_map_[tag] = builder_->AppendChild(*child_builder, std::to_string(tag)); + } + return Update(child_builder->get(), type_map_[tag]); } template - Status AppendPrimitive(const T val, int8_t* tag, BuilderType* out) { - RETURN_NOT_OK(Update(out->length(), tag)); - return out->Append(val); + Status AppendPrimitive(std::shared_ptr* child_builder, const T val, + int8_t tag) { + RETURN_NOT_OK( + CreateAndUpdate(child_builder, tag, [this]() { return new BuilderType(pool_); })); + return (*child_builder)->Append(val); } // Appending a boolean to the sequence Status AppendBool(const bool data) { - return AppendPrimitive(data, &bool_tag_, &bools_); + return AppendPrimitive(&bools_, data, PythonType::BOOL); } // Appending a python 2 int64_t to the sequence Status AppendPy2Int64(const int64_t data) { - return AppendPrimitive(data, &py2_int_tag_, &py2_ints_); + return AppendPrimitive(&py2_ints_, data, PythonType::PY2INT); } // Appending an int64_t to the sequence Status AppendInt64(const int64_t data) { - return AppendPrimitive(data, &int_tag_, &ints_); + return AppendPrimitive(&ints_, data, PythonType::INT); } // Append a list of bytes to the sequence Status AppendBytes(const uint8_t* data, int32_t length) { - RETURN_NOT_OK(Update(bytes_.length(), &bytes_tag_)); - return bytes_.Append(data, length); + RETURN_NOT_OK(CreateAndUpdate(&bytes_, PythonType::BYTES, + [this]() { return new BinaryBuilder(pool_); })); + return bytes_->Append(data, length); } // Appending a string to the sequence Status AppendString(const char* data, int32_t length) { - RETURN_NOT_OK(Update(strings_.length(), &string_tag_)); - return strings_.Append(data, length); + RETURN_NOT_OK(CreateAndUpdate(&strings_, PythonType::STRING, + [this]() { return new StringBuilder(pool_); })); + return strings_->Append(data, length); } // Appending a half_float to the sequence Status AppendHalfFloat(const npy_half data) { - return AppendPrimitive(data, &half_float_tag_, &half_floats_); + return AppendPrimitive(&half_floats_, data, PythonType::HALF_FLOAT); } // Appending a float to the sequence Status AppendFloat(const float data) { - return AppendPrimitive(data, &float_tag_, &floats_); + return AppendPrimitive(&floats_, data, PythonType::FLOAT); } // Appending a double to the sequence Status AppendDouble(const double data) { - return AppendPrimitive(data, &double_tag_, &doubles_); + return AppendPrimitive(&doubles_, data, PythonType::DOUBLE); } // Appending a Date64 timestamp to the sequence Status AppendDate64(const int64_t timestamp) { - return AppendPrimitive(timestamp, &date64_tag_, &date64s_); + return AppendPrimitive(&date64s_, timestamp, PythonType::DATE64); } // Appending a tensor to the sequence // // \param tensor_index Index of the tensor in the object. Status AppendTensor(const int32_t tensor_index) { - RETURN_NOT_OK(Update(tensor_indices_.length(), &tensor_tag_)); - return tensor_indices_.Append(tensor_index); + RETURN_NOT_OK(CreateAndUpdate(&tensor_indices_, PythonType::TENSOR, + [this]() { return new Int32Builder(pool_); })); + return tensor_indices_->Append(tensor_index); } // Appending a numpy ndarray to the sequence // // \param tensor_index Index of the tensor in the object. Status AppendNdarray(const int32_t ndarray_index) { - RETURN_NOT_OK(Update(ndarray_indices_.length(), &ndarray_tag_)); - return ndarray_indices_.Append(ndarray_index); + RETURN_NOT_OK(CreateAndUpdate(&ndarray_indices_, PythonType::NDARRAY, + [this]() { return new Int32Builder(pool_); })); + return ndarray_indices_->Append(ndarray_index); } // Appending a buffer to the sequence // // \param buffer_index Indes of the buffer in the object. Status AppendBuffer(const int32_t buffer_index) { - RETURN_NOT_OK(Update(buffer_indices_.length(), &buffer_tag_)); - return buffer_indices_.Append(buffer_index); - } - - // Add a sublist to the sequence. The data contained in the sublist will be - // specified in the "Finish" method. - // - // To construct l = [[11, 22], 33, [44, 55]] you would for example run - // list = ListBuilder(); - // list.AppendList(2); - // list.Append(33); - // list.AppendList(2); - // list.Finish([11, 22, 44, 55]); - // list.Finish(); - - // \param size - // The size of the sublist - Status AppendList(Py_ssize_t size) { - int32_t offset; - RETURN_NOT_OK(internal::CastSize(list_offsets_.back() + size, &offset)); - RETURN_NOT_OK(Update(list_offsets_.size() - 1, &list_tag_)); - list_offsets_.push_back(offset); - return Status::OK(); + RETURN_NOT_OK(CreateAndUpdate(&buffer_indices_, PythonType::BUFFER, + [this]() { return new Int32Builder(pool_); })); + return buffer_indices_->Append(buffer_index); + } + + Status AppendSequence(PyObject* context, PyObject* sequence, int8_t tag, + std::shared_ptr& target_sequence, + std::unique_ptr& values, int32_t recursion_depth, + SerializedPyObject* blobs_out) { + if (recursion_depth >= kMaxRecursionDepth) { + return Status::NotImplemented( + "This object exceeds the maximum recursion depth. It may contain itself " + "recursively."); + } + RETURN_NOT_OK(CreateAndUpdate(&target_sequence, tag, [this, &values]() { + values.reset(new SequenceBuilder(pool_)); + return new ListBuilder(pool_, values->builder()); + })); + RETURN_NOT_OK(target_sequence->Append()); + return internal::VisitIterable( + sequence, [&](PyObject* obj, bool* keep_going /* unused */) { + return Append(context, obj, values.get(), recursion_depth, blobs_out); + }); } - Status AppendTuple(Py_ssize_t size) { - int32_t offset; - RETURN_NOT_OK(internal::CastSize(tuple_offsets_.back() + size, &offset)); - RETURN_NOT_OK(Update(tuple_offsets_.size() - 1, &tuple_tag_)); - tuple_offsets_.push_back(offset); - return Status::OK(); + Status AppendList(PyObject* context, PyObject* list, int32_t recursion_depth, + SerializedPyObject* blobs_out) { + return AppendSequence(context, list, PythonType::LIST, lists_, list_values_, + recursion_depth, blobs_out); } - Status AppendDict(Py_ssize_t size) { - int32_t offset; - RETURN_NOT_OK(internal::CastSize(dict_offsets_.back() + size, &offset)); - RETURN_NOT_OK(Update(dict_offsets_.size() - 1, &dict_tag_)); - dict_offsets_.push_back(offset); - return Status::OK(); - } - - Status AppendSet(Py_ssize_t size) { - int32_t offset; - RETURN_NOT_OK(internal::CastSize(set_offsets_.back() + size, &offset)); - RETURN_NOT_OK(Update(set_offsets_.size() - 1, &set_tag_)); - set_offsets_.push_back(offset); - return Status::OK(); + Status AppendTuple(PyObject* context, PyObject* tuple, int32_t recursion_depth, + SerializedPyObject* blobs_out) { + return AppendSequence(context, tuple, PythonType::TUPLE, tuples_, tuple_values_, + recursion_depth, blobs_out); } - template - Status AddElement(const int8_t tag, BuilderType* out, const std::string& name = "") { - if (tag != -1) { - fields_[tag] = ::arrow::field(name, out->type()); - RETURN_NOT_OK(out->Finish(&children_[tag])); - RETURN_NOT_OK(nones_.Append(true)); - type_ids_.push_back(tag); - } - return Status::OK(); + Status AppendSet(PyObject* context, PyObject* set, int32_t recursion_depth, + SerializedPyObject* blobs_out) { + return AppendSequence(context, set, PythonType::SET, sets_, set_values_, + recursion_depth, blobs_out); } - Status AddSubsequence(int8_t tag, const Array* data, - const std::vector& offsets, const std::string& name) { - if (data != nullptr) { - DCHECK(data->length() == offsets.back()); - std::shared_ptr offset_array; - Int32Builder builder(::arrow::int32(), pool_); - RETURN_NOT_OK(builder.AppendValues(offsets.data(), offsets.size())); - RETURN_NOT_OK(builder.Finish(&offset_array)); - std::shared_ptr list_array; - RETURN_NOT_OK(ListArray::FromArrays(*offset_array, *data, pool_, &list_array)); - auto field = ::arrow::field(name, list_array->type()); - auto type = ::arrow::struct_({field}); - fields_[tag] = ::arrow::field("", type); - children_[tag] = std::shared_ptr( - new StructArray(type, list_array->length(), {list_array})); - RETURN_NOT_OK(nones_.Append(true)); - type_ids_.push_back(tag); - } else { - DCHECK_EQ(offsets.size(), 1); - } - return Status::OK(); - } + Status AppendDict(PyObject* context, PyObject* dict, int32_t recursion_depth, + SerializedPyObject* blobs_out); // Finish building the sequence and return the result. // Input arrays may be nullptr - Status Finish(const Array* list_data, const Array* tuple_data, const Array* dict_data, - const Array* set_data, std::shared_ptr* out) { - fields_.resize(num_tags_); - children_.resize(num_tags_); - - RETURN_NOT_OK(AddElement(bool_tag_, &bools_)); - RETURN_NOT_OK(AddElement(int_tag_, &ints_)); - RETURN_NOT_OK(AddElement(py2_int_tag_, &py2_ints_, "py2_int")); - RETURN_NOT_OK(AddElement(string_tag_, &strings_)); - RETURN_NOT_OK(AddElement(bytes_tag_, &bytes_)); - RETURN_NOT_OK(AddElement(half_float_tag_, &half_floats_)); - RETURN_NOT_OK(AddElement(float_tag_, &floats_)); - RETURN_NOT_OK(AddElement(double_tag_, &doubles_)); - RETURN_NOT_OK(AddElement(date64_tag_, &date64s_)); - RETURN_NOT_OK(AddElement(tensor_tag_, &tensor_indices_, "tensor")); - RETURN_NOT_OK(AddElement(buffer_tag_, &buffer_indices_, "buffer")); - RETURN_NOT_OK(AddElement(ndarray_tag_, &ndarray_indices_, "ndarray")); - - RETURN_NOT_OK(AddSubsequence(list_tag_, list_data, list_offsets_, "list")); - RETURN_NOT_OK(AddSubsequence(tuple_tag_, tuple_data, tuple_offsets_, "tuple")); - RETURN_NOT_OK(AddSubsequence(dict_tag_, dict_data, dict_offsets_, "dict")); - RETURN_NOT_OK(AddSubsequence(set_tag_, set_data, set_offsets_, "set")); - - std::shared_ptr types_array; - RETURN_NOT_OK(types_.Finish(&types_array)); - const auto& types = checked_cast(*types_array); - - std::shared_ptr offsets_array; - RETURN_NOT_OK(offsets_.Finish(&offsets_array)); - const auto& offsets = checked_cast(*offsets_array); - - std::shared_ptr nones_array; - RETURN_NOT_OK(nones_.Finish(&nones_array)); - const auto& nones = checked_cast(*nones_array); - - auto type = ::arrow::union_(fields_, type_ids_, UnionMode::DENSE); - out->reset(new UnionArray(type, types.length(), children_, types.values(), - offsets.values(), nones.null_bitmap(), nones.null_count())); - return Status::OK(); - } + Status Finish(std::shared_ptr* out) { return builder_->Finish(out); } + + std::shared_ptr builder() { return builder_; } private: MemoryPool* pool_; @@ -305,55 +233,33 @@ class SequenceBuilder { Int8Builder types_; Int32Builder offsets_; - BooleanBuilder nones_; - BooleanBuilder bools_; - Int64Builder ints_; - Int64Builder py2_ints_; - BinaryBuilder bytes_; - StringBuilder strings_; - HalfFloatBuilder half_floats_; - FloatBuilder floats_; - DoubleBuilder doubles_; - Date64Builder date64s_; - - Int32Builder tensor_indices_; - Int32Builder ndarray_indices_; - Int32Builder buffer_indices_; - - std::vector list_offsets_; - std::vector tuple_offsets_; - std::vector dict_offsets_; - std::vector set_offsets_; - - // Tags for members of the sequence. If they are set to -1 it means - // they are not used and will not part be of the metadata when we call - // SequenceBuilder::Finish. If a member with one of the tags is added, - // the associated variable gets a unique index starting from 0. This - // happens in the UPDATE macro in sequence.cc. - int8_t bool_tag_ = -1; - int8_t int_tag_ = -1; - int8_t py2_int_tag_ = -1; - int8_t string_tag_ = -1; - int8_t bytes_tag_ = -1; - int8_t half_float_tag_ = -1; - int8_t float_tag_ = -1; - int8_t double_tag_ = -1; - int8_t date64_tag_ = -1; - - int8_t tensor_tag_ = -1; - int8_t buffer_tag_ = -1; - int8_t ndarray_tag_ = -1; - int8_t list_tag_ = -1; - int8_t tuple_tag_ = -1; - int8_t dict_tag_ = -1; - int8_t set_tag_ = -1; - - int8_t num_tags_ = 0; - - // Members for the output union constructed in Finish - std::vector> fields_; - std::vector> children_; - std::vector type_ids_; + /// Mapping from PythonType to child index + std::vector type_map_; + + std::shared_ptr bools_; + std::shared_ptr ints_; + std::shared_ptr py2_ints_; + std::shared_ptr bytes_; + std::shared_ptr strings_; + std::shared_ptr half_floats_; + std::shared_ptr floats_; + std::shared_ptr doubles_; + std::shared_ptr date64s_; + + std::unique_ptr list_values_; + std::shared_ptr lists_; + std::unique_ptr dict_values_; + std::shared_ptr dicts_; + std::unique_ptr tuple_values_; + std::shared_ptr tuples_; + std::unique_ptr set_values_; + std::shared_ptr sets_; + + std::shared_ptr tensor_indices_; + std::shared_ptr ndarray_indices_; + std::shared_ptr buffer_indices_; + + std::shared_ptr builder_; }; // Constructing dictionaries of key/value pairs. Sequences of @@ -362,7 +268,9 @@ class SequenceBuilder { // can be obtained via the Finish method. class DictBuilder { public: - explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {} + explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) { + builder_.reset(new StructBuilder(nullptr, pool, {keys_.builder(), vals_.builder()})); + } // Builder for the keys of the dictionary SequenceBuilder& keys() { return keys_; } @@ -371,38 +279,55 @@ class DictBuilder { // Construct an Arrow StructArray representing the dictionary. // Contains a field "keys" for the keys and "vals" for the values. - // \param val_list_data - // List containing the data from nested lists in the value - // list of the dictionary - // - // \param val_dict_data - // List containing the data from nested dictionaries in the - // value list of the dictionary - Status Finish(const Array* key_tuple_data, const Array* key_dict_data, - const Array* val_list_data, const Array* val_tuple_data, - const Array* val_dict_data, const Array* val_set_data, - std::shared_ptr* out) { - // lists and sets can't be keys of dicts in Python, that is why for - // the keys we do not need to collect sublists - std::shared_ptr keys, vals; - RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, key_dict_data, nullptr, &keys)); - RETURN_NOT_OK( - vals_.Finish(val_list_data, val_tuple_data, val_dict_data, val_set_data, &vals)); - auto keys_field = std::make_shared("keys", keys->type()); - auto vals_field = std::make_shared("vals", vals->type()); - auto type = std::make_shared( - std::vector>({keys_field, vals_field})); - std::vector> field_arrays({keys, vals}); - DCHECK(keys->length() == vals->length()); - out->reset(new StructArray(type, keys->length(), field_arrays)); - return Status::OK(); - } + Status Finish(std::shared_ptr* out) { return builder_->Finish(out); } + + std::shared_ptr builder() { return builder_; } private: SequenceBuilder keys_; SequenceBuilder vals_; + std::shared_ptr builder_; }; +Status SequenceBuilder::AppendDict(PyObject* context, PyObject* dict, + int32_t recursion_depth, + SerializedPyObject* blobs_out) { + if (recursion_depth >= kMaxRecursionDepth) { + return Status::NotImplemented( + "This object exceeds the maximum recursion depth. It may contain itself " + "recursively."); + } + RETURN_NOT_OK(CreateAndUpdate(&dicts_, PythonType::DICT, [this]() { + dict_values_.reset(new DictBuilder(pool_)); + return new ListBuilder(pool_, dict_values_->builder()); + })); + RETURN_NOT_OK(dicts_->Append()); + PyObject* key; + PyObject* value; + Py_ssize_t pos = 0; + while (PyDict_Next(dict, &pos, &key, &value)) { + RETURN_NOT_OK(dict_values_->builder()->Append()); + RETURN_NOT_OK( + Append(context, key, &dict_values_->keys(), recursion_depth + 1, blobs_out)); + RETURN_NOT_OK( + Append(context, value, &dict_values_->vals(), recursion_depth + 1, blobs_out)); + } + + // This block is used to decrement the reference counts of the results + // returned by the serialization callback, which is called in AppendArray, + // in DeserializeDict and in Append + static PyObject* py_type = PyUnicode_FromString("_pytype_"); + if (PyDict_Contains(dict, py_type)) { + // If the dictionary contains the key "_pytype_", then the user has to + // have registered a callback. + if (context == Py_None) { + return Status::Invalid("No serialization callback set"); + } + Py_XDECREF(dict); + } + return Status::OK(); +} + Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* elem, PyObject** result) { *result = NULL; @@ -433,16 +358,8 @@ Status CallDeserializeCallback(PyObject* context, PyObject* value, return CallCustomCallback(context, method_name.obj(), value, deserialized_object); } -Status SerializeDict(PyObject* context, std::vector dicts, - int32_t recursion_depth, std::shared_ptr* out, - SerializedPyObject* blobs_out); - -Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder, - std::vector* subdicts, SerializedPyObject* blobs_out); - -Status SerializeSequences(PyObject* context, std::vector sequences, - int32_t recursion_depth, std::shared_ptr* out, - SerializedPyObject* blobs_out); +Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder, + int32_t recursion_depth, SerializedPyObject* blobs_out); template Status AppendIntegerScalar(PyObject* obj, SequenceBuilder* builder) { @@ -502,9 +419,7 @@ Status AppendScalar(PyObject* obj, SequenceBuilder* builder) { } Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, - std::vector* sublists, std::vector* subtuples, - std::vector* subdicts, std::vector* subsets, - SerializedPyObject* blobs_out) { + int32_t recursion_depth, SerializedPyObject* blobs_out) { // The bool case must precede the int case (PyInt_Check passes for bools) if (PyBool_Check(elem)) { RETURN_NOT_OK(builder->AppendBool(elem == Py_True)); @@ -523,8 +438,8 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, PyObject* serialized_object; // The reference count of serialized_object will be decremented in SerializeDict RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object)); - RETURN_NOT_OK(builder->AppendDict(PyDict_Size(serialized_object))); - subdicts->push_back(serialized_object); + RETURN_NOT_OK( + builder->AppendDict(context, serialized_object, recursion_depth, blobs_out)); } #if PY_MAJOR_VERSION < 3 } else if (PyInt_Check(elem)) { @@ -542,22 +457,18 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, RETURN_NOT_OK(internal::CastSize(view.size, &size)); RETURN_NOT_OK(builder->AppendString(view.bytes, size)); } else if (PyList_CheckExact(elem)) { - RETURN_NOT_OK(builder->AppendList(PyList_Size(elem))); - sublists->push_back(elem); + RETURN_NOT_OK(builder->AppendList(context, elem, recursion_depth, blobs_out)); } else if (PyDict_CheckExact(elem)) { - RETURN_NOT_OK(builder->AppendDict(PyDict_Size(elem))); - subdicts->push_back(elem); + RETURN_NOT_OK(builder->AppendDict(context, elem, recursion_depth, blobs_out)); } else if (PyTuple_CheckExact(elem)) { - RETURN_NOT_OK(builder->AppendTuple(PyTuple_Size(elem))); - subtuples->push_back(elem); + RETURN_NOT_OK(builder->AppendTuple(context, elem, recursion_depth, blobs_out)); } else if (PySet_Check(elem)) { - RETURN_NOT_OK(builder->AppendSet(PySet_Size(elem))); - subsets->push_back(elem); + RETURN_NOT_OK(builder->AppendSet(context, elem, recursion_depth, blobs_out)); } else if (PyArray_IsScalar(elem, Generic)) { RETURN_NOT_OK(AppendScalar(elem, builder)); } else if (PyArray_CheckExact(elem)) { - RETURN_NOT_OK(SerializeArray(context, reinterpret_cast(elem), builder, - subdicts, blobs_out)); + RETURN_NOT_OK(AppendArray(context, reinterpret_cast(elem), builder, + recursion_depth, blobs_out)); } else if (elem == Py_None) { RETURN_NOT_OK(builder->AppendNone()); } else if (PyDateTime_Check(elem)) { @@ -578,14 +489,14 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, PyObject* serialized_object; // The reference count of serialized_object will be decremented in SerializeDict RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object)); - RETURN_NOT_OK(builder->AppendDict(PyDict_Size(serialized_object))); - subdicts->push_back(serialized_object); + RETURN_NOT_OK( + builder->AppendDict(context, serialized_object, recursion_depth, blobs_out)); } return Status::OK(); } -Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder, - std::vector* subdicts, SerializedPyObject* blobs_out) { +Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder, + int32_t recursion_depth, SerializedPyObject* blobs_out) { int dtype = PyArray_TYPE(array); switch (dtype) { case NPY_UINT8: @@ -611,126 +522,10 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* // The reference count of serialized_object will be decremented in SerializeDict RETURN_NOT_OK(CallSerializeCallback(context, reinterpret_cast(array), &serialized_object)); - RETURN_NOT_OK(builder->AppendDict(PyDict_Size(serialized_object))); - subdicts->push_back(serialized_object); - } - } - return Status::OK(); -} - -Status SerializeSequences(PyObject* context, std::vector sequences, - int32_t recursion_depth, std::shared_ptr* out, - SerializedPyObject* blobs_out) { - DCHECK(out); - if (recursion_depth >= kMaxRecursionDepth) { - return Status::NotImplemented( - "This object exceeds the maximum recursion depth. It may contain itself " - "recursively."); - } - SequenceBuilder builder; - std::vector sublists, subtuples, subdicts, subsets; - for (const auto& sequence : sequences) { - RETURN_NOT_OK(internal::VisitIterable( - sequence, [&](PyObject* obj, bool* keep_going /* unused */) { - return Append(context, obj, &builder, &sublists, &subtuples, &subdicts, - &subsets, blobs_out); - })); - } - std::shared_ptr list; - if (sublists.size() > 0) { - RETURN_NOT_OK( - SerializeSequences(context, sublists, recursion_depth + 1, &list, blobs_out)); - } - std::shared_ptr tuple; - if (subtuples.size() > 0) { - RETURN_NOT_OK( - SerializeSequences(context, subtuples, recursion_depth + 1, &tuple, blobs_out)); - } - std::shared_ptr dict; - if (subdicts.size() > 0) { - RETURN_NOT_OK( - SerializeDict(context, subdicts, recursion_depth + 1, &dict, blobs_out)); - } - std::shared_ptr set; - if (subsets.size() > 0) { - RETURN_NOT_OK( - SerializeSequences(context, subsets, recursion_depth + 1, &set, blobs_out)); - } - return builder.Finish(list.get(), tuple.get(), dict.get(), set.get(), out); -} - -Status SerializeDict(PyObject* context, std::vector dicts, - int32_t recursion_depth, std::shared_ptr* out, - SerializedPyObject* blobs_out) { - DictBuilder result; - if (recursion_depth >= kMaxRecursionDepth) { - return Status::NotImplemented( - "This object exceeds the maximum recursion depth. It may contain itself " - "recursively."); - } - std::vector key_tuples, key_dicts, val_lists, val_tuples, val_dicts, - val_sets, dummy; - for (const auto& dict : dicts) { - PyObject* key; - PyObject* value; - Py_ssize_t pos = 0; - while (PyDict_Next(dict, &pos, &key, &value)) { - RETURN_NOT_OK(Append(context, key, &result.keys(), &dummy, &key_tuples, &key_dicts, - &dummy, blobs_out)); - DCHECK_EQ(dummy.size(), 0); - RETURN_NOT_OK(Append(context, value, &result.vals(), &val_lists, &val_tuples, - &val_dicts, &val_sets, blobs_out)); - } - } - std::shared_ptr key_tuples_arr; - if (key_tuples.size() > 0) { - RETURN_NOT_OK(SerializeSequences(context, key_tuples, recursion_depth + 1, - &key_tuples_arr, blobs_out)); - } - std::shared_ptr key_dicts_arr; - if (key_dicts.size() > 0) { - RETURN_NOT_OK(SerializeDict(context, key_dicts, recursion_depth + 1, &key_dicts_arr, - blobs_out)); - } - std::shared_ptr val_list_arr; - if (val_lists.size() > 0) { - RETURN_NOT_OK(SerializeSequences(context, val_lists, recursion_depth + 1, - &val_list_arr, blobs_out)); - } - std::shared_ptr val_tuples_arr; - if (val_tuples.size() > 0) { - RETURN_NOT_OK(SerializeSequences(context, val_tuples, recursion_depth + 1, - &val_tuples_arr, blobs_out)); - } - std::shared_ptr val_dict_arr; - if (val_dicts.size() > 0) { - RETURN_NOT_OK( - SerializeDict(context, val_dicts, recursion_depth + 1, &val_dict_arr, blobs_out)); - } - std::shared_ptr val_set_arr; - if (val_sets.size() > 0) { - RETURN_NOT_OK(SerializeSequences(context, val_sets, recursion_depth + 1, &val_set_arr, - blobs_out)); - } - RETURN_NOT_OK(result.Finish(key_tuples_arr.get(), key_dicts_arr.get(), - val_list_arr.get(), val_tuples_arr.get(), - val_dict_arr.get(), val_set_arr.get(), out)); - - // This block is used to decrement the reference counts of the results - // returned by the serialization callback, which is called in SerializeArray, - // in DeserializeDict and in Append - static PyObject* py_type = PyUnicode_FromString("_pytype_"); - for (const auto& dict : dicts) { - if (PyDict_Contains(dict, py_type)) { - // If the dictionary contains the key "_pytype_", then the user has to - // have registered a callback. - if (context == Py_None) { - return Status::Invalid("No serialization callback set"); - } - Py_XDECREF(dict); + RETURN_NOT_OK(builder->AppendDict(context, serialized_object, recursion_depth + 1, + blobs_out)); } } - return Status::OK(); } @@ -744,9 +539,13 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject PyAcquireGIL lock; PyDateTime_IMPORT; import_pyarrow(); - std::vector sequences = {sequence}; + SequenceBuilder builder; + RETURN_NOT_OK(internal::VisitIterable( + sequence, [&](PyObject* obj, bool* keep_going /* unused */) { + return Append(context, obj, &builder, 0, out); + })); std::shared_ptr array; - RETURN_NOT_OK(SerializeSequences(context, sequences, 0, &array, out)); + RETURN_NOT_OK(builder.Finish(&array)); out->batch = MakeBatch(array); return Status::OK(); } @@ -756,7 +555,7 @@ Status SerializeNdarray(std::shared_ptr tensor, SerializedPyObject* out) SequenceBuilder builder; RETURN_NOT_OK(builder.AppendNdarray(static_cast(out->ndarrays.size()))); out->ndarrays.push_back(tensor); - RETURN_NOT_OK(builder.Finish(nullptr, nullptr, nullptr, nullptr, &array)); + RETURN_NOT_OK(builder.Finish(&array)); out->batch = MakeBatch(array); return Status::OK(); } diff --git a/cpp/src/arrow/python/serialize.h b/cpp/src/arrow/python/serialize.h index 9a9cc65087d..6cdbbe5053f 100644 --- a/cpp/src/arrow/python/serialize.h +++ b/cpp/src/arrow/python/serialize.h @@ -107,6 +107,28 @@ Status WriteNdarrayHeader(std::shared_ptr dtype, const std::vector& shape, int64_t tensor_num_bytes, io::OutputStream* dst); +struct PythonType { + enum type { + BOOL, + INT, + PY2INT, + BYTES, + STRING, + HALF_FLOAT, + FLOAT, + DOUBLE, + DATE64, + LIST, + DICT, + TUPLE, + SET, + TENSOR, + NDARRAY, + BUFFER, + NUM_PYTHON_TYPES + }; +}; + } // namespace py } // namespace arrow