diff --git a/.travis.yml b/.travis.yml
index 9c714a68948..ddadf739aab 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -55,6 +55,7 @@ matrix:
     - export ARROW_TRAVIS_VALGRIND=1
     - export ARROW_TRAVIS_PLASMA=1
     - export ARROW_TRAVIS_CLANG_FORMAT=1
+    - export ARROW_BUILD_WARNING_LEVEL=CHECKIN
    - export CC="clang-4.0"
    - export CXX="clang++-4.0"
    - $TRAVIS_BUILD_DIR/ci/travis_install_clang_tools.sh
@@ -74,6 +75,7 @@ matrix:
     before_script:
     - export ARROW_TRAVIS_USE_TOOLCHAIN=1
     - export ARROW_TRAVIS_PLASMA=1
+    - export ARROW_BUILD_WARNING_LEVEL=CHECKIN
     - travis_wait 50 $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
     script:
     - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh
diff --git a/LICENSE.txt b/LICENSE.txt
index 84e6a4e2a2a..30966d36f37 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -552,3 +552,34 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
+
+--------------------------------------------------------------------------------
+
+This project includes code from the jemalloc project
+
+https://github.com/jemalloc/jemalloc
+
+Copyright (C) 2002-2017 Jason Evans <jasone@canonware.com>.
+All rights reserved.
+Copyright (C) 2007-2012 Mozilla Foundation.  All rights reserved.
+Copyright (C) 2009-2017 Facebook, Inc.  All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+1. Redistributions of source code must retain the above copyright notice(s),
+   this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright notice(s),
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER(S) ``AS IS'' AND ANY EXPRESS
+OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO
+EVENT SHALL THE COPYRIGHT HOLDER(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+--------------------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/record-batch.cpp b/c_glib/arrow-glib/record-batch.cpp
index f381af0a2c2..f23a0cf7582 100644
--- a/c_glib/arrow-glib/record-batch.cpp
+++ b/c_glib/arrow-glib/record-batch.cpp
@@ -150,9 +150,8 @@ garrow_record_batch_new(GArrowSchema *schema,
   }
 
   auto arrow_record_batch =
-    std::make_shared<arrow::RecordBatch>(garrow_schema_get_raw(schema),
-                                         n_rows,
-                                         arrow_columns);
+    arrow::RecordBatch::Make(garrow_schema_get_raw(schema),
+                             n_rows, arrow_columns);
   return garrow_record_batch_new_raw(&arrow_record_batch);
 }
 
diff --git a/c_glib/arrow-glib/table.cpp b/c_glib/arrow-glib/table.cpp
index 779f2ef62b8..e086396f8f9 100644
--- a/c_glib/arrow-glib/table.cpp
+++ b/c_glib/arrow-glib/table.cpp
@@ -143,8 +143,7 @@ garrow_table_new(GArrowSchema *schema,
   }
 
   auto arrow_table =
-    std::make_shared<arrow::Table>(garrow_schema_get_raw(schema),
-                                   arrow_columns);
+    arrow::Table::Make(garrow_schema_get_raw(schema), arrow_columns);
   return garrow_table_new_raw(&arrow_table);
 }
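Reviewer note: the two c_glib changes above are representative of the whole patch — direct construction of arrow::RecordBatch and arrow::Table is replaced by static Make factories. A minimal usage sketch against the post-patch C++ API (the helper name MakeBatchAndTable and the single-column shape are illustrative, not part of this patch):

#include <memory>

#include "arrow/api.h"

// Builds a one-column record batch and table with the new factories.
arrow::Status MakeBatchAndTable(const std::shared_ptr<arrow::Array>& column,
                                std::shared_ptr<arrow::RecordBatch>* batch,
                                std::shared_ptr<arrow::Table>* table) {
  auto schema = arrow::schema({arrow::field("f0", column->type())});
  // Previously: std::make_shared<arrow::RecordBatch>(schema, length, columns)
  *batch = arrow::RecordBatch::Make(schema, column->length(), {column});
  // Table::Make also accepts a vector of arrays directly, as used in the
  // updated table tests below.
  *table = arrow::Table::Make(schema, {column});
  return (*batch)->Validate();
}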
diff --git a/c_glib/test/test-file-writer.rb b/c_glib/test/test-file-writer.rb
index 3de8e5cf34b..67aed85f73b 100644
--- a/c_glib/test/test-file-writer.rb
+++ b/c_glib/test/test-file-writer.rb
@@ -19,14 +19,18 @@ class TestFileWriter < Test::Unit::TestCase
   include Helper::Buildable
 
   def test_write_record_batch
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-ipc-file-writer")
     output = Arrow::FileOutputStream.new(tempfile.path, false)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
       begin
-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])
         file_writer.write_record_batch(record_batch)
       ensure
         file_writer.close
@@ -38,8 +42,12 @@ def test_write_record_batch
     input = Arrow::MemoryMappedInputStream.new(tempfile.path)
     begin
       file_reader = Arrow::RecordBatchFileReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    file_reader.schema.fields.collect(&:name))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   file_reader.read_record_batch(0))
     ensure
       input.close
     end
diff --git a/c_glib/test/test-gio-input-stream.rb b/c_glib/test/test-gio-input-stream.rb
index a71a370430e..2adf25b3af5 100644
--- a/c_glib/test/test-gio-input-stream.rb
+++ b/c_glib/test/test-gio-input-stream.rb
@@ -16,15 +16,21 @@
 # under the License.
 
 class TestGIOInputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
   def test_reader_backend
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-gio-input-stream")
     output = Arrow::FileOutputStream.new(tempfile.path, false)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
       begin
-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])
         file_writer.write_record_batch(record_batch)
       ensure
         file_writer.close
@@ -38,8 +44,12 @@ def test_reader_backend
     input = Arrow::GIOInputStream.new(input_stream)
     begin
       file_reader = Arrow::RecordBatchFileReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    file_reader.schema.fields.collect(&:name))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   file_reader.read_record_batch(0))
     ensure
       input.close
     end
diff --git a/c_glib/test/test-gio-output-stream.rb b/c_glib/test/test-gio-output-stream.rb
index adaa8c1b7b2..c77598ed110 100644
--- a/c_glib/test/test-gio-output-stream.rb
+++ b/c_glib/test/test-gio-output-stream.rb
@@ -16,17 +16,23 @@
 # under the License.
 
 class TestGIOOutputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
   def test_writer_backend
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-gio-output-stream")
     file = Gio::File.new_for_path(tempfile.path)
     output_stream = file.append_to(:none)
     output = Arrow::GIOOutputStream.new(output_stream)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
       begin
-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])
         file_writer.write_record_batch(record_batch)
       ensure
         file_writer.close
@@ -38,8 +44,12 @@ def test_writer_backend
     input = Arrow::MemoryMappedInputStream.new(tempfile.path)
     begin
       file_reader = Arrow::RecordBatchFileReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    file_reader.schema.fields.collect(&:name))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   file_reader.read_record_batch(0))
     ensure
       input.close
     end
diff --git a/c_glib/test/test-stream-writer.rb b/c_glib/test/test-stream-writer.rb
index c3d0e1490ce..32754e20838 100644
--- a/c_glib/test/test-stream-writer.rb
+++ b/c_glib/test/test-stream-writer.rb
@@ -19,17 +19,19 @@ class TestStreamWriter < Test::Unit::TestCase
   include Helper::Buildable
 
   def test_write_record_batch
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-ipc-stream-writer")
     output = Arrow::FileOutputStream.new(tempfile.path, false)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       stream_writer = Arrow::RecordBatchStreamWriter.new(output, schema)
       begin
         columns = [
-          build_boolean_array([true]),
+          build_boolean_array(data),
         ]
-        record_batch = Arrow::RecordBatch.new(schema, 1, columns)
+        record_batch = Arrow::RecordBatch.new(schema, data.size, columns)
         stream_writer.write_record_batch(record_batch)
       ensure
         stream_writer.close
@@ -41,10 +43,12 @@ def test_write_record_batch
     input = Arrow::MemoryMappedInputStream.new(tempfile.path)
     begin
       stream_reader = Arrow::RecordBatchStreamReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    stream_reader.schema.fields.collect(&:name))
-      assert_equal(true,
-                   stream_reader.read_next.get_column(0).get_value(0))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   stream_reader.read_next)
       assert_nil(stream_reader.read_next)
     ensure
       input.close
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 4998f190f98..664f7ce5fed 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -91,12 +91,14 @@ fi
 if [ $TRAVIS_OS_NAME == "linux" ]; then
     cmake $CMAKE_COMMON_FLAGS \
           $CMAKE_LINUX_FLAGS \
-          -DBUILD_WARNING_LEVEL=CHECKIN \
+          -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
+          -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \
           $ARROW_CPP_DIR
 else
     cmake $CMAKE_COMMON_FLAGS \
           $CMAKE_OSX_FLAGS \
-          -DBUILD_WARNING_LEVEL=CHECKIN \
+          -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
+          -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \
           $ARROW_CPP_DIR
 fi
diff --git a/ci/travis_env_common.sh b/ci/travis_env_common.sh
index 52c7da4e017..21b6e266ea6 100755
--- a/ci/travis_env_common.sh
+++ b/ci/travis_env_common.sh
@@ -38,6 +38,9 @@ export ARROW_PYTHON_PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env
 
 export CMAKE_EXPORT_COMPILE_COMMANDS=1
 
+export ARROW_BUILD_TYPE=${ARROW_BUILD_TYPE:=debug}
+export ARROW_BUILD_WARNING_LEVEL=${ARROW_BUILD_WARNING_LEVEL:=Production}
+
 if [ "$ARROW_TRAVIS_USE_TOOLCHAIN" == "1" ]; then
   # C++ toolchain
   export CPP_TOOLCHAIN=$TRAVIS_BUILD_DIR/cpp-toolchain
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index 603201bcc16..5f7b0a9a1af 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -63,6 +63,7 @@ cmake -GNinja \
       -DARROW_BUILD_UTILITIES=off \
       -DARROW_PLASMA=on \
       -DARROW_PYTHON=on \
+      -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
       -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
       $ARROW_CPP_DIR
 
@@ -78,6 +79,8 @@ if [ "$PYTHON_VERSION" == "2.7" ]; then
   pip install futures
 fi
 
+export PYARROW_BUILD_TYPE=$ARROW_BUILD_TYPE
+
 pip install -r requirements.txt
 python setup.py build_ext --with-parquet --with-plasma \
        install --single-version-externally-managed --record=record.text
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index 42d7eddc9c9..411cf7584f7 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -24,7 +24,7 @@ set(GFLAGS_VERSION "2.2.0")
 set(GTEST_VERSION "1.8.0")
 set(GBENCHMARK_VERSION "1.1.0")
 set(FLATBUFFERS_VERSION "1.7.1")
-set(JEMALLOC_VERSION "4.4.0")
+set(JEMALLOC_VERSION "17c897976c60b0e6e4f4a365c751027244dada7a")
 set(SNAPPY_VERSION "1.1.3")
 set(BROTLI_VERSION "v0.6.0")
 set(LZ4_VERSION "1.7.5")
@@ -471,8 +471,8 @@ if (ARROW_JEMALLOC)
   set(JEMALLOC_STATIC_LIB "${JEMALLOC_PREFIX}/lib/libjemalloc_pic${CMAKE_STATIC_LIBRARY_SUFFIX}")
   set(JEMALLOC_VENDORED 1)
   ExternalProject_Add(jemalloc_ep
-    URL https://github.com/jemalloc/jemalloc/releases/download/${JEMALLOC_VERSION}/jemalloc-${JEMALLOC_VERSION}.tar.bz2
-    CONFIGURE_COMMAND ./configure "--prefix=${JEMALLOC_PREFIX}" "--with-jemalloc-prefix=je_arrow_" "--with-private-namespace=je_arrow_private_"
+    URL ${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/jemalloc/${JEMALLOC_VERSION}.tar.gz
+    CONFIGURE_COMMAND ./autogen.sh "--prefix=${JEMALLOC_PREFIX}" "--with-jemalloc-prefix=je_arrow_" "--with-private-namespace=je_arrow_private_" && touch doc/jemalloc.html && touch doc/jemalloc.3 ${EP_LOG_OPTIONS}
     BUILD_IN_SOURCE 1
     BUILD_COMMAND ${MAKE}
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 496e0da9d62..94705781fa4 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -22,6 +22,7 @@ set(ARROW_SRCS
   compare.cc
   memory_pool.cc
   pretty_print.cc
+  record_batch.cc
   status.cc
   table.cc
   table_builder.cc
@@ -144,6 +145,7 @@ install(FILES
   compare.h
   memory_pool.h
   pretty_print.h
+  record_batch.h
   status.h
   table.h
   table_builder.h
diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h
index 5d2e859f3a4..7cae8414a77 100644
--- a/cpp/src/arrow/api.h
+++ b/cpp/src/arrow/api.h
@@ -26,6 +26,7 @@
 #include "arrow/compare.h"
 #include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/table_builder.h"
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 28756a6abda..dda9dd38be4 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -279,6 +279,8 @@ class ARROW_EXPORT Array {
   ARROW_DISALLOW_COPY_AND_ASSIGN(Array);
 };
 
+using ArrayVector = std::vector<std::shared_ptr<Array>>;
+
 static inline std::ostream& operator<<(std::ostream& os, const Array& x) {
   os << x.ToString();
   return os;
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 3e213fcd5ca..a42f9024545 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -28,7 +28,6 @@
 #include "arrow/buffer.h"
 #include "arrow/compare.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 32741b53ac4..e59e166580a 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -29,7 +29,6 @@
 #include "arrow/buffer.h"
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
diff --git a/cpp/src/arrow/column-benchmark.cc b/cpp/src/arrow/column-benchmark.cc
index e50ddf6d703..af2c368c329 100644
--- a/cpp/src/arrow/column-benchmark.cc
+++ b/cpp/src/arrow/column-benchmark.cc
@@ -19,6 +19,7 @@
 
 #include "arrow/array.h"
 #include "arrow/memory_pool.h"
+#include "arrow/table.h"
 #include "arrow/test-util.h"
 
 namespace arrow {
diff --git a/cpp/src/arrow/compute/compute-test.cc b/cpp/src/arrow/compute/compute-test.cc
index 58a991c60f2..fa408ae40cd 100644
--- a/cpp/src/arrow/compute/compute-test.cc
+++ b/cpp/src/arrow/compute/compute-test.cc
@@ -697,7 +697,7 @@ TEST_F(TestCast, PreallocatedMemory) {
   out_data->buffers.push_back(out_values);
 
   Datum out(out_data);
-  ASSERT_OK(kernel->Call(&this->ctx_, *arr->data(), &out));
+  ASSERT_OK(kernel->Call(&this->ctx_, Datum(arr), &out));
 
   // Buffer address unchanged
   ASSERT_EQ(out_values.get(), out_data->buffers[1].get());
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index 0037245d610..0bfa55cfee1 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -22,6 +22,7 @@
 #include <memory>
 
 #include "arrow/array.h"
+#include "arrow/record_batch.h"
 #include "arrow/table.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/variant.h"
@@ -131,7 +132,7 @@ struct ARROW_EXPORT Datum {
 /// \brief An array-valued function of a single input argument
 class ARROW_EXPORT UnaryKernel : public OpKernel {
  public:
-  virtual Status Call(FunctionContext* ctx, const ArrayData& input, Datum* out) = 0;
+  virtual Status Call(FunctionContext* ctx, const Datum& input, Datum* out) = 0;
 };
 
 }  // namespace compute
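Reviewer note: with UnaryKernel::Call now taking a Datum instead of a raw ArrayData, each kernel unwraps its own input. A sketch of a conforming kernel under the new signature (IdentityKernel is hypothetical; the DCHECK mirrors the pattern the cast and hash kernels below adopt):

#include "arrow/compute/kernel.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"

namespace arrow {
namespace compute {

// Hypothetical no-op kernel: checks the Datum kind, then forwards the
// input ArrayData to the output without copying.
class IdentityKernel : public UnaryKernel {
 public:
  Status Call(FunctionContext* ctx, const Datum& input, Datum* out) override {
    DCHECK_EQ(Datum::ARRAY, input.kind());
    out->value = input.array();
    return Status::OK();
  }
};

}  // namespace compute
}  // namespace arrow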
diff --git a/cpp/src/arrow/compute/kernels/cast.cc b/cpp/src/arrow/compute/kernels/cast.cc
index c866054eadd..d595d2ea507 100644
--- a/cpp/src/arrow/compute/kernels/cast.cc
+++ b/cpp/src/arrow/compute/kernels/cast.cc
@@ -740,20 +740,23 @@ class CastKernel : public UnaryKernel {
         can_pre_allocate_values_(can_pre_allocate_values),
         out_type_(out_type) {}
 
-  Status Call(FunctionContext* ctx, const ArrayData& input, Datum* out) override {
+  Status Call(FunctionContext* ctx, const Datum& input, Datum* out) override {
+    DCHECK_EQ(Datum::ARRAY, input.kind());
+
+    const ArrayData& in_data = *input.array();
     ArrayData* result;
 
     if (out->kind() == Datum::NONE) {
-      out->value = std::make_shared<ArrayData>(out_type_, input.length);
+      out->value = std::make_shared<ArrayData>(out_type_, in_data.length);
     }
 
     result = out->array().get();
 
     if (!is_zero_copy_) {
       RETURN_NOT_OK(
-          AllocateIfNotPreallocated(ctx, input, can_pre_allocate_values_, result));
+          AllocateIfNotPreallocated(ctx, in_data, can_pre_allocate_values_, result));
     }
-    func_(ctx, options_, input, result);
+    func_(ctx, options_, in_data, result);
 
     RETURN_IF_ERROR(ctx);
     return Status::OK();
diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc
index 3af41609fef..95f03993215 100644
--- a/cpp/src/arrow/compute/kernels/hash.cc
+++ b/cpp/src/arrow/compute/kernels/hash.cc
@@ -658,8 +658,9 @@ class HashKernelImpl : public HashKernel {
   explicit HashKernelImpl(std::unique_ptr<HashTable> hasher)
       : hasher_(std::move(hasher)) {}
 
-  Status Call(FunctionContext* ctx, const ArrayData& input, Datum* out) override {
-    RETURN_NOT_OK(Append(ctx, input));
+  Status Call(FunctionContext* ctx, const Datum& input, Datum* out) override {
+    DCHECK_EQ(Datum::ARRAY, input.kind());
+    RETURN_NOT_OK(Append(ctx, *input.array()));
     return Flush(out);
   }
diff --git a/cpp/src/arrow/compute/kernels/util-internal.cc b/cpp/src/arrow/compute/kernels/util-internal.cc
index df68637e088..28428bfcba6 100644
--- a/cpp/src/arrow/compute/kernels/util-internal.cc
+++ b/cpp/src/arrow/compute/kernels/util-internal.cc
@@ -34,13 +34,13 @@ Status InvokeUnaryArrayKernel(FunctionContext* ctx, UnaryKernel* kernel,
                               const Datum& value, std::vector<Datum>* outputs) {
   if (value.kind() == Datum::ARRAY) {
     Datum output;
-    RETURN_NOT_OK(kernel->Call(ctx, *value.array(), &output));
+    RETURN_NOT_OK(kernel->Call(ctx, value, &output));
     outputs->push_back(output);
   } else if (value.kind() == Datum::CHUNKED_ARRAY) {
     const ChunkedArray& array = *value.chunked_array();
     for (int i = 0; i < array.num_chunks(); i++) {
       Datum output;
-      RETURN_NOT_OK(kernel->Call(ctx, *(array.chunk(i)->data()), &output));
+      RETURN_NOT_OK(kernel->Call(ctx, Datum(array.chunk(i)), &output));
       outputs->push_back(output);
     }
   } else {
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
index 022268e0347..a7262c8b4d4 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -27,8 +27,8 @@
 #include "arrow/ipc/message.h"
 #include "arrow/ipc/reader.h"
 #include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/util/visibility.h"
 
 #include "arrow/gpu/cuda_context.h"
diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc
index 6bd16462df9..e3de17f1f75 100644
--- a/cpp/src/arrow/ipc/feather-test.cc
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -29,6 +29,7 @@
 #include "arrow/ipc/feather.h"
 #include "arrow/ipc/test-common.h"
 #include "arrow/pretty_print.h"
+#include "arrow/table.h"
 #include "arrow/test-util.h"
 
 namespace arrow {
@@ -376,8 +377,8 @@ TEST_F(TestTableWriter, TimeTypes) {
         schema->field(i)->type(), values->length(), buffers, values->null_count(), 0));
   }
 
-  RecordBatch batch(schema, values->length(), std::move(arrays));
-  CheckBatch(batch);
+  auto batch = RecordBatch::Make(schema, values->length(), std::move(arrays));
+  CheckBatch(*batch);
 }
 
 TEST_F(TestTableWriter, VLenPrimitiveRoundTrip) {
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index cea720bd01b..077dc39305b 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -32,6 +32,7 @@
 #include "arrow/ipc/feather-internal.h"
 #include "arrow/ipc/feather_generated.h"
 #include "arrow/ipc/util.h"  // IWYU pragma: keep
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index a560f09d6fd..e496826f96b 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -31,8 +31,8 @@
 #include "arrow/ipc/json.h"
 #include "arrow/ipc/test-common.h"
 #include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
@@ -269,7 +269,7 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) {
     std::vector<std::shared_ptr<Array>> arrays;
     MakeBatchArrays(schema, num_rows, &arrays);
-    auto batch = std::make_shared<RecordBatch>(schema, num_rows, arrays);
+    auto batch = RecordBatch::Make(schema, num_rows, arrays);
     batches.push_back(batch);
     ASSERT_OK(writer->WriteRecordBatch(*batch));
   }
diff --git a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
index 9ed0abde651..8561fb86037 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
@@ -63,7 +63,7 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fields) {
   }
 
   auto schema = std::make_shared<Schema>(fields);
-  return std::make_shared<RecordBatch>(schema, length, arrays);
+  return RecordBatch::Make(schema, length, arrays);
 }
 
 static void BM_WriteRecordBatch(benchmark::State& state) {  // NOLINT non-const reference
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index 40cd3f0eef0..1fcbdac5ebc 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -197,8 +197,8 @@ class IpcTestFixture : public io::MemoryMapFixture {
     std::vector<std::shared_ptr<Field>> fields = {f0};
     auto schema = std::make_shared<Schema>(fields);
 
-    RecordBatch batch(schema, 0, {array});
-    CheckRoundtrip(batch, buffer_size);
+    auto batch = RecordBatch::Make(schema, 0, {array});
+    CheckRoundtrip(*batch, buffer_size);
   }
 
  protected:
@@ -292,13 +292,13 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) {
   auto CheckArray = [this](const std::shared_ptr<Array>& array) {
     auto f0 = field("f0", array->type());
     auto schema = ::arrow::schema({f0});
-    RecordBatch batch(schema, array->length(), {array});
-    auto sliced_batch = batch.Slice(0, 5);
+    auto batch = RecordBatch::Make(schema, array->length(), {array});
+    auto sliced_batch = batch->Slice(0, 5);
 
     int64_t full_size;
     int64_t sliced_size;
-    ASSERT_OK(GetRecordBatchSize(batch, &full_size));
+    ASSERT_OK(GetRecordBatchSize(*batch, &full_size));
     ASSERT_OK(GetRecordBatchSize(*sliced_batch, &sliced_size));
     ASSERT_TRUE(sliced_size < full_size) << sliced_size << " " << full_size;
 
@@ -411,8 +411,7 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
 
     *schema = ::arrow::schema({f0});
 
-    std::vector<std::shared_ptr<Array>> arrays = {array};
-    *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
+    *batch = RecordBatch::Make(*schema, batch_length, {array});
 
     std::stringstream ss;
     ss << "test-write-past-max-recursion-" << g_file_number++;
@@ -632,7 +631,7 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
   std::vector<std::shared_ptr<Field>> fields = {f0};
   auto schema = std::make_shared<Schema>(fields);
 
-  RecordBatch batch(schema, length, {array});
+  auto batch = RecordBatch::Make(schema, length, {array});
 
   std::string path = "test-write-large-record_batch";
 
@@ -641,8 +640,8 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
   ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_));
 
   std::shared_ptr<RecordBatch> result;
-  ASSERT_OK(DoLargeRoundTrip(batch, false, &result));
-  CheckReadResult(*result, batch);
+  ASSERT_OK(DoLargeRoundTrip(*batch, false, &result));
+  CheckReadResult(*result, *batch);
 
   ASSERT_EQ(length, result->num_rows());
 }
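Reviewer note: the SliceTruncatesBuffers test above relies on the IPC writer truncating sliced buffers. A standalone sketch of the same check (assuming a caller-supplied batch with at least two rows; the helper name is illustrative):

#include <cstdint>
#include <memory>

#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/status.h"

// Slicing is zero-copy, but GetRecordBatchSize reflects the truncated
// buffers the IPC writer would emit, so a slice should serialize smaller.
arrow::Status CheckSliceSerializesSmaller(
    const std::shared_ptr<arrow::RecordBatch>& batch) {
  auto sliced = batch->Slice(0, batch->num_rows() / 2);
  int64_t full_size = 0;
  int64_t sliced_size = 0;
  RETURN_NOT_OK(arrow::ipc::GetRecordBatchSize(*batch, &full_size));
  RETURN_NOT_OK(arrow::ipc::GetRecordBatchSize(*sliced, &sliced_size));
  if (sliced_size >= full_size) {
    return arrow::Status::Invalid("slice was not truncated");
  }
  return arrow::Status::OK();
}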
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index c7530a467b3..f487487dfda 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -34,8 +34,8 @@
 #include "arrow/ipc/reader.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
 
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index bdf1ef52b40..bfb3d282d87 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -28,8 +28,8 @@
 #include "arrow/array.h"
 #include "arrow/builder.h"
 #include "arrow/ipc/dictionary.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
@@ -125,8 +125,8 @@ class SchemaWriter {
     // Make a dummy record batch. A bit tedious as we have to make a schema
     auto schema = ::arrow::schema({arrow::field("dictionary", dictionary->type())});
-    RecordBatch batch(schema, dictionary->length(), {dictionary});
-    RETURN_NOT_OK(WriteRecordBatch(batch, writer_));
+    auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary});
+    RETURN_NOT_OK(WriteRecordBatch(*batch, writer_));
     writer_->EndObject();
     return Status::OK();
   }
@@ -1435,7 +1435,7 @@ Status ReadRecordBatch(const rj::Value& json_obj, const std::shared_ptr<Schema>&
     RETURN_NOT_OK(ReadArray(pool, json_columns[i], type, &columns[i]));
   }
 
-  *batch = std::make_shared<RecordBatch>(schema, num_rows, columns);
+  *batch = RecordBatch::Make(schema, num_rows, columns);
   return Status::OK();
 }
diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc
index 30a1bb81e1a..ea2947d5d4c 100644
--- a/cpp/src/arrow/ipc/json.cc
+++ b/cpp/src/arrow/ipc/json.cc
@@ -24,8 +24,8 @@
 #include "arrow/buffer.h"
 #include "arrow/ipc/json-internal.h"
 #include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/util/logging.h"
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 8e10d7d66f9..5960e81883d 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -37,8 +37,8 @@
 #include "arrow/ipc/message.h"
 #include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/util.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/tensor.h"
 #include "arrow/type.h"
 #include "arrow/util/bit-util.h"
@@ -307,7 +307,7 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
     arrays[i] = std::move(arr);
   }
 
-  *out = std::make_shared<RecordBatch>(schema, num_rows, std::move(arrays));
+  *out = RecordBatch::Make(schema, num_rows, std::move(arrays));
   return Status::OK();
 }
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 7581fbda5b1..627f67e2517 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -24,13 +24,12 @@
 #include <memory>
 
 #include "arrow/ipc/message.h"
-#include "arrow/table.h"
+#include "arrow/record_batch.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
 class Buffer;
-class RecordBatch;
 class Schema;
 class Status;
 class Tensor;
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 7fc13938105..6f8a0dcc61f 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -30,8 +30,8 @@
 #include "arrow/builder.h"
 #include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
 #include "arrow/util/bit-util.h"
@@ -184,7 +184,7 @@ Status MakeBooleanBatchSized(const int length, std::shared_ptr<RecordBatch>* out) {
   std::shared_ptr<Array> a0, a1;
   RETURN_NOT_OK(MakeRandomBooleanArray(length, true, &a0));
   RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a1));
-  out->reset(new RecordBatch(schema, length, {a0, a1}));
+  *out = RecordBatch::Make(schema, length, {a0, a1});
   return Status::OK();
 }
 
@@ -203,7 +203,7 @@ Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out) {
   MemoryPool* pool = default_memory_pool();
   RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0));
   RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1));
-  out->reset(new RecordBatch(schema, length, {a0, a1}));
+  *out = RecordBatch::Make(schema, length, {a0, a1});
   return Status::OK();
 }
 
@@ -252,7 +252,7 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
     auto s = MakeRandomBinaryArray(length, true, pool, &a1);
     RETURN_NOT_OK(s);
   }
-  out->reset(new RecordBatch(schema, length, {a0, a1}));
+  *out = RecordBatch::Make(schema, length, {a0, a1});
   return Status::OK();
 }
 
@@ -261,7 +261,7 @@ Status MakeNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
   auto f0 = field("f0", null());
   auto schema = ::arrow::schema({f0});
   std::shared_ptr<Array> a0 = std::make_shared<NullArray>(length);
-  out->reset(new RecordBatch(schema, length, {a0}));
+  *out = RecordBatch::Make(schema, length, {a0});
   return Status::OK();
 }
 
@@ -284,7 +284,7 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(
       MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array));
   RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array));
-  out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array}));
+  *out = RecordBatch::Make(schema, length, {list_array, list_list_array, flat_array});
   return Status::OK();
 }
 
@@ -304,7 +304,7 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(
       MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array));
   RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
-  out->reset(new RecordBatch(schema, 0, {list_array, list_list_array, flat_array}));
+  *out = RecordBatch::Make(schema, 0, {list_array, list_list_array, flat_array});
   return Status::OK();
 }
 
@@ -327,7 +327,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(
       MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array));
   RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array));
-  out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array}));
+  *out = RecordBatch::Make(schema, length, {list_array, list_list_array, flat_array});
   return Status::OK();
 }
 
@@ -347,7 +347,7 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) {
   auto f0 = field("f0", type);
   auto schema = ::arrow::schema({f0});
   std::vector<std::shared_ptr<Array>> arrays = {array};
-  out->reset(new RecordBatch(schema, batch_length, arrays));
+  *out = RecordBatch::Make(schema, batch_length, arrays);
   return Status::OK();
 }
 
@@ -377,7 +377,7 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
   // construct batch
   std::vector<std::shared_ptr<Array>> arrays = {no_nulls, with_nulls};
-  out->reset(new RecordBatch(schema, list_batch->num_rows(), arrays));
+  *out = RecordBatch::Make(schema, list_batch->num_rows(), arrays);
   return Status::OK();
 }
 
@@ -445,7 +445,7 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
   // construct batch
   std::vector<std::shared_ptr<Array>> arrays = {sparse_no_nulls, sparse, dense};
-  out->reset(new RecordBatch(schema, length, arrays));
+  *out = RecordBatch::Make(schema, length, arrays);
   return Status::OK();
 }
 
@@ -526,7 +526,7 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
   std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2, a3, a4};
 
-  out->reset(new RecordBatch(schema, length, arrays));
+  *out = RecordBatch::Make(schema, length, arrays);
   return Status::OK();
 }
 
@@ -564,7 +564,7 @@ Status MakeDictionaryFlat(std::shared_ptr<RecordBatch>* out) {
       {field("dict1", f0_type), field("sparse", f1_type), field("dense", f2_type)});
 
   std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2};
-  out->reset(new RecordBatch(schema, length, arrays));
+  *out = RecordBatch::Make(schema, length, arrays);
   return Status::OK();
 }
 
@@ -584,8 +584,7 @@ Status MakeDates(std::shared_ptr<RecordBatch>* out) {
   std::shared_ptr<Array> date64_array;
   ArrayFromVector<Date64Type, int64_t>(is_valid, date64_values, &date64_array);
 
-  std::vector<std::shared_ptr<Array>> arrays = {date32_array, date64_array};
-  *out = std::make_shared<RecordBatch>(schema, date32_array->length(), arrays);
+  *out = RecordBatch::Make(schema, date32_array->length(), {date32_array, date64_array});
   return Status::OK();
 }
 
@@ -604,8 +603,7 @@ Status MakeTimestamps(std::shared_ptr<RecordBatch>* out) {
   ArrayFromVector<TimestampType, int64_t>(f1->type(), is_valid, ts_values, &a1);
   ArrayFromVector<TimestampType, int64_t>(f2->type(), is_valid, ts_values, &a2);
 
-  ArrayVector arrays = {a0, a1, a2};
-  *out = std::make_shared<RecordBatch>(schema, a0->length(), arrays);
+  *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2});
   return Status::OK();
 }
 
@@ -628,8 +626,7 @@ Status MakeTimes(std::shared_ptr<RecordBatch>* out) {
   ArrayFromVector<Time32Type, int32_t>(f2->type(), is_valid, t32_values, &a2);
   ArrayFromVector<Time64Type, int64_t>(f3->type(), is_valid, t64_values, &a3);
 
-  ArrayVector arrays = {a0, a1, a2, a3};
-  *out = std::make_shared<RecordBatch>(schema, a0->length(), arrays);
+  *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2, a3});
   return Status::OK();
 }
 
@@ -665,8 +662,7 @@ Status MakeFWBinary(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(b1.Finish(&a1));
   RETURN_NOT_OK(b2.Finish(&a2));
 
-  ArrayVector arrays = {a1, a2};
-  *out = std::make_shared<RecordBatch>(schema, a1->length(), arrays);
+  *out = RecordBatch::Make(schema, a1->length(), {a1, a2});
   return Status::OK();
 }
 
@@ -695,8 +691,7 @@ Status MakeDecimal(std::shared_ptr<RecordBatch>* out) {
   auto a2 = std::make_shared<Decimal128Array>(f1->type(), length, data);
 
-  ArrayVector arrays = {a1, a2};
-  *out = std::make_shared<RecordBatch>(schema, length, arrays);
+  *out = RecordBatch::Make(schema, length, {a1, a2});
   return Status::OK();
 }
 
@@ -716,8 +711,7 @@ Status MakeNull(std::shared_ptr<RecordBatch>* out) {
   std::shared_ptr<Array> a2;
   ArrayFromVector<Int64Type, int64_t>(f1->type(), is_valid, int_values, &a2);
 
-  ArrayVector arrays = {a1, a2};
-  *out = std::make_shared<RecordBatch>(schema, a1->length(), arrays);
+  *out = RecordBatch::Make(schema, a1->length(), {a1, a2});
   return Status::OK();
 }
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 323116f589b..3c1db06159e 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -32,6 +32,7 @@
 #include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/util.h"
 #include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/tensor.h"
@@ -508,12 +509,9 @@ class DictionaryWriter : public RecordBatchSerializer {
     dictionary_id_ = dictionary_id;
 
     // Make a dummy record batch. A bit tedious as we have to make a schema
-    std::vector<std::shared_ptr<Field>> fields = {
-        arrow::field("dictionary", dictionary->type())};
-    auto schema = std::make_shared<Schema>(fields);
-    RecordBatch batch(schema, dictionary->length(), {dictionary});
-
-    return RecordBatchSerializer::Write(batch, dst, metadata_length, body_length);
+    auto schema = arrow::schema({arrow::field("dictionary", dictionary->type())});
+    auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary});
+    return RecordBatchSerializer::Write(*batch, dst, metadata_length, body_length);
   }
 
  private:
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index cfbc30315fc..bd5f8ce10ea 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -22,8 +22,8 @@
 
 #include "arrow/array.h"
 #include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/logging.h"
diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc
index 86391a18598..3b7d7d884ef 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -23,6 +23,7 @@
 
 #include "arrow/array.h"
 #include "arrow/builder.h"
+#include "arrow/table.h"
 #include "arrow/test-util.h"
 
 #include "arrow/python/arrow_to_pandas.h"
@@ -81,8 +82,8 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) {
   std::vector<std::shared_ptr<Field>> fields = {f1, f2, f3};
   std::vector<std::shared_ptr<Column>> cols = {arr, arr, arr};
-  auto schema = std::make_shared<Schema>(fields);
-  auto table = std::make_shared<Table>(schema, cols);
+  auto schema = ::arrow::schema(fields);
+  auto table = Table::Make(schema, cols);
 
   PyObject* out;
   Py_BEGIN_ALLOW_THREADS;
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index b0c6287f088..72cc5b6e1db 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -32,13 +32,15 @@
 #include "arrow/builder.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
+#include "arrow/tensor.h"
+#include "arrow/util/logging.h"
+
 #include "arrow/python/common.h"
 #include "arrow/python/helpers.h"
 #include "arrow/python/numpy_convert.h"
 #include "arrow/python/platform.h"
 #include "arrow/python/util/datetime.h"
-#include "arrow/tensor.h"
-#include "arrow/util/logging.h"
 
 constexpr int32_t kMaxRecursionDepth = 100;
 
@@ -694,7 +696,7 @@ Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
 std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
   auto field = std::make_shared<Field>("list", data->type());
   auto schema = ::arrow::schema({field});
-  return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
+  return RecordBatch::Make(schema, data->length(), {data});
 }
 
 Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out) {
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
new file mode 100644
index 00000000000..60932bdf3e4
--- /dev/null
+++ b/cpp/src/arrow/record_batch.cc
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/record_batch.h"
+
+#include <algorithm>
+#include <cstdlib>
+#include <memory>
+#include <sstream>
+
+#include "arrow/array.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+/// \class SimpleRecordBatch
+/// \brief A basic, non-lazy in-memory record batch
+class SimpleRecordBatch : public RecordBatch {
+ public:
+  SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+                    const std::vector<std::shared_ptr<Array>>& columns)
+      : RecordBatch(schema, num_rows) {
+    columns_.resize(columns.size());
+    boxed_columns_.resize(schema->num_fields());
+    for (size_t i = 0; i < columns.size(); ++i) {
+      columns_[i] = columns[i]->data();
+    }
+  }
+
+  SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+                    std::vector<std::shared_ptr<Array>>&& columns)
+      : RecordBatch(schema, num_rows) {
+    columns_.resize(columns.size());
+    boxed_columns_.resize(schema->num_fields());
+    for (size_t i = 0; i < columns.size(); ++i) {
+      columns_[i] = columns[i]->data();
+    }
+  }
+
+  SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+                    std::vector<std::shared_ptr<ArrayData>>&& columns)
+      : RecordBatch(schema, num_rows) {
+    columns_ = std::move(columns);
+    boxed_columns_.resize(schema->num_fields());
+  }
+
+  SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+                    const std::vector<std::shared_ptr<ArrayData>>& columns)
+      : RecordBatch(schema, num_rows) {
+    columns_ = columns;
+    boxed_columns_.resize(schema->num_fields());
+  }
+
+  std::shared_ptr<Array> column(int i) const override {
+    if (!boxed_columns_[i]) {
+      boxed_columns_[i] = MakeArray(columns_[i]);
+    }
+    DCHECK(boxed_columns_[i]);
+    return boxed_columns_[i];
+  }
+
+  std::shared_ptr<ArrayData> column_data(int i) const override { return columns_[i]; }
+
+  std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
+    auto new_schema = schema_->AddMetadata(metadata);
+    return RecordBatch::Make(new_schema, num_rows_, columns_);
+  }
+
+  std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const override {
+    std::vector<std::shared_ptr<ArrayData>> arrays;
+    arrays.reserve(num_columns());
+    for (const auto& field : columns_) {
+      int64_t col_length = std::min(field->length - offset, length);
+      int64_t col_offset = field->offset + offset;
+
+      auto new_data = std::make_shared<ArrayData>(*field);
+      new_data->length = col_length;
+      new_data->offset = col_offset;
+      new_data->null_count = kUnknownNullCount;
+      arrays.emplace_back(new_data);
+    }
+    int64_t num_rows = std::min(num_rows_ - offset, length);
+    return std::make_shared<SimpleRecordBatch>(schema_, num_rows, std::move(arrays));
+  }
+
+  Status Validate() const override {
+    if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
+      return Status::Invalid("Number of columns did not match schema");
+    }
+    return RecordBatch::Validate();
+  }
+
+ private:
+  std::vector<std::shared_ptr<ArrayData>> columns_;
+
+  // Caching boxed array data
+  mutable std::vector<std::shared_ptr<Array>> boxed_columns_;
+};
+
+RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows)
+    : schema_(schema), num_rows_(num_rows) {}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+    const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    const std::vector<std::shared_ptr<Array>>& columns) {
+  return std::make_shared<SimpleRecordBatch>(schema, num_rows, columns);
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+    const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    std::vector<std::shared_ptr<Array>>&& columns) {
+  return std::make_shared<SimpleRecordBatch>(schema, num_rows, std::move(columns));
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+    const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    std::vector<std::shared_ptr<ArrayData>>&& columns) {
+  return std::make_shared<SimpleRecordBatch>(schema, num_rows, std::move(columns));
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+    const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    const std::vector<std::shared_ptr<ArrayData>>& columns) {
+  return std::make_shared<SimpleRecordBatch>(schema, num_rows, columns);
+}
+
+const std::string& RecordBatch::column_name(int i) const {
+  return schema_->field(i)->name();
+}
+
+bool RecordBatch::Equals(const RecordBatch& other) const {
+  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
+    return false;
+  }
+
+  for (int i = 0; i < num_columns(); ++i) {
+    if (!column(i)->Equals(other.column(i))) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool RecordBatch::ApproxEquals(const RecordBatch& other) const {
+  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
+    return false;
+  }
+
+  for (int i = 0; i < num_columns(); ++i) {
+    if (!column(i)->ApproxEquals(other.column(i))) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
+  return Slice(offset, this->num_rows() - offset);
+}
+
+Status RecordBatch::Validate() const {
+  for (int i = 0; i < num_columns(); ++i) {
+    auto arr_shared = this->column_data(i);
+    const ArrayData& arr = *arr_shared;
+    if (arr.length != num_rows_) {
+      std::stringstream ss;
+      ss << "Number of rows in column " << i << " did not match batch: " << arr.length
+         << " vs " << num_rows_;
+      return Status::Invalid(ss.str());
+    }
+    const auto& schema_type = *schema_->field(i)->type();
+    if (!arr.type->Equals(schema_type)) {
+      std::stringstream ss;
+      ss << "Column " << i << " type did not match schema: " << arr.type->ToString()
+         << " vs " << schema_type.ToString();
+      return Status::Invalid(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Base record batch reader
+
+RecordBatchReader::~RecordBatchReader() {}
+
+}  // namespace arrow
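Reviewer note: record_batch.cc above is the heart of the patch — RecordBatch becomes an abstract class with SimpleRecordBatch as its default implementation. A sketch of the slicing semantics that implementation provides (assuming a caller-supplied batch built via RecordBatch::Make; SliceDemo is illustrative):

#include <cassert>
#include <memory>

#include "arrow/record_batch.h"

// Slice copies each column's ArrayData descriptor, shifts its offset, and
// resets null_count to kUnknownNullCount for lazy recomputation; no buffer
// data is copied.
void SliceDemo(const std::shared_ptr<arrow::RecordBatch>& batch) {
  auto tail = batch->Slice(2);       // rows [2, num_rows)
  auto window = batch->Slice(1, 5);  // at most 5 rows starting at row 1
  assert(tail->num_rows() == batch->num_rows() - 2);
  assert(window->column(0)->offset() == batch->column(0)->offset() + 1);
  assert(tail->Validate().ok());  // length/type invariants still hold
}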
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
new file mode 100644
index 00000000000..b2c4c76b3f2
--- /dev/null
+++ b/cpp/src/arrow/record_batch.h
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_RECORD_BATCH_H
+#define ARROW_RECORD_BATCH_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class KeyValueMetadata;
+class Status;
+
+/// \class RecordBatch
+/// \brief Collection of equal-length arrays matching a particular Schema
+///
+/// A record batch is a table-like data structure that is semantically a
+/// sequence of fields, each a contiguous Arrow array
+class ARROW_EXPORT RecordBatch {
+ public:
+  virtual ~RecordBatch() = default;
+
+  /// \param[in] schema The record batch schema
+  /// \param[in] num_rows length of fields in the record batch. Each array
+  /// should have the same length as num_rows
+  /// \param[in] columns the record batch fields as vector of arrays
+  static std::shared_ptr<RecordBatch> Make(
+      const std::shared_ptr<Schema>& schema, int64_t num_rows,
+      const std::vector<std::shared_ptr<Array>>& columns);
+
+  /// \brief Move-based constructor for a vector of Array instances
+  static std::shared_ptr<RecordBatch> Make(
+      const std::shared_ptr<Schema>& schema, int64_t num_rows,
+      std::vector<std::shared_ptr<Array>>&& columns);
+
+  /// \brief Construct record batch from vector of internal data structures
+  /// \since 0.5.0
+  ///
+  /// This class is only provided with an rvalue-reference for the input data,
+  /// and is intended for internal use, or advanced users.
+  ///
+  /// \param schema the record batch schema
+  /// \param num_rows the number of semantic rows in the record batch. This
+  /// should be equal to the length of each field
+  /// \param columns the data for the batch's columns
+  static std::shared_ptr<RecordBatch> Make(
+      const std::shared_ptr<Schema>& schema, int64_t num_rows,
+      std::vector<std::shared_ptr<ArrayData>>&& columns);
+
+  /// \brief Construct record batch by copying vector of array data
+  /// \since 0.5.0
+  static std::shared_ptr<RecordBatch> Make(
+      const std::shared_ptr<Schema>& schema, int64_t num_rows,
+      const std::vector<std::shared_ptr<ArrayData>>& columns);
+
+  /// \brief Determine if two record batches are exactly equal
+  /// \return true if batches are equal
+  bool Equals(const RecordBatch& other) const;
+
+  /// \brief Determine if two record batches are approximately equal
+  bool ApproxEquals(const RecordBatch& other) const;
+
+  /// \return the record batch's schema
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+  /// \brief Retrieve an array from the record batch
+  /// \param[in] i field index, does not boundscheck
+  /// \return an Array object
+  virtual std::shared_ptr<Array> column(int i) const = 0;
+
+  /// \brief Retrieve an array's internal data from the record batch
+  /// \param[in] i field index, does not boundscheck
+  /// \return an internal ArrayData object
+  virtual std::shared_ptr<ArrayData> column_data(int i) const = 0;
+
+  virtual std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
+
+  /// \brief Name of the i-th column
+  const std::string& column_name(int i) const;
+
+  /// \return the number of columns in the table
+  int num_columns() const { return schema_->num_fields(); }
+
+  /// \return the number of rows (the corresponding length of each column)
+  int64_t num_rows() const { return num_rows_; }
+
+  /// \brief Slice each of the arrays in the record batch
+  /// \param[in] offset the starting offset to slice, through end of batch
+  /// \return new record batch
+  virtual std::shared_ptr<RecordBatch> Slice(int64_t offset) const;
+
+  /// \brief Slice each of the arrays in the record batch
+  /// \param[in] offset the starting offset to slice
+  /// \param[in] length the number of elements to slice from offset
+  /// \return new record batch
+  virtual std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const = 0;
+
+  /// \brief Check for schema or length inconsistencies
+  /// \return Status
+  virtual Status Validate() const;
+
+ protected:
+  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);
+
+  std::shared_ptr<Schema> schema_;
+  int64_t num_rows_;
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
+};
+
+/// \brief Abstract interface for reading stream of record batches
+class ARROW_EXPORT RecordBatchReader {
+ public:
+  virtual ~RecordBatchReader();
+
+  /// \return the shared schema of the record batches in the stream
+  virtual std::shared_ptr<Schema> schema() const = 0;
+
+  /// Read the next record batch in the stream. Return null for batch when
+  /// reaching end of stream
+  ///
+  /// \param[out] batch the next loaded batch, null at end of stream
+  /// \return Status
+  virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
+};
+
+}  // namespace arrow
+
+#endif  // ARROW_RECORD_BATCH_H
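Reviewer note: the header above also pins down the RecordBatchReader contract in one place. A sketch of consuming any reader against that contract (CountRows is illustrative, not part of this patch):

#include <cstdint>
#include <memory>

#include "arrow/record_batch.h"
#include "arrow/status.h"

// Per the ReadNext documentation above, end-of-stream is signaled by a
// null batch together with an OK Status, not by an error.
arrow::Status CountRows(arrow::RecordBatchReader* reader, int64_t* total) {
  *total = 0;
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    RETURN_NOT_OK(reader->ReadNext(&batch));
    if (batch == nullptr) break;
    *total += batch->num_rows();
  }
  return arrow::Status::OK();
}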
(schema_, arrays_); + auto array_ctor = Table::Make(schema_, arrays_); ASSERT_TRUE(table_->Equals(*array_ctor)); - table_.reset(new Table(schema_, columns_, length)); - ASSERT_OK(table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length); + ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); - ASSERT_OK(MakeTable(schema_, arrays_, &table_)); - ASSERT_OK(table_->ValidateColumns()); + table_ = Table::Make(schema_, arrays_); + ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); ASSERT_EQ(3, table_->num_columns()); } @@ -248,7 +249,7 @@ TEST_F(TestTable, Metadata) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_)); + table_ = Table::Make(schema_, columns_); ASSERT_TRUE(table_->schema()->Equals(*schema_)); @@ -262,14 +263,14 @@ TEST_F(TestTable, InvalidColumns) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_, length - 1)); - ASSERT_RAISES(Invalid, table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length - 1); + ASSERT_RAISES(Invalid, table_->Validate()); columns_.clear(); // Wrong number of columns - table_.reset(new Table(schema_, columns_, length)); - ASSERT_RAISES(Invalid, table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length); + ASSERT_RAISES(Invalid, table_->Validate()); columns_ = { std::make_shared(schema_->field(0), MakeRandomArray(length)), @@ -277,15 +278,15 @@ TEST_F(TestTable, InvalidColumns) { std::make_shared(schema_->field(2), MakeRandomArray(length - 1))}; - table_.reset(new Table(schema_, columns_, length)); - ASSERT_RAISES(Invalid, table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length); + ASSERT_RAISES(Invalid, table_->Validate()); } TEST_F(TestTable, Equals) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_)); + table_ = Table::Make(schema_, columns_); ASSERT_TRUE(table_->Equals(*table_)); // Differing schema @@ -294,7 +295,8 @@ TEST_F(TestTable, Equals) { auto f2 = field("f5", int16()); vector> fields = {f0, f1, f2}; auto other_schema = std::make_shared(fields); - ASSERT_FALSE(table_->Equals(Table(other_schema, columns_))); + auto other = Table::Make(other_schema, columns_); + ASSERT_FALSE(table_->Equals(*other)); // Differing columns std::vector> other_columns = { std::make_shared(schema_->field(0), @@ -303,19 +305,21 @@ TEST_F(TestTable, Equals) { MakeRandomArray(length, 10)), std::make_shared(schema_->field(2), MakeRandomArray(length, 10))}; - ASSERT_FALSE(table_->Equals(Table(schema_, other_columns))); + + other = Table::Make(schema_, other_columns); + ASSERT_FALSE(table_->Equals(*other)); } TEST_F(TestTable, FromRecordBatches) { const int64_t length = 10; MakeExample1(length); - auto batch1 = std::make_shared(schema_, length, arrays_); + auto batch1 = RecordBatch::Make(schema_, length, arrays_); std::shared_ptr
result, expected; ASSERT_OK(Table::FromRecordBatches({batch1}, &result)); - expected = std::make_shared
<Table>(schema_, columns_);
+  expected = Table::Make(schema_, columns_);
   ASSERT_TRUE(result->Equals(*expected));

   std::vector<std::shared_ptr<Column>> other_columns;
@@ -325,18 +329,17 @@ TEST_F(TestTable, FromRecordBatches) {
   }

   ASSERT_OK(Table::FromRecordBatches({batch1, batch1}, &result));
-  expected = std::make_shared<Table>(schema_, other_columns);
+  expected = Table::Make(schema_, other_columns);
   ASSERT_TRUE(result->Equals(*expected));

   // Error states
   std::vector<std::shared_ptr<RecordBatch>> empty_batches;
   ASSERT_RAISES(Invalid, Table::FromRecordBatches(empty_batches, &result));

-  std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)};
-  auto other_schema = std::make_shared<Schema>(fields);
+  auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)});

   std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
-  auto batch2 = std::make_shared<RecordBatch>(other_schema, length, other_arrays);
+  auto batch2 = RecordBatch::Make(other_schema, length, other_arrays);

   ASSERT_RAISES(Invalid, Table::FromRecordBatches({batch1, batch2}, &result));
 }

@@ -344,11 +347,11 @@ TEST_F(TestTable, ConcatenateTables) {
   const int64_t length = 10;
   MakeExample1(length);
-  auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+  auto batch1 = RecordBatch::Make(schema_, length, arrays_);

   // generate different data
   MakeExample1(length);
-  auto batch2 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+  auto batch2 = RecordBatch::Make(schema_, length, arrays_);

   std::shared_ptr<Table> t1, t2, t3, result, expected;
   ASSERT_OK(Table::FromRecordBatches({batch1}, &t1));

@@ -362,11 +365,10 @@ TEST_F(TestTable, ConcatenateTables) {
   std::vector<std::shared_ptr<Table>> empty_tables;
   ASSERT_RAISES(Invalid, ConcatenateTables(empty_tables, &result));

-  std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)};
-  auto other_schema = std::make_shared<Schema>(fields);
+  auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)});

   std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
-  auto batch3 = std::make_shared<RecordBatch>(other_schema, length, other_arrays);
+  auto batch3 = RecordBatch::Make(other_schema, length, other_arrays);
   ASSERT_OK(Table::FromRecordBatches({batch3}, &t3));

   ASSERT_RAISES(Invalid, ConcatenateTables({t1, t3}, &result));

@@ -376,31 +378,38 @@ TEST_F(TestTable, RemoveColumn) {
   const int64_t length = 10;
   MakeExample1(length);

-  Table table(schema_, columns_);
+  auto table_sp = Table::Make(schema_, columns_);
+  const Table& table = *table_sp;

   std::shared_ptr<Table> result;
   ASSERT_OK(table.RemoveColumn(0, &result));
   auto ex_schema = ::arrow::schema({schema_->field(1), schema_->field(2)});

   std::vector<std::shared_ptr<Column>> ex_columns = {table.column(1), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+  auto expected = Table::Make(ex_schema, ex_columns);
+  ASSERT_TRUE(result->Equals(*expected));

   ASSERT_OK(table.RemoveColumn(1, &result));
   ex_schema = ::arrow::schema({schema_->field(0), schema_->field(2)});
   ex_columns = {table.column(0), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+  expected = Table::Make(ex_schema, ex_columns);
+  ASSERT_TRUE(result->Equals(*expected));

   ASSERT_OK(table.RemoveColumn(2, &result));
   ex_schema = ::arrow::schema({schema_->field(0), schema_->field(1)});
   ex_columns = {table.column(0), table.column(1)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+  expected = Table::Make(ex_schema, ex_columns);
+  ASSERT_TRUE(result->Equals(*expected));
 }

 TEST_F(TestTable, AddColumn) {
   const int64_t length = 10;
   MakeExample1(length);

-  Table table(schema_, columns_);
+  auto table_sp = Table::Make(schema_, columns_);
+  const Table& table = *table_sp;

   std::shared_ptr<Table> result;
   // Some negative tests with invalid index
@@ -419,50 +428,32 @@ TEST_F(TestTable, AddColumn) {
   ASSERT_OK(table.AddColumn(0, columns_[0], &result));
   auto ex_schema = ::arrow::schema(
       {schema_->field(0), schema_->field(0), schema_->field(1), schema_->field(2)});
-  std::vector<std::shared_ptr<Column>> ex_columns = {table.column(0), table.column(0),
-                                                     table.column(1), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+  auto expected = Table::Make(
+      ex_schema, {table.column(0), table.column(0), table.column(1), table.column(2)});
+  ASSERT_TRUE(result->Equals(*expected));

   ASSERT_OK(table.AddColumn(1, columns_[0], &result));
   ex_schema = ::arrow::schema(
       {schema_->field(0), schema_->field(0), schema_->field(1), schema_->field(2)});
-  ex_columns = {table.column(0), table.column(0), table.column(1), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+  expected = Table::Make(
+      ex_schema, {table.column(0), table.column(0), table.column(1), table.column(2)});
+  ASSERT_TRUE(result->Equals(*expected));

   ASSERT_OK(table.AddColumn(2, columns_[0], &result));
   ex_schema = ::arrow::schema(
       {schema_->field(0), schema_->field(1), schema_->field(0), schema_->field(2)});
-  ex_columns = {table.column(0), table.column(1), table.column(0), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+  expected = Table::Make(
+      ex_schema, {table.column(0), table.column(1), table.column(0), table.column(2)});
+  ASSERT_TRUE(result->Equals(*expected));

   ASSERT_OK(table.AddColumn(3, columns_[0], &result));
   ex_schema = ::arrow::schema(
       {schema_->field(0), schema_->field(1), schema_->field(2), schema_->field(0)});
-  ex_columns = {table.column(0), table.column(1), table.column(2), table.column(0)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
-}
-
-TEST_F(TestTable, IsChunked) {
-  ArrayVector c1, c2;
-
-  auto a1 = MakeRandomArray<Int32Array>(10);
-  auto a2 = MakeRandomArray<Int32Array>(20);
-
-  auto sch1 = arrow::schema({field("f1", int32()), field("f2", int32())});
-
-  std::vector<std::shared_ptr<Column>> columns;
-
-  std::shared_ptr<RecordBatch> batch;
-
-  columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a1})};
-  auto t1 = std::make_shared<Table>(sch1, columns);
-
-  ASSERT_FALSE(t1->IsChunked());
-
-  columns = {column(sch1->field(0), {a2}), column(sch1->field(1), {a1, a1})};
-  auto t2 = std::make_shared<Table>(sch1, columns);
-
-  ASSERT_TRUE(t2->IsChunked());
+  expected = Table::Make(
+      ex_schema, {table.column(0), table.column(1), table.column(2), table.column(0)});
+  ASSERT_TRUE(result->Equals(*expected));
 }

 class TestRecordBatch : public TestBase {};

@@ -475,24 +466,22 @@ TEST_F(TestRecordBatch, Equals) {
   auto f2 = field("f2", int16());

   vector<shared_ptr<Field>> fields = {f0, f1, f2};
-  auto schema = std::make_shared<Schema>(fields);
+  auto schema = ::arrow::schema({f0, f1, f2});
+  auto schema2 = ::arrow::schema({f0, f1});

   auto a0 = MakeRandomArray<Int32Array>(length);
   auto a1 = MakeRandomArray<UInt8Array>(length);
   auto a2 = MakeRandomArray<Int16Array>(length);

-  RecordBatch b1(schema, length, {a0, a1, a2});
-  RecordBatch b3(schema, length, {a0, a1});
-  RecordBatch b4(schema, length, {a0, a1, a1});
+  auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});
+  auto b3 = RecordBatch::Make(schema2, length, {a0, a1});
+  auto b4 = RecordBatch::Make(schema, length, {a0, a1, a1});

-  ASSERT_TRUE(b1.Equals(b1));
-  ASSERT_FALSE(b1.Equals(b3));
-  ASSERT_FALSE(b1.Equals(b4));
+  ASSERT_TRUE(b1->Equals(*b1));
+  ASSERT_FALSE(b1->Equals(*b3));
+  ASSERT_FALSE(b1->Equals(*b4));
 }

-#ifdef NDEBUG
-// In debug builds, RecordBatch ctor aborts if you construct an invalid one
-
 TEST_F(TestRecordBatch, Validate) {
   const int length = 10;

@@ -507,21 +496,19 @@ TEST_F(TestRecordBatch, Validate) {
   auto a2 = MakeRandomArray<Int16Array>(length);
   auto a3 = MakeRandomArray<Int16Array>(5);

-  RecordBatch b1(schema, length, {a0, a1, a2});
+  auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});

-  ASSERT_OK(b1.Validate());
+  ASSERT_OK(b1->Validate());

   // Length mismatch
-  RecordBatch b2(schema, length, {a0, a1, a3});
-  ASSERT_RAISES(Invalid, b2.Validate());
+  auto b2 = RecordBatch::Make(schema, length, {a0, a1, a3});
+  ASSERT_RAISES(Invalid, b2->Validate());

   // Type mismatch
-  RecordBatch b3(schema, length, {a0, a1, a0});
-  ASSERT_RAISES(Invalid, b3.Validate());
+  auto b3 = RecordBatch::Make(schema, length, {a0, a1, a0});
+  ASSERT_RAISES(Invalid, b3->Validate());
 }

-#endif
-
 TEST_F(TestRecordBatch, Slice) {
   const int length = 10;

@@ -529,19 +516,19 @@ TEST_F(TestRecordBatch, Slice) {
   auto f0 = field("f0", int32());
   auto f1 = field("f1", uint8());

   vector<shared_ptr<Field>> fields = {f0, f1};
-  auto schema = std::make_shared<Schema>(fields);
+  auto schema = ::arrow::schema(fields);

   auto a0 = MakeRandomArray<Int32Array>(length);
   auto a1 = MakeRandomArray<UInt8Array>(length);

-  RecordBatch batch(schema, length, {a0, a1});
+  auto batch = RecordBatch::Make(schema, length, {a0, a1});

-  auto batch_slice = batch.Slice(2);
-  auto batch_slice2 = batch.Slice(1, 5);
+  auto batch_slice = batch->Slice(2);
+  auto batch_slice2 = batch->Slice(1, 5);

-  ASSERT_EQ(batch_slice->num_rows(), batch.num_rows() - 2);
+  ASSERT_EQ(batch_slice->num_rows(), batch->num_rows() - 2);

-  for (int i = 0; i < batch.num_columns(); ++i) {
+  for (int i = 0; i < batch->num_columns(); ++i) {
     ASSERT_EQ(2, batch_slice->column(i)->offset());
     ASSERT_EQ(length - 2, batch_slice->column(i)->length());

@@ -567,9 +554,9 @@ TEST_F(TestTableBatchReader, ReadNext) {
   std::shared_ptr<RecordBatch> batch;

   columns = {column(sch1->field(0), {a1, a4, a2}), column(sch1->field(1), {a2, a2})};
-  Table t1(sch1, columns);
+  auto t1 = Table::Make(sch1, columns);

-  TableBatchReader i1(t1);
+  TableBatchReader i1(*t1);

   ASSERT_OK(i1.ReadNext(&batch));
   ASSERT_EQ(10, batch->num_rows());

@@ -584,9 +571,9 @@ TEST_F(TestTableBatchReader, ReadNext) {
   ASSERT_EQ(nullptr, batch);

   columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a4})};
-  Table t2(sch1, columns);
+  auto t2 = Table::Make(sch1, columns);

-  TableBatchReader i2(t2);
+  TableBatchReader i2(*t2);

   ASSERT_OK(i2.ReadNext(&batch));
   ASSERT_EQ(10, batch->num_rows());
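Taken together, the test changes above show the shape of the migration: RecordBatch and Table are no longer constructed directly but through static Make factories returning std::shared_ptr, so expected values are compared through a dereference (result->Equals(*expected)). A minimal sketch of the new calling convention, using only the factory signatures visible in this patch; the schema/arrays inputs are assumed to be prepared by the caller:

    #include <memory>
    #include <vector>
    #include "arrow/record_batch.h"
    #include "arrow/table.h"

    // Sketch only: build a batch with RecordBatch::Make (previously
    // std::make_shared<RecordBatch>) and promote it to a Table.
    std::shared_ptr<arrow::Table> MakeExampleTable(
        const std::shared_ptr<arrow::Schema>& schema,
        const std::vector<std::shared_ptr<arrow::Array>>& arrays, int64_t num_rows) {
      auto batch = arrow::RecordBatch::Make(schema, num_rows, arrays);
      std::shared_ptr<arrow::Table> table;
      // Returns Status::Invalid on a schema/length mismatch between batches.
      if (!arrow::Table::FromRecordBatches({batch}, &table).ok()) {
        return nullptr;
      }
      return table;
    }
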
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index fe19bf4ce0b..8f3f195765a 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -23,6 +23,7 @@
 #include <sstream>

 #include "arrow/array.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/util/logging.h"
@@ -153,171 +154,126 @@ Status Column::ValidateData() {
 }

 // ----------------------------------------------------------------------
-// RecordBatch methods
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows)
-    : schema_(schema), num_rows_(num_rows) {
-  boxed_columns_.resize(schema->num_fields());
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         const std::vector<std::shared_ptr<Array>>& columns)
-    : RecordBatch(schema, num_rows) {
-  columns_.resize(columns.size());
-  for (size_t i = 0; i < columns.size(); ++i) {
-    columns_[i] = columns[i]->data();
-  }
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         std::vector<std::shared_ptr<Array>>&& columns)
-    : RecordBatch(schema, num_rows) {
-  columns_.resize(columns.size());
-  for (size_t i = 0; i < columns.size(); ++i) {
-    columns_[i] = columns[i]->data();
-  }
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         std::vector<std::shared_ptr<ArrayData>>&& columns)
-    : RecordBatch(schema, num_rows) {
-  columns_ = std::move(columns);
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         const std::vector<std::shared_ptr<ArrayData>>& columns)
-    : RecordBatch(schema, num_rows) {
-  columns_ = columns;
-}
-
-std::shared_ptr<Array> RecordBatch::column(int i) const {
-  if (!boxed_columns_[i]) {
-    boxed_columns_[i] = MakeArray(columns_[i]);
-  }
-  DCHECK(boxed_columns_[i]);
-  return boxed_columns_[i];
-}
-
-const std::string& RecordBatch::column_name(int i) const {
-  return schema_->field(i)->name();
-}
-
-bool RecordBatch::Equals(const RecordBatch& other) const {
-  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
-    return false;
-  }
+// Table methods

-  for (int i = 0; i < num_columns(); ++i) {
-    if (!column(i)->Equals(other.column(i))) {
-      return false;
+/// \class SimpleTable
+/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch
+class SimpleTable : public Table {
+ public:
+  SimpleTable(const std::shared_ptr<Schema>& schema,
+              const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows = -1)
+      : columns_(columns) {
+    schema_ = schema;
+    if (num_rows < 0) {
+      if (columns.size() == 0) {
+        num_rows_ = 0;
+      } else {
+        num_rows_ = columns[0]->length();
+      }
+    } else {
+      num_rows_ = num_rows;
     }
   }
-  return true;
-}
-
-bool RecordBatch::ApproxEquals(const RecordBatch& other) const {
-  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
-    return false;
-  }
+  SimpleTable(const std::shared_ptr<Schema>& schema,
+              const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) {
+    schema_ = schema;
+    if (num_rows < 0) {
+      if (columns.size() == 0) {
+        num_rows_ = 0;
+      } else {
+        num_rows_ = columns[0]->length();
+      }
+    } else {
+      num_rows_ = num_rows;
+    }

-  for (int i = 0; i < num_columns(); ++i) {
-    if (!column(i)->ApproxEquals(other.column(i))) {
-      return false;
+    columns_.resize(columns.size());
+    for (size_t i = 0; i < columns.size(); ++i) {
+      columns_[i] =
+          std::make_shared<Column>(schema->field(static_cast<int>(i)), columns[i]);
     }
   }
-  return true;
-}
-
-std::shared_ptr<RecordBatch> RecordBatch::ReplaceSchemaMetadata(
-    const std::shared_ptr<const KeyValueMetadata>& metadata) const {
-  auto new_schema = schema_->AddMetadata(metadata);
-  return std::make_shared<RecordBatch>(new_schema, num_rows_, columns_);
-}
+  std::shared_ptr<Column> column(int i) const override { return columns_[i]; }

-std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
-  return Slice(offset, this->num_rows() - offset);
-}
+  Status RemoveColumn(int i, std::shared_ptr<Table>* out) const override {
+    std::shared_ptr<Schema> new_schema;
+    RETURN_NOT_OK(schema_->RemoveField(i, &new_schema));

-std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset, int64_t length) const {
-  std::vector<std::shared_ptr<ArrayData>> arrays;
-  arrays.reserve(num_columns());
-  for (const auto& field : columns_) {
-    int64_t col_length = std::min(field->length - offset, length);
-    int64_t col_offset = field->offset + offset;
-
-    auto new_data = std::make_shared<ArrayData>(*field);
-    new_data->length = col_length;
-    new_data->offset = col_offset;
-    new_data->null_count = kUnknownNullCount;
-    arrays.emplace_back(new_data);
+    *out = Table::Make(new_schema, internal::DeleteVectorElement(columns_, i));
+    return Status::OK();
   }
-  int64_t num_rows = std::min(num_rows_ - offset, length);
-  return std::make_shared<RecordBatch>(schema_, num_rows, std::move(arrays));
-}

-Status RecordBatch::Validate() const {
-  for (int i = 0; i < num_columns(); ++i) {
-    const ArrayData& arr = *columns_[i];
-    if (arr.length != num_rows_) {
+  Status AddColumn(int i, const std::shared_ptr<Column>& col,
+                   std::shared_ptr<Table>* out) const override {
+    if (i < 0 || i > num_columns() + 1) {
+      return Status::Invalid("Invalid column index.");
+    }
+    if (col == nullptr) {
       std::stringstream ss;
-      ss << "Number of rows in column " << i << " did not match batch: " << arr.length
-         << " vs " << num_rows_;
+      ss << "Column " << i << " was null";
       return Status::Invalid(ss.str());
     }
-    const auto& schema_type = *schema_->field(i)->type();
-    if (!arr.type->Equals(schema_type)) {
+    if (col->length() != num_rows_) {
       std::stringstream ss;
-      ss << "Column " << i << " type not match schema: " << arr.type->ToString() << " vs "
-         << schema_type.ToString();
+      ss << "Added column's length must match table's length. Expected length "
+         << num_rows_ << " but got length " << col->length();
       return Status::Invalid(ss.str());
     }
+
+    std::shared_ptr<Schema> new_schema;
+    RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema));
+
+    *out = Table::Make(new_schema, internal::AddVectorElement(columns_, i, col));
+    return Status::OK();
   }
-  return Status::OK();
-}

-// ----------------------------------------------------------------------
-// Table methods
+  std::shared_ptr<Table> ReplaceSchemaMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
+    auto new_schema = schema_->AddMetadata(metadata);
+    return Table::Make(new_schema, columns_);
+  }

-Table::Table(const std::shared_ptr<Schema>& schema,
-             const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows)
-    : schema_(schema), columns_(columns) {
-  if (num_rows < 0) {
-    if (columns.size() == 0) {
-      num_rows_ = 0;
-    } else {
-      num_rows_ = columns[0]->length();
+  Status Validate() const override {
+    if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
+      return Status::Invalid("Number of columns did not match schema");
     }
-  } else {
-    num_rows_ = num_rows;
-  }
-}

-Table::Table(const std::shared_ptr<Schema>& schema,
-             const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows)
-    : schema_(schema) {
-  if (num_rows < 0) {
-    if (columns.size() == 0) {
-      num_rows_ = 0;
-    } else {
-      num_rows_ = columns[0]->length();
+    // Make sure columns are all the same length
+    for (int i = 0; i < num_columns(); ++i) {
+      const Column* col = columns_[i].get();
+      if (col == nullptr) {
+        std::stringstream ss;
+        ss << "Column " << i << " was null";
+        return Status::Invalid(ss.str());
+      }
+      if (col->length() != num_rows_) {
+        std::stringstream ss;
+        ss << "Column " << i << " named " << col->name() << " expected length "
+           << num_rows_ << " but got length " << col->length();
+        return Status::Invalid(ss.str());
+      }
     }
-  } else {
-    num_rows_ = num_rows;
+    return Status::OK();
   }

-  columns_.resize(columns.size());
-  for (size_t i = 0; i < columns.size(); ++i) {
-    columns_[i] =
-        std::make_shared<Column>(schema->field(static_cast<int>(i)), columns[i]);
-  }
+ private:
+  std::vector<std::shared_ptr<Column>> columns_;
+};
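Note how both SimpleTable constructors infer the row count when num_rows is left at its default of -1: zero columns mean zero rows, otherwise the first column's length is used. A short sketch of the three cases as seen through the public factory; schema0, schema1, and col are hypothetical inputs, with col assumed to have length 100:

    #include <memory>
    #include <vector>
    #include "arrow/table.h"

    void NumRowsInference(const std::shared_ptr<arrow::Schema>& schema0,  // no fields
                          const std::shared_ptr<arrow::Schema>& schema1,  // one field
                          const std::shared_ptr<arrow::Column>& col) {    // length 100
      std::vector<std::shared_ptr<arrow::Column>> no_columns;
      auto t0 = arrow::Table::Make(schema0, no_columns);  // num_rows() == 0
      auto t1 = arrow::Table::Make(schema1, {col});       // inferred: num_rows() == 100
      auto t2 = arrow::Table::Make(schema1, {col}, 100);  // explicit row count
    }
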
+Table::Table() {}
+
+std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema,
+                                   const std::vector<std::shared_ptr<Column>>& columns,
+                                   int64_t num_rows) {
+  return std::make_shared<SimpleTable>(schema, columns, num_rows);
+}

-std::shared_ptr<Table> Table::ReplaceSchemaMetadata(
-    const std::shared_ptr<const KeyValueMetadata>& metadata) const {
-  auto new_schema = schema_->AddMetadata(metadata);
-  return std::make_shared<Table>(new_schema, columns_);
+std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema,
+                                   const std::vector<std::shared_ptr<Array>>& arrays,
+                                   int64_t num_rows) {
+  return std::make_shared<SimpleTable>(schema, arrays, num_rows);
 }

 Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches,
@@ -351,7 +307,7 @@ Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>&
     columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
   }
-  *table = std::make_shared<Table>(schema, columns);
+  *table = Table::Make(schema, columns);
   return Status::OK();
 }

@@ -388,7 +344,7 @@ Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables,
   }
     columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
   }
-  *table = std::make_shared<Table>(schema, columns);
+  *table = Table::Make(schema, columns);
   return Status::OK();
 }

@@ -399,82 +355,19 @@ bool Table::Equals(const Table& other) const {
   if (!schema_->Equals(*other.schema())) {
     return false;
   }
-  if (static_cast<int>(columns_.size()) != other.num_columns()) {
+  if (this->num_columns() != other.num_columns()) {
     return false;
   }

-  for (int i = 0; i < static_cast<int>(columns_.size()); i++) {
-    if (!columns_[i]->Equals(other.column(i))) {
+  for (int i = 0; i < this->num_columns(); i++) {
+    if (!this->column(i)->Equals(other.column(i))) {
       return false;
     }
   }
   return true;
 }

-Status Table::RemoveColumn(int i, std::shared_ptr<Table>* out) const {
-  std::shared_ptr<Schema> new_schema;
-  RETURN_NOT_OK(schema_->RemoveField(i, &new_schema));
-
-  *out = std::make_shared<Table>(new_schema, internal::DeleteVectorElement(columns_, i));
-  return Status::OK();
-}
-
-Status Table::AddColumn(int i, const std::shared_ptr<Column>& col,
-                        std::shared_ptr<Table>* out) const {
-  if (i < 0 || i > num_columns() + 1) {
-    return Status::Invalid("Invalid column index.");
-  }
-  if (col == nullptr) {
-    std::stringstream ss;
-    ss << "Column " << i << " was null";
-    return Status::Invalid(ss.str());
-  }
-  if (col->length() != num_rows_) {
-    std::stringstream ss;
-    ss << "Added column's length must match table's length. Expected length " << num_rows_
-       << " but got length " << col->length();
-    return Status::Invalid(ss.str());
-  }
-
-  std::shared_ptr<Schema> new_schema;
-  RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema));
-
-  *out =
-      std::make_shared<Table>(new_schema, internal::AddVectorElement(columns_, i, col));
-  return Status::OK();
-}
-
-Status Table::ValidateColumns() const {
-  if (num_columns() != schema_->num_fields()) {
-    return Status::Invalid("Number of columns did not match schema");
-  }
-
-  // Make sure columns are all the same length
-  for (size_t i = 0; i < columns_.size(); ++i) {
-    const Column* col = columns_[i].get();
-    if (col == nullptr) {
-      std::stringstream ss;
-      ss << "Column " << i << " was null";
-      return Status::Invalid(ss.str());
-    }
-    if (col->length() != num_rows_) {
-      std::stringstream ss;
-      ss << "Column " << i << " named " << col->name() << " expected length " << num_rows_
-         << " but got length " << col->length();
-      return Status::Invalid(ss.str());
-    }
-  }
-  return Status::OK();
-}
-
-bool Table::IsChunked() const {
-  for (size_t i = 0; i < columns_.size(); ++i) {
-    if (columns_[i]->data()->num_chunks() > 1) {
-      return true;
-    }
-  }
-  return false;
-}
+#ifndef ARROW_NO_DEPRECATED_API

 Status MakeTable(const std::shared_ptr<Schema>& schema,
                  const std::vector<std::shared_ptr<Array>>& arrays,
@@ -493,15 +386,12 @@ Status MakeTable(const std::shared_ptr<Schema>& schema,
     columns.emplace_back(std::make_shared<Column>(schema->field(i), arrays[i]));
   }

-  *table = std::make_shared<Table>(schema, columns);
+  *table = Table::Make(schema, columns);

   return Status::OK();
 }

-// ----------------------------------------------------------------------
-// Base record batch reader
-
-RecordBatchReader::~RecordBatchReader() {}
+#endif  // ARROW_NO_DEPRECATED_API

 // ----------------------------------------------------------------------
 // Convert a table to a sequence of record batches

@@ -565,8 +455,7 @@ class TableBatchReader::TableBatchReaderImpl {
   }

   absolute_row_position_ += chunksize;
-  *out =
-      std::make_shared<RecordBatch>(table_.schema(), chunksize, std::move(batch_data));
+  *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data));
   return Status::OK();
 }
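With the RecordBatchReader base split out of this file and TableBatchReaderImpl now producing batches via RecordBatch::Make, reading a table back out is unchanged from the caller's side. A small sketch of draining a (possibly chunked) table, matching the ReadNext contract exercised in the tests above, where a null batch signals end of stream:

    #include <memory>
    #include "arrow/record_batch.h"
    #include "arrow/status.h"
    #include "arrow/table.h"

    // Sketch only: `table` is assumed to be a valid arrow::Table.
    arrow::Status ConsumeAsBatches(const arrow::Table& table) {
      arrow::TableBatchReader reader(table);
      std::shared_ptr<arrow::RecordBatch> batch;
      while (true) {
        RETURN_NOT_OK(reader.ReadNext(&batch));
        if (batch == nullptr) break;  // end of stream
        // ... process batch->num_rows() rows ...
      }
      return arrow::Status::OK();
    }
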
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 2cff32f74ef..d0312d93cb9 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -24,6 +24,7 @@
 #include <vector>

 #include "arrow/array.h"
+#include "arrow/record_batch.h"
 #include "arrow/type.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
@@ -33,8 +34,6 @@ namespace arrow {
 class KeyValueMetadata;
 class Status;

-using ArrayVector = std::vector<std::shared_ptr<Array>>;
-
 /// \class ChunkedArray
 /// \brief A data structure managing a list of primitive Arrow arrays logically
 /// as one large array
@@ -113,123 +112,28 @@ class ARROW_EXPORT Column {
   ARROW_DISALLOW_COPY_AND_ASSIGN(Column);
 };

-/// \class RecordBatch
-/// \brief Collection of equal-length arrays matching a particular Schema
-///
-/// A record batch is table-like data structure consisting of an internal
-/// sequence of fields, each a contiguous Arrow array
-class ARROW_EXPORT RecordBatch {
- public:
-  /// \param[in] schema The record batch schema
-  /// \param[in] num_rows length of fields in the record batch. Each array
-  /// should have the same length as num_rows
-  /// \param[in] columns the record batch fields as vector of arrays
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-              const std::vector<std::shared_ptr<Array>>& columns);
-
-  /// \brief Move-based constructor for a vector of Array instances
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-              std::vector<std::shared_ptr<Array>>&& columns);
-
-  /// \brief Construct record batch from vector of internal data structures
-  /// \since 0.5.0
-  ///
-  /// This class is only provided with an rvalue-reference for the input data,
-  /// and is intended for internal use, or advanced users.
-  ///
-  /// \param schema the record batch schema
-  /// \param num_rows the number of semantic rows in the record batch. This
-  /// should be equal to the length of each field
-  /// \param columns the data for the batch's columns
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-              std::vector<std::shared_ptr<ArrayData>>&& columns);
-
-  /// \brief Construct record batch by copying vector of array data
-  /// \since 0.5.0
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-              const std::vector<std::shared_ptr<ArrayData>>& columns);
-
-  /// \brief Determine if two record batches are exactly equal
-  /// \return true if batches are equal
-  bool Equals(const RecordBatch& other) const;
-
-  /// \brief Determine if two record batches are approximately equal
-  bool ApproxEquals(const RecordBatch& other) const;
-
-  // \return the table's schema
-  /// \return true if batches are equal
-  std::shared_ptr<Schema> schema() const { return schema_; }
-
-  /// \brief Retrieve an array from the record batch
-  /// \param[in] i field index, does not boundscheck
-  /// \return an Array object
-  std::shared_ptr<Array> column(int i) const;
-
-  std::shared_ptr<ArrayData> column_data(int i) const { return columns_[i]; }
-
-  /// \brief Name in i-th column
-  const std::string& column_name(int i) const;
-
-  /// \return the number of columns in the table
-  int num_columns() const { return static_cast<int>(columns_.size()); }
-
-  /// \return the number of rows (the corresponding length of each column)
-  int64_t num_rows() const { return num_rows_; }
-
-  /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL)
-  /// \since 0.5.0
-  ///
-  /// \param[in] metadata new KeyValueMetadata
-  /// \return new RecordBatch
-  std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
-      const std::shared_ptr<const KeyValueMetadata>& metadata) const;
-
-  /// \brief Slice each of the arrays in the record batch
-  /// \param[in] offset the starting offset to slice, through end of batch
-  /// \return new record batch
-  std::shared_ptr<RecordBatch> Slice(int64_t offset) const;
-
-  /// \brief Slice each of the arrays in the record batch
-  /// \param[in] offset the starting offset to slice
-  /// \param[in] length the number of elements to slice from offset
-  /// \return new record batch
-  std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const;
-
-  /// \brief Check for schema or length inconsistencies
-  /// \return Status
-  Status Validate() const;
-
- private:
-  ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
-
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);
-
-  std::shared_ptr<Schema> schema_;
-  int64_t num_rows_;
-  std::vector<std::shared_ptr<ArrayData>> columns_;
-
-  // Caching boxed array data
-  mutable std::vector<std::shared_ptr<Array>> boxed_columns_;
-};
-
 /// \class Table
 /// \brief Logical table as sequence of chunked arrays
 class ARROW_EXPORT Table {
  public:
+  virtual ~Table() = default;
+
   /// \brief Construct Table from schema and columns
   /// If columns is zero-length, the table's number of rows is zero
   /// \param schema The table schema (column types)
   /// \param columns The table's columns
   /// \param num_rows number of rows in table, -1 (default) to infer from columns
-  Table(const std::shared_ptr<Schema>& schema,
-        const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows = -1);
+  static std::shared_ptr<Table> Make(const std::shared_ptr<Schema>& schema,
+                                     const std::vector<std::shared_ptr<Column>>& columns,
+                                     int64_t num_rows = -1);

   /// \brief Construct Table from schema and arrays
   /// \param schema The table schema (column types)
   /// \param arrays The table's columns as arrays
   /// \param num_rows number of rows in table, -1 (default) to infer from columns
-  Table(const std::shared_ptr<Schema>& schema,
-        const std::vector<std::shared_ptr<Array>>& arrays, int64_t num_rows = -1);
+  static std::shared_ptr<Table> Make(const std::shared_ptr<Schema>& schema,
+                                     const std::vector<std::shared_ptr<Array>>& arrays,
+                                     int64_t num_rows = -1);

   // Construct table from RecordBatch, but only if all of the batch schemas are
   // equal. Returns Status::Invalid if there is some problem
@@ -242,25 +146,28 @@ class ARROW_EXPORT Table {
   /// \param[in] i column index, does not boundscheck
   /// \return the i-th column
-  std::shared_ptr<Column> column(int i) const { return columns_[i]; }
+  virtual std::shared_ptr<Column> column(int i) const = 0;

   /// \brief Remove column from the table, producing a new Table
-  Status RemoveColumn(int i, std::shared_ptr<Table>* out) const;
+  virtual Status RemoveColumn(int i, std::shared_ptr<Table>* out) const = 0;

   /// \brief Add column to the table, producing a new Table
-  Status AddColumn(int i, const std::shared_ptr<Column>& column,
-                   std::shared_ptr<Table>* out) const;
+  virtual Status AddColumn(int i, const std::shared_ptr<Column>& column,
+                           std::shared_ptr<Table>* out) const = 0;

   /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL)
   /// \since 0.5.0
   ///
   /// \param[in] metadata new KeyValueMetadata
   /// \return new Table
-  std::shared_ptr<Table> ReplaceSchemaMetadata(
-      const std::shared_ptr<const KeyValueMetadata>& metadata) const;
+  virtual std::shared_ptr<Table> ReplaceSchemaMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
+
+  /// \brief Perform any checks to validate the input arguments
+  virtual Status Validate() const = 0;

   /// \return the number of columns in the table
-  int num_columns() const { return static_cast<int>(columns_.size()); }
+  int num_columns() const { return schema_->num_fields(); }

   /// \return the number of rows (the corresponding length of each column)
   int64_t num_rows() const { return num_rows_; }
@@ -268,35 +175,14 @@ class ARROW_EXPORT Table {
   /// \brief Determine if semantic contents of tables are exactly equal
   bool Equals(const Table& other) const;

-  /// \brief Perform any checks to validate the input arguments
-  Status ValidateColumns() const;
-
-  /// \brief Return true if any column has multiple chunks
-  bool IsChunked() const;
-
- private:
-  ARROW_DISALLOW_COPY_AND_ASSIGN(Table);
+ protected:
+  Table();

   std::shared_ptr<Schema> schema_;
-  std::vector<std::shared_ptr<Column>> columns_;
   int64_t num_rows_;
-};
-
-/// \brief Abstract interface for reading stream of record batches
-class ARROW_EXPORT RecordBatchReader {
- public:
-  virtual ~RecordBatchReader();
-
-  /// \return the shared schema of the record batches in the stream
-  virtual std::shared_ptr<Schema> schema() const = 0;
-
-  /// Read the next record batch in the stream. Return null for batch when
-  /// reaching end of stream
-  ///
-  /// \param[out] batch the next loaded batch, null at end of stream
-  /// \return Status
-  virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(Table);
 };
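Because column, AddColumn, RemoveColumn, and ReplaceSchemaMetadata are now pure virtuals on an abstract Table, concrete behavior lives in implementations such as SimpleTable, but the out-parameter contract is unchanged: the receiver is never mutated and a new table is produced. A brief sketch with assumed inputs; col must match table->num_rows():

    #include <memory>
    #include "arrow/status.h"
    #include "arrow/table.h"

    arrow::Status AddThenRemove(const std::shared_ptr<arrow::Table>& table,
                                const std::shared_ptr<arrow::Column>& col) {
      std::shared_ptr<arrow::Table> added, removed;
      RETURN_NOT_OK(table->AddColumn(0, col, &added));   // Invalid if lengths differ
      RETURN_NOT_OK(added->RemoveColumn(0, &removed));   // a third, independent table
      return removed->Validate();  // verifies column count and lengths vs. schema
    }
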
 /// \brief Compute a sequence of record batches from a (possibly chunked) Table

@@ -322,13 +208,18 @@ ARROW_EXPORT
 Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables,
                          std::shared_ptr<Table>* table);

+#ifndef ARROW_NO_DEPRECATED_API
+
 /// \brief Construct table from multiple input tables.
 /// \return Status, fails if any schemas are different
+/// \note Deprecated since 0.8.0
 ARROW_EXPORT
 Status MakeTable(const std::shared_ptr<Schema>& schema,
                  const std::vector<std::shared_ptr<Array>>& arrays,
                  std::shared_ptr<Table>* table);

+#endif
+
 }  // namespace arrow

 #endif  // ARROW_TABLE_H
diff --git a/cpp/src/arrow/table_builder-test.cc b/cpp/src/arrow/table_builder-test.cc
index 07d9b6b2d65..8167577e906 100644
--- a/cpp/src/arrow/table_builder-test.cc
+++ b/cpp/src/arrow/table_builder-test.cc
@@ -22,6 +22,7 @@
 #include "gtest/gtest.h"

 #include "arrow/array.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/table_builder.h"
@@ -98,7 +99,7 @@ TEST_F(TestRecordBatchBuilder, Basics) {
   ASSERT_OK(ex_b1.Finish(&a1));
   ASSERT_OK(ex_b2.Finish(&a2));

-  RecordBatch expected(schema, 4, {a0, a1, a2});
+  auto expected = RecordBatch::Make(schema, 4, {a0, a1, a2});

   // Builder attributes
   ASSERT_EQ(3, builder->num_fields());
@@ -119,7 +120,7 @@ TEST_F(TestRecordBatchBuilder, Basics) {
     ASSERT_OK(builder->Flush(&batch));
   }

-  ASSERT_BATCHES_EQUAL(expected, *batch);
+  ASSERT_BATCHES_EQUAL(*expected, *batch);
 }

 // Test setting initial capacity
diff --git a/cpp/src/arrow/table_builder.cc b/cpp/src/arrow/table_builder.cc
index a1bd95940a6..379d886deac 100644
--- a/cpp/src/arrow/table_builder.cc
+++ b/cpp/src/arrow/table_builder.cc
@@ -24,6 +24,7 @@
 #include "arrow/array.h"
 #include "arrow/builder.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
@@ -64,7 +65,7 @@ Status RecordBatchBuilder::Flush(bool reset_builders,
     }
     length = fields[i]->length();
   }
-  *batch = std::make_shared<RecordBatch>(schema_, length, std::move(fields));
+  *batch = RecordBatch::Make(schema_, length, std::move(fields));
   if (reset_builders) {
     return InitBuilders();
   } else {
diff --git a/cpp/src/arrow/test-common.h b/cpp/src/arrow/test-common.h
index a4c4fddff73..911adf7b605 100644
--- a/cpp/src/arrow/test-common.h
+++ b/cpp/src/arrow/test-common.h
@@ -30,7 +30,6 @@
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
 #include "arrow/memory_pool.h"
-#include "arrow/table.h"
 #include "arrow/test-util.h"

 namespace arrow {
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 77f489ab177..1a34808488a 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -35,7 +35,6 @@
 #include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
@@ -375,7 +374,7 @@ void AssertArraysEqual(const Array& expected, const Array& actual) {

 #define ASSERT_BATCHES_EQUAL(LEFT, RIGHT)    \
   do {                                       \
-    if (!LEFT.ApproxEquals(RIGHT)) {         \
+    if (!(LEFT).ApproxEquals(RIGHT)) {       \
       std::stringstream ss;                  \
       ss << "Left:\n";                       \
       ASSERT_OK(PrettyPrint(LEFT, 0, &ss));  \
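The one-character ASSERT_BATCHES_EQUAL change above is a macro-hygiene fix that the new factory API forces: callers now pass *expected, and member access binds tighter than the unary dereference, so the unparenthesized parameter would expand the wrong way. A minimal illustration with hypothetical macro names:

    // BAD(*expected, *batch) expands to !*expected.ApproxEquals(*batch), which
    // parses as !*(expected.ApproxEquals(*batch)) and breaks for smart pointers.
    #define BAD(LEFT, RIGHT) (!LEFT.ApproxEquals(RIGHT))
    // GOOD(*expected, *batch) expands to !(*expected).ApproxEquals(*batch).
    #define GOOD(LEFT, RIGHT) (!(LEFT).ApproxEquals(RIGHT))
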
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 70f275c0fa4..8dcc1592da0 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -498,9 +498,9 @@ class ARROW_EXPORT StructType : public NestedType {
   std::vector<BufferDescr> GetBufferLayout() const override;
 };

-class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType {
+class ARROW_EXPORT DecimalType : public FixedSizeBinaryType {
  public:
-  explicit DecimalBaseType(int32_t byte_width, int32_t precision, int32_t scale)
+  explicit DecimalType(int32_t byte_width, int32_t precision, int32_t scale)
       : FixedSizeBinaryType(byte_width, Type::DECIMAL),
         precision_(precision),
         scale_(scale) {}
@@ -513,21 +510,18 @@ class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType {
   int32_t scale_;
 };

-class ARROW_EXPORT Decimal128Type : public DecimalBaseType {
+class ARROW_EXPORT Decimal128Type : public DecimalType {
  public:
   static constexpr Type::type type_id = Type::DECIMAL;

   explicit Decimal128Type(int32_t precision, int32_t scale)
-      : DecimalBaseType(16, precision, scale) {}
+      : DecimalType(16, precision, scale) {}

   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
   std::string name() const override { return "decimal"; }
 };

-// TODO(wesm): Remove this
-using DecimalType = Decimal128Type;
-
 struct UnionMode {
   enum type { SPARSE, DENSE };
 };
diff --git a/cpp/thirdparty/jemalloc/17c897976c60b0e6e4f4a365c751027244dada7a.tar.gz b/cpp/thirdparty/jemalloc/17c897976c60b0e6e4f4a365c751027244dada7a.tar.gz
new file mode 100644
index 00000000000..29d9266a12d
Binary files /dev/null and b/cpp/thirdparty/jemalloc/17c897976c60b0e6e4f4a365c751027244dada7a.tar.gz differ
diff --git a/cpp/thirdparty/jemalloc/README.md b/cpp/thirdparty/jemalloc/README.md
new file mode 100644
index 00000000000..272ff9c730b
--- /dev/null
+++ b/cpp/thirdparty/jemalloc/README.md
@@ -0,0 +1,22 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+This directory contains a vendored commit from the jemalloc stable-4 branch.
+You can bump the version by downloading
+https://github.com/jemalloc/jemalloc/archive/{{ commit }}.tar.gz
diff --git a/js/gulp/closure-task.js b/js/gulp/closure-task.js
index a1f0a9a6988..afe5e38b462 100644
--- a/js/gulp/closure-task.js
+++ b/js/gulp/closure-task.js
@@ -62,8 +62,10 @@ const createClosureArgs = (entry, externs) => ({
     externs: `${externs}.js`,
     entry_point: `${entry}.js`,
     module_resolution: `NODE`,
-    // formatting: `PRETTY_PRINT`,
+    // formatting: `PRETTY_PRINT`,
     debug: true,
     compilation_level: `ADVANCED`,
+    // uncomment when google-closure-compiler releases a new version
+    // allow_method_call_decomposing: true,
     package_json_entry_names: `module,jsnext:main,main`,
     assume_function_wrapper: true,
     js_output_file: `${mainExport}.js`,
diff --git a/js/gulp/package-task.js b/js/gulp/package-task.js
index 7b4b15a33e6..ad56d172eae 100644
--- a/js/gulp/package-task.js
+++ b/js/gulp/package-task.js
@@ -49,6 +49,12 @@ const createMainPackageJson = (target, format) => (orig) => ({
     browser: `${mainExport}.es5.min.js`,
     [`browser:es2015`]: `${mainExport}.es2015.min.js`,
     [`@std/esm`]: { esm: `mjs` },
+    // Temporary workaround until https://github.com/Microsoft/tslib/pull/44 is merged
+    scripts: {
+        postinstall: `npm i shx && npm run tslib_mjs && npm run tslib_pkg && npm r shx`,
+        tslib_mjs: `shx cp $(node -e \"console.log(require.resolve('tslib/tslib.es6.js'))\") $(node -e \"var r=require,p=r('path');console.log(p.join(p.dirname(r.resolve('tslib')),'tslib.mjs'))\")`,
+        tslib_pkg: `node -e \"var r=require,p=r('path'),f=r('fs'),k=p.join(p.dirname(r.resolve('tslib')),'package.json'),x=JSON.parse(f.readFileSync(k));x.main='tslib';f.writeFileSync(k,JSON.stringify(x))\"`
+    }
 });

 const createTypeScriptPackageJson = (target, format) => (orig) => ({
diff --git a/js/gulp/typescript-task.js b/js/gulp/typescript-task.js
index 2fd9f1350a6..8b755cf7f16 100644
--- a/js/gulp/typescript-task.js
+++ b/js/gulp/typescript-task.js
@@ -52,10 +52,10 @@ function maybeCopyRawJSArrowFormatFiles(target, format) {
         return Observable.empty();
     }
     return Observable.defer(async () => {
-        const outFormatDir = path.join(targetDir(target, format), `format`);
+        const outFormatDir = path.join(targetDir(target, format), `format`, `fb`);
         await del(path.join(outFormatDir, '*.js'));
         await observableFromStreams(
-            gulp.src(path.join(`src`, `format`, `*_generated.js`)),
+            gulp.src(path.join(`src`, `format`, `fb`, `*_generated.js`)),
            gulpRename((p) => { p.basename = p.basename.replace(`_generated`, ``); }),
            gulp.dest(outFormatDir)
        ).toPromise();
diff --git a/js/package.json b/js/package.json
index 44866d9d4db..1914eb48b88 100644
--- a/js/package.json
+++ b/js/package.json
@@ -25,7 +25,8 @@
     "lint": "npm-run-all -p lint:*",
     "lint:src": "tslint --fix --project -p tsconfig.json -c tslint.json \"src/**/*.ts\"",
     "lint:test": "tslint --fix --project -p test/tsconfig.json -c tslint.json \"test/**/*.ts\"",
-    "prepublishOnly": "echo \"Error: do 'npm run release' instead of 'npm publish'\" && exit 1"
+    "prepublishOnly": "echo \"Error: do 'npm run release' instead of 'npm publish'\" && exit 1",
+    "postinstall": "shx cp node_modules/tslib/tslib.es6.js node_modules/tslib/tslib.mjs"
   },
   "repository": {
     "type": "git",
@@ -54,7 +55,8 @@
   },
   "dependencies": {
     "flatbuffers": "trxcllnt/flatbuffers-esm",
-    "text-encoding": "0.6.4"
+    "text-encoding-utf-8": "^1.0.2",
+    "tslib": "^1.8.0"
   },
   "devDependencies": {
     "@std/esm": "0.13.0",
@@ -91,10 +93,8 @@
     "rxjs": "5.5.2",
     "shx": "0.2.2",
     "source-map-loader": "0.2.3",
-    "text-encoding-utf-8": "1.0.1",
     "trash": "4.1.0",
     "ts-jest": "21.2.1",
-    "tslib": "1.8.0",
     "tslint": "5.8.0",
     "typescript": "2.6.1",
     "uglifyjs-webpack-plugin": "1.0.1",
diff --git a/js/src/Arrow.ts b/js/src/Arrow.ts
index 8e31ec1726f..4d2fec7fae9 100644
--- a/js/src/Arrow.ts
+++ b/js/src/Arrow.ts
@@ -45,6 +45,13 @@ import {
     TimestampVector,
 } from './vector/numeric';

+// closure compiler always erases static method names:
+// https://github.com/google/closure-compiler/issues/1776
+// set them via string indexers to save them from the mangler
+Table['from'] = Table.from;
+Table['fromAsync'] = Table.fromAsync;
+BoolVector['pack'] = BoolVector.pack;
+
 export { Table, Vector, StructRow };
 export { readVectors, readVectorsAsync };
 export { readJSON };
diff --git a/js/src/format/arrow.ts b/js/src/format/arrow.ts
index 7bd5a67a15d..b7726b4ba84 100644
--- a/js/src/format/arrow.ts
+++ b/js/src/format/arrow.ts
@@ -1,14 +1,11 @@
 import { flatbuffers } from 'flatbuffers';

-import * as Schema_ from './Schema';
-import * as Message_ from './Message';
-import * as File_ from './Message';
+import * as Schema_ from './fb/Schema';
+import * as Message_ from './fb/Message';

-export namespace fb {
-    export import Schema = Schema_.org.apache.arrow.flatbuf;
-    export import Message = Message_.org.apache.arrow.flatbuf;
-    export import File = File_.org.apache.arrow.flatbuf;
-}
+import Type = Schema_.org.apache.arrow.flatbuf.Type;
+import Field = Schema_.org.apache.arrow.flatbuf.Field;
+import FieldNode = Message_.org.apache.arrow.flatbuf.FieldNode;

 export class Metadatum {
     constructor(private key_: string, private value_: any) {
@@ -24,7 +21,7 @@ export class Metadatum {
 }

 export class FieldBuilder {
-    constructor(private name_: string, private typeType_: fb.Schema.Type, private nullable_: boolean, private metadata_: Metadatum[]) {}
+    constructor(private name_: string, private typeType_: Type, private nullable_: boolean, private metadata_: Metadatum[]) {}
     name(): string {
         return this.name_;
     }
@@ -41,9 +38,9 @@ export class FieldBuilder {
         return this.metadata_[i];
     }
     write(builder: flatbuffers.Builder): flatbuffers.Offset {
-        fb.Schema.Field.startField(builder);
+        Field.startField(builder);
         // TODO..
-        return fb.Schema.Field.endField(builder);
+        return Field.endField(builder);
     }
 }

@@ -56,6 +53,6 @@ export class FieldNodeBuilder {
         return this.nullCount_;
     }
     write(builder: flatbuffers.Builder): flatbuffers.Offset {
-        return fb.Message.FieldNode.createFieldNode(builder, this.length(), this.nullCount());
+        return FieldNode.createFieldNode(builder, this.length(), this.nullCount());
     }
 }
diff --git a/js/src/format/File.ts b/js/src/format/fb/File.ts
similarity index 100%
rename from js/src/format/File.ts
rename to js/src/format/fb/File.ts
diff --git a/js/src/format/File_generated.js b/js/src/format/fb/File_generated.js
similarity index 100%
rename from js/src/format/File_generated.js
rename to js/src/format/fb/File_generated.js
diff --git a/js/src/format/Message.ts b/js/src/format/fb/Message.ts
similarity index 100%
rename from js/src/format/Message.ts
rename to js/src/format/fb/Message.ts
diff --git a/js/src/format/Message_generated.js b/js/src/format/fb/Message_generated.js
similarity index 100%
rename from js/src/format/Message_generated.js
rename to js/src/format/fb/Message_generated.js
diff --git a/js/src/format/Schema.ts b/js/src/format/fb/Schema.ts
similarity index 100%
rename from js/src/format/Schema.ts
rename to js/src/format/fb/Schema.ts
diff --git a/js/src/format/Schema_generated.js b/js/src/format/fb/Schema_generated.js
similarity index 100%
rename from js/src/format/Schema_generated.js
rename to js/src/format/fb/Schema_generated.js
diff --git a/js/src/reader/arrow.ts b/js/src/reader/arrow.ts
index 5a02a425dbf..ee735434ad2 100644
--- a/js/src/reader/arrow.ts
+++ b/js/src/reader/arrow.ts
@@ -18,20 +18,20 @@ import { Vector } from '../vector/vector';
 import { flatbuffers } from 'flatbuffers';
 import { readVector, readValueVector } from './vector';
+import { TypedArray, TypedArrayConstructor } from '../vector/types';
 import { readFileFooter, readFileMessages, readStreamSchema, readStreamMessages } from './format';

-import * as File_ from '../format/File';
-import * as Schema_ from '../format/Schema';
-import * as Message_ from '../format/Message';
+import * as File_ from '../format/fb/File';
+import * as Schema_ from '../format/fb/Schema';
+import * as Message_ from '../format/fb/Message';

 import ByteBuffer = flatbuffers.ByteBuffer;
 import Footer = File_.org.apache.arrow.flatbuf.Footer;
 import Field = Schema_.org.apache.arrow.flatbuf.Field;
 import Schema = Schema_.org.apache.arrow.flatbuf.Schema;
-import Buffer = Schema_.org.apache.arrow.flatbuf.Buffer;
 import Message = Message_.org.apache.arrow.flatbuf.Message;
 import ArrowBuffer = Schema_.org.apache.arrow.flatbuf.Buffer;
 import FieldNode = Message_.org.apache.arrow.flatbuf.FieldNode;
@@ -50,12 +50,12 @@ export type ArrowReaderContext = {
 };

 export interface VectorReaderContext {
-    offset: number;
-    bytes: Uint8Array;
     batch: RecordBatch;
     dictionaries: Map<number, Vector>;
     readNextNode(): FieldNode;
     readNextBuffer(): ArrowBuffer;
+    createValidityArray(field: Field, fieldNode: FieldNode, buffer: ArrowBuffer): Uint8Array | null;
+    createTypedArray<T extends TypedArray>(field: Field, fieldNode: FieldNode, buffer: ArrowBuffer, ArrayConstructor: TypedArrayConstructor<T>): T;
 }

 export function* readVectors(buffers: Iterable<Uint8Array | Buffer | string>, context?: ArrowReaderContext) {
@@ -166,24 +166,27 @@ function toByteBuffer(bytes?: Uint8Array | Buffer | string) {
 }

 class BufferReaderContext implements VectorReaderContext {
-    public offset: number;
     public batch: RecordBatch;
+    public dictionaries: Map<number, Vector>;
+    constructor(bytes: Uint8Array, dictionaries: Map<number, Vector>) {
+        this.bytes = bytes;
+        this.dictionaries = dictionaries;
+    }
+    private offset: number;
+    private bytes: Uint8Array;
     private nodeIndex: number;
     private bufferIndex: number;
     private metadataVersion: MetadataVersion;
-    constructor(public bytes: Uint8Array,
-                public dictionaries: Map<number, Vector>) {
-    }
     set message(m: Message) {
         this.nodeIndex = 0;
         this.bufferIndex = 0;
         this.offset = m.bb.position();
         this.metadataVersion = m.version();
     }
-    public readNextNode() {
+    readNextNode() {
         return this.batch.nodes(this.nodeIndex++)!;
     }
-    public readNextBuffer() {
+    readNextBuffer() {
         const buffer = this.batch.buffers(this.bufferIndex++)!;
         // If this Arrow buffer was written before version 4,
         // advance the buffer's bb_pos 8 bytes to skip past
@@ -193,4 +196,15 @@ class BufferReaderContext implements VectorReaderContext {
         }
         return buffer;
     }
+    createValidityArray(field: Field, fieldNode: FieldNode, buffer: ArrowBuffer): Uint8Array | null {
+        return field.nullable() && fieldNode.nullCount().low > 0 && this.createTypedArray(field, fieldNode, buffer, Uint8Array) || null;
+    }
+    createTypedArray<T extends TypedArray>(_field: Field, _fieldNode: FieldNode, buffer: ArrowBuffer, ArrayConstructor: TypedArrayConstructor<T>): T {
+        const { bytes, offset } = this;
+        return new ArrayConstructor(
+            bytes.buffer,
+            bytes.byteOffset + offset + buffer.offset().low,
+            buffer.length().low / ArrayConstructor.BYTES_PER_ELEMENT
+        );
+    }
 }
diff --git a/js/src/reader/format.ts b/js/src/reader/format.ts
index fd8f1b40d91..dc6d4b291ad 100644
--- a/js/src/reader/format.ts
+++ b/js/src/reader/format.ts
@@ -16,9 +16,9 @@
 // under the License.

 import { flatbuffers } from 'flatbuffers';
-import * as File_ from '../format/File';
-import * as Schema_ from '../format/Schema';
-import * as Message_ from '../format/Message';
+import * as File_ from '../format/fb/File';
+import * as Schema_ from '../format/fb/Schema';
+import * as Message_ from '../format/fb/Message';
 import ByteBuffer = flatbuffers.ByteBuffer;
 import Footer = File_.org.apache.arrow.flatbuf.Footer;
 import Schema = Schema_.org.apache.arrow.flatbuf.Schema;
diff --git a/js/src/reader/json.ts b/js/src/reader/json.ts
index ab3b9e855a2..2813a5d8f4f 100644
--- a/js/src/reader/json.ts
+++ b/js/src/reader/json.ts
@@ -23,13 +23,14 @@ import {
     BinaryVector, BoolVector, Utf8Vector, Int8Vector,
     Uint16Vector, Uint32Vector, Uint64Vector, Float32Vector, Float64Vector,
     ListVector, StructVector
 } from '../vector/arrow';
-import { fb, FieldBuilder, FieldNodeBuilder } from '../format/arrow';
+import * as Schema_ from '../format/fb/Schema';
+import Type = Schema_.org.apache.arrow.flatbuf.Type;
+import { FieldBuilder, FieldNodeBuilder } from '../format/arrow';
 import { TextEncoder } from 'text-encoding-utf-8';

 const encoder = new TextEncoder('utf-8');

-export function* readJSON(jsonString: string): IterableIterator<Vector<any>[]> {
-    let obj: any = JSON.parse(jsonString);
+export function* readJSON(obj: any): IterableIterator<Vector<any>[]> {
     let schema: any = {};
     for (const field of obj.schema.fields) {
         schema[field.name] = field;
@@ -203,24 +204,24 @@ function readBoolean(arr: Array<boolean>) {
     return rtrn;
 }

-const TYPE_LOOKUP: {[index: string]: fb.Schema.Type} = {
-    'NONE': fb.Schema.Type.NONE,
-    'null': fb.Schema.Type.Null,
-    'map': fb.Schema.Type.Map,
-    'int': fb.Schema.Type.Int,
-    'bool': fb.Schema.Type.Bool,
-    'date': fb.Schema.Type.Date,
-    'list': fb.Schema.Type.List,
-    'utf8': fb.Schema.Type.Utf8,
-    'time': fb.Schema.Type.Time,
-    'union': fb.Schema.Type.Union,
-    'binary': fb.Schema.Type.Binary,
-    'decimal': fb.Schema.Type.Decimal,
-    'struct_': fb.Schema.Type.Struct_,
-    'floatingpoint': fb.Schema.Type.FloatingPoint,
-    'timestamp': fb.Schema.Type.Timestamp,
-    'fixedsizelist': fb.Schema.Type.FixedSizeList,
-    'fixedsizebinary': fb.Schema.Type.FixedSizeBinary
+const TYPE_LOOKUP: {[index: string]: Type} = {
+    'NONE': Type.NONE,
+    'null': Type.Null,
+    'map': Type.Map,
+    'int': Type.Int,
+    'bool': Type.Bool,
+    'date': Type.Date,
+    'list': Type.List,
+    'utf8': Type.Utf8,
+    'time': Type.Time,
+    'union': Type.Union,
+    'binary': Type.Binary,
+    'decimal': Type.Decimal,
+    'struct_': Type.Struct_,
+    'floatingpoint': Type.FloatingPoint,
+    'timestamp': Type.Timestamp,
+    'fixedsizelist': Type.FixedSizeList,
+    'fixedsizebinary': Type.FixedSizeBinary
 };

 function fieldFromJSON(obj: any): FieldBuilder {
VectorReaderContext) { - return createValidityArray(field, fieldNode, state.bytes, state.offset, state.readNextBuffer()); -} - -function createValidityArray(field: Field, fieldNode: FieldNode, bytes: Uint8Array, offset: number, buffer: Buffer) { - return field.nullable() && fieldNode.nullCount().low > 0 && createTypedArray(Uint8Array, bytes, offset, buffer) || null; -} - -function createTypedArray(ArrayConstructor: TypedArrayConstructor, bytes: Uint8Array, offset: number, buffer: Buffer) { - return new ArrayConstructor( - bytes.buffer, - bytes.byteOffset + offset + buffer.offset().low, - buffer.length().low / ArrayConstructor.BYTES_PER_ELEMENT - ); + return state.createValidityArray(field, fieldNode, state.readNextBuffer()); } diff --git a/js/src/vector/arrow.ts b/js/src/vector/arrow.ts index 3ebc4ac8706..db918c8e2cc 100644 --- a/js/src/vector/arrow.ts +++ b/js/src/vector/arrow.ts @@ -14,48 +14,23 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - import { Vector } from './vector'; -import { Utf8Vector as Utf8VectorBase } from './utf8'; -import { StructVector as StructVectorBase } from './struct'; -import { DictionaryVector as DictionaryVectorBase } from './dictionary'; -import { - ListVector as ListVectorBase, - BinaryVector as BinaryVectorBase, - FixedSizeListVector as FixedSizeListVectorBase -} from './list'; - -import { - BoolVector as BoolVectorBase, - Int8Vector as Int8VectorBase, - Int16Vector as Int16VectorBase, - Int32Vector as Int32VectorBase, - Int64Vector as Int64VectorBase, - Uint8Vector as Uint8VectorBase, - Uint16Vector as Uint16VectorBase, - Uint32Vector as Uint32VectorBase, - Uint64Vector as Uint64VectorBase, - Float16Vector as Float16VectorBase, - Float32Vector as Float32VectorBase, - Float64Vector as Float64VectorBase, - Date32Vector as Date32VectorBase, - Date64Vector as Date64VectorBase, - Time32Vector as Time32VectorBase, - Time64Vector as Time64VectorBase, - DecimalVector as DecimalVectorBase, - TimestampVector as TimestampVectorBase, -} from './numeric'; +import * as vectors from './traits/vectors'; +import * as fieldVectors from './traits/field'; +import * as nullableVectors from './traits/nullable'; +import * as nullableFieldVectors from './traits/nullablefield'; +import { isFieldArgv, isNullableArgv } from './traits/mixins'; -import { nullableMixin, fieldMixin, Field, FieldNode, isFieldArgv, isNullableArgv} from './traits'; +import * as Schema_ from '../format/fb/Schema'; +import * as Message_ from '../format/fb/Message'; +export import Field = Schema_.org.apache.arrow.flatbuf.Field; +export import FieldNode = Message_.org.apache.arrow.flatbuf.FieldNode; -export { Field, FieldNode }; -import { fb, FieldBuilder, FieldNodeBuilder } from '../format/arrow'; -export { fb, FieldBuilder, FieldNodeBuilder }; function MixinArrowTraits, TArgv>( Base: new (argv: TArgv) => T, Field: new (argv: TArgv & { field: Field, fieldNode: FieldNode }) => T, Nullable: new (argv: TArgv & { validity: Uint8Array }) => T, - NullableField: new (argv: TArgv & { validity: Uint8Array, field: Field, fieldNode: FieldNode }) => T, + NullableField: new (argv: TArgv & { validity: Uint8Array, field: Field, fieldNode: FieldNode }) => T ) { return function(argv: TArgv | (TArgv & { validity: Uint8Array }) | (TArgv & { field: Field, fieldNode: FieldNode })) { return new (!isFieldArgv(argv) @@ -65,171 +40,52 @@ function MixinArrowTraits, TArgv>( } as any as { new (argv: TArgv | (TArgv & { validity: Uint8Array }) 
 | (TArgv & { field: Field, fieldNode: FieldNode })): T };
 }
-export { Vector };
-export class ListVector extends MixinArrowTraits(
-    ListVectorBase,
-    class ListVector extends fieldMixin(ListVectorBase) {} as any,
-    class ListVector extends nullableMixin(ListVectorBase) {} as any,
-    class ListVector extends nullableMixin(fieldMixin(ListVectorBase)) {} as any
-) {}
-
-export class BinaryVector extends MixinArrowTraits(
-    BinaryVectorBase,
-    class BinaryVector extends fieldMixin(BinaryVectorBase) {} as any,
-    class BinaryVector extends nullableMixin(BinaryVectorBase) {} as any,
-    class BinaryVector extends nullableMixin(fieldMixin(BinaryVectorBase)) {} as any
-) {}
-
-export class Utf8Vector extends MixinArrowTraits(
-    Utf8VectorBase,
-    class Utf8Vector extends fieldMixin(Utf8VectorBase) {} as any,
-    class Utf8Vector extends nullableMixin(Utf8VectorBase) {} as any,
-    class Utf8Vector extends nullableMixin(fieldMixin(Utf8VectorBase)) {} as any
-) {}
-
-export class BoolVector extends MixinArrowTraits(
-    BoolVectorBase,
-    class BoolVector extends fieldMixin(BoolVectorBase) {} as any,
-    class BoolVector extends nullableMixin(BoolVectorBase) {} as any,
-    class BoolVector extends nullableMixin(fieldMixin(BoolVectorBase)) {} as any
-) {}
-
-export class Int8Vector extends MixinArrowTraits(
-    Int8VectorBase,
-    class Int8Vector extends fieldMixin(Int8VectorBase) {} as any,
-    class Int8Vector extends nullableMixin(Int8VectorBase) {} as any,
-    class Int8Vector extends nullableMixin(fieldMixin(Int8VectorBase)) {} as any
-) {}
-
-export class Int16Vector extends MixinArrowTraits(
-    Int16VectorBase,
-    class Int16Vector extends fieldMixin(Int16VectorBase) {} as any,
-    class Int16Vector extends nullableMixin(Int16VectorBase) {} as any,
-    class Int16Vector extends nullableMixin(fieldMixin(Int16VectorBase)) {} as any
-) {}
-
-export class Int32Vector extends MixinArrowTraits(
-    Int32VectorBase,
-    class Int32Vector extends fieldMixin(Int32VectorBase) {} as any,
-    class Int32Vector extends nullableMixin(Int32VectorBase) {} as any,
-    class Int32Vector extends nullableMixin(fieldMixin(Int32VectorBase)) {} as any
-) {}
-
-export class Int64Vector extends MixinArrowTraits(
-    Int64VectorBase,
-    class Int64Vector extends fieldMixin(Int64VectorBase) {} as any,
-    class Int64Vector extends nullableMixin(Int64VectorBase) {} as any,
-    class Int64Vector extends nullableMixin(fieldMixin(Int64VectorBase)) {} as any
-) {}
-
-export class Uint8Vector extends MixinArrowTraits(
-    Uint8VectorBase,
-    class Uint8Vector extends fieldMixin(Uint8VectorBase) {} as any,
-    class Uint8Vector extends nullableMixin(Uint8VectorBase) {} as any,
-    class Uint8Vector extends nullableMixin(fieldMixin(Uint8VectorBase)) {} as any
-) {}
-
-export class Uint16Vector extends MixinArrowTraits(
-    Uint16VectorBase,
-    class Uint16Vector extends fieldMixin(Uint16VectorBase) {} as any,
-    class Uint16Vector extends nullableMixin(Uint16VectorBase) {} as any,
-    class Uint16Vector extends nullableMixin(fieldMixin(Uint16VectorBase)) {} as any
-) {}
-
-export class Uint32Vector extends MixinArrowTraits(
-    Uint32VectorBase,
-    class Uint32Vector extends fieldMixin(Uint32VectorBase) {} as any,
-    class Uint32Vector extends nullableMixin(Uint32VectorBase) {} as any,
-    class Uint32Vector extends nullableMixin(fieldMixin(Uint32VectorBase)) {} as any
-) {}
-
-export class Uint64Vector extends MixinArrowTraits(
-    Uint64VectorBase,
-    class Uint64Vector extends fieldMixin(Uint64VectorBase) {} as any,
-    class Uint64Vector extends nullableMixin(Uint64VectorBase) {} as any,
-    class Uint64Vector extends nullableMixin(fieldMixin(Uint64VectorBase)) {} as any
-) {}
-
-export class Date32Vector extends MixinArrowTraits(
-    Date32VectorBase,
-    class Date32Vector extends fieldMixin(Date32VectorBase) {} as any,
-    class Date32Vector extends nullableMixin(Date32VectorBase) {} as any,
-    class Date32Vector extends nullableMixin(fieldMixin(Date32VectorBase)) {} as any
-) {}
-
-export class Date64Vector extends MixinArrowTraits(
-    Date64VectorBase,
-    class Date64Vector extends fieldMixin(Date64VectorBase) {} as any,
-    class Date64Vector extends nullableMixin(Date64VectorBase) {} as any,
-    class Date64Vector extends nullableMixin(fieldMixin(Date64VectorBase)) {} as any
-) {}
-
-export class Time32Vector extends MixinArrowTraits(
-    Time32VectorBase,
-    class Time32Vector extends fieldMixin(Time32VectorBase) {} as any,
-    class Time32Vector extends nullableMixin(Time32VectorBase) {} as any,
-    class Time32Vector extends nullableMixin(fieldMixin(Time32VectorBase)) {} as any
-) {}
-
-export class Time64Vector extends MixinArrowTraits(
-    Time64VectorBase,
-    class Time64Vector extends fieldMixin(Time64VectorBase) {} as any,
-    class Time64Vector extends nullableMixin(Time64VectorBase) {} as any,
-    class Time64Vector extends nullableMixin(fieldMixin(Time64VectorBase)) {} as any
-) {}
-
-export class Float16Vector extends MixinArrowTraits(
-    Float16VectorBase,
-    class Float16Vector extends fieldMixin(Float16VectorBase) {} as any,
-    class Float16Vector extends nullableMixin(Float16VectorBase) {} as any,
-    class Float16Vector extends nullableMixin(fieldMixin(Float16VectorBase)) {} as any
-) {}
-
-export class Float32Vector extends MixinArrowTraits(
-    Float32VectorBase,
-    class Float32Vector extends fieldMixin(Float32VectorBase) {} as any,
-    class Float32Vector extends nullableMixin(Float32VectorBase) {} as any,
-    class Float32Vector extends nullableMixin(fieldMixin(Float32VectorBase)) {} as any
-) {}
-
-export class Float64Vector extends MixinArrowTraits(
-    Float64VectorBase,
-    class Float64Vector extends fieldMixin(Float64VectorBase) {} as any,
-    class Float64Vector extends nullableMixin(Float64VectorBase) {} as any,
-    class Float64Vector extends nullableMixin(fieldMixin(Float64VectorBase)) {} as any
-) {}
-
-export class StructVector extends MixinArrowTraits(
-    StructVectorBase,
-    class StructVector extends fieldMixin(StructVectorBase) {} as any,
-    class StructVector extends nullableMixin(StructVectorBase) {} as any,
-    class StructVector extends nullableMixin(fieldMixin(StructVectorBase)) {} as any
-) {}
-
-export class DecimalVector extends MixinArrowTraits(
-    DecimalVectorBase,
-    class DecimalVector extends fieldMixin(DecimalVectorBase) {} as any,
-    class DecimalVector extends nullableMixin(DecimalVectorBase) {} as any,
-    class DecimalVector extends nullableMixin(fieldMixin(DecimalVectorBase)) {} as any
-) {}
-
-export class TimestampVector extends MixinArrowTraits(
-    TimestampVectorBase,
-    class TimestampVector extends fieldMixin(TimestampVectorBase) {} as any,
-    class TimestampVector extends nullableMixin(TimestampVectorBase) {} as any,
-    class TimestampVector extends nullableMixin(fieldMixin(TimestampVectorBase)) {} as any
-) {}
-
-export class DictionaryVector extends MixinArrowTraits(
-    DictionaryVectorBase,
-    class DictionaryVector extends fieldMixin(DictionaryVectorBase) {} as any,
-    class DictionaryVector extends nullableMixin(DictionaryVectorBase) {} as any,
-    class DictionaryVector extends nullableMixin(fieldMixin(DictionaryVectorBase)) {} as any
-) {}
-
-export class FixedSizeListVector extends MixinArrowTraits(
-    FixedSizeListVectorBase,
-    class FixedSizeListVector extends fieldMixin(FixedSizeListVectorBase) {} as any,
-    class FixedSizeListVector extends nullableMixin(FixedSizeListVectorBase) {} as any,
-    class FixedSizeListVector extends nullableMixin(fieldMixin(FixedSizeListVectorBase)) {} as any
-) {}
+export { Vector }
+export const MixinListVector = MixinArrowTraits(vectors.ListVector as any, fieldVectors.ListVector as any, nullableVectors.ListVector as any, nullableFieldVectors.ListVector as any);
+export class ListVector extends MixinListVector {}
+export const MixinBinaryVector = MixinArrowTraits(vectors.BinaryVector as any, fieldVectors.BinaryVector as any, nullableVectors.BinaryVector as any, nullableFieldVectors.BinaryVector as any);
+export class BinaryVector extends MixinBinaryVector {}
+export const MixinUtf8Vector = MixinArrowTraits(vectors.Utf8Vector as any, fieldVectors.Utf8Vector as any, nullableVectors.Utf8Vector as any, nullableFieldVectors.Utf8Vector as any);
+export class Utf8Vector extends MixinUtf8Vector {}
+export const MixinBoolVector = MixinArrowTraits(vectors.BoolVector as any, fieldVectors.BoolVector as any, nullableVectors.BoolVector as any, nullableFieldVectors.BoolVector as any);
+export class BoolVector extends MixinBoolVector {}
+export const MixinInt8Vector = MixinArrowTraits(vectors.Int8Vector as any, fieldVectors.Int8Vector as any, nullableVectors.Int8Vector as any, nullableFieldVectors.Int8Vector as any);
+export class Int8Vector extends MixinInt8Vector {}
+export const MixinInt16Vector = MixinArrowTraits(vectors.Int16Vector as any, fieldVectors.Int16Vector as any, nullableVectors.Int16Vector as any, nullableFieldVectors.Int16Vector as any);
+export class Int16Vector extends MixinInt16Vector {}
+export const MixinInt32Vector = MixinArrowTraits(vectors.Int32Vector as any, fieldVectors.Int32Vector as any, nullableVectors.Int32Vector as any, nullableFieldVectors.Int32Vector as any);
+export class Int32Vector extends MixinInt32Vector {}
+export const MixinInt64Vector = MixinArrowTraits(vectors.Int64Vector as any, fieldVectors.Int64Vector as any, nullableVectors.Int64Vector as any, nullableFieldVectors.Int64Vector as any);
+export class Int64Vector extends MixinInt64Vector {}
+export const MixinUint8Vector = MixinArrowTraits(vectors.Uint8Vector as any, fieldVectors.Uint8Vector as any, nullableVectors.Uint8Vector as any, nullableFieldVectors.Uint8Vector as any);
+export class Uint8Vector extends MixinUint8Vector {}
+export const MixinUint16Vector = MixinArrowTraits(vectors.Uint16Vector as any, fieldVectors.Uint16Vector as any, nullableVectors.Uint16Vector as any, nullableFieldVectors.Uint16Vector as any);
+export class Uint16Vector extends MixinUint16Vector {}
+export const MixinUint32Vector = MixinArrowTraits(vectors.Uint32Vector as any, fieldVectors.Uint32Vector as any, nullableVectors.Uint32Vector as any, nullableFieldVectors.Uint32Vector as any);
+export class Uint32Vector extends MixinUint32Vector {}
+export const MixinUint64Vector = MixinArrowTraits(vectors.Uint64Vector as any, fieldVectors.Uint64Vector as any, nullableVectors.Uint64Vector as any, nullableFieldVectors.Uint64Vector as any);
+export class Uint64Vector extends MixinUint64Vector {}
+export const MixinDate32Vector = MixinArrowTraits(vectors.Date32Vector as any, fieldVectors.Date32Vector as any, nullableVectors.Date32Vector as any, nullableFieldVectors.Date32Vector as any);
+export class Date32Vector extends MixinDate32Vector {}
+export const MixinDate64Vector = MixinArrowTraits(vectors.Date64Vector as any, fieldVectors.Date64Vector as any, nullableVectors.Date64Vector as any, nullableFieldVectors.Date64Vector as any);
+export class Date64Vector extends MixinDate64Vector {}
+export const MixinTime32Vector = MixinArrowTraits(vectors.Time32Vector as any, fieldVectors.Time32Vector as any, nullableVectors.Time32Vector as any, nullableFieldVectors.Time32Vector as any);
+export class Time32Vector extends MixinTime32Vector {}
+export const MixinTime64Vector = MixinArrowTraits(vectors.Time64Vector as any, fieldVectors.Time64Vector as any, nullableVectors.Time64Vector as any, nullableFieldVectors.Time64Vector as any);
+export class Time64Vector extends MixinTime64Vector {}
+export const MixinFloat16Vector = MixinArrowTraits(vectors.Float16Vector as any, fieldVectors.Float16Vector as any, nullableVectors.Float16Vector as any, nullableFieldVectors.Float16Vector as any);
+export class Float16Vector extends MixinFloat16Vector {}
+export const MixinFloat32Vector = MixinArrowTraits(vectors.Float32Vector as any, fieldVectors.Float32Vector as any, nullableVectors.Float32Vector as any, nullableFieldVectors.Float32Vector as any);
+export class Float32Vector extends MixinFloat32Vector {}
+export const MixinFloat64Vector = MixinArrowTraits(vectors.Float64Vector as any, fieldVectors.Float64Vector as any, nullableVectors.Float64Vector as any, nullableFieldVectors.Float64Vector as any);
+export class Float64Vector extends MixinFloat64Vector {}
+export const MixinStructVector = MixinArrowTraits(vectors.StructVector as any, fieldVectors.StructVector as any, nullableVectors.StructVector as any, nullableFieldVectors.StructVector as any);
+export class StructVector extends MixinStructVector {}
+export const MixinDecimalVector = MixinArrowTraits(vectors.DecimalVector as any, fieldVectors.DecimalVector as any, nullableVectors.DecimalVector as any, nullableFieldVectors.DecimalVector as any);
+export class DecimalVector extends MixinDecimalVector {}
+export const MixinTimestampVector = MixinArrowTraits(vectors.TimestampVector as any, fieldVectors.TimestampVector as any, nullableVectors.TimestampVector as any, nullableFieldVectors.TimestampVector as any);
+export class TimestampVector extends MixinTimestampVector {}
+export const MixinDictionaryVector = MixinArrowTraits(vectors.DictionaryVector as any, fieldVectors.DictionaryVector as any, nullableVectors.DictionaryVector as any, nullableFieldVectors.DictionaryVector as any);
+export class DictionaryVector extends MixinDictionaryVector {}
+export const MixinFixedSizeListVector = MixinArrowTraits(vectors.FixedSizeListVector as any, fieldVectors.FixedSizeListVector as any, nullableVectors.FixedSizeListVector as any, nullableFieldVectors.FixedSizeListVector as any);
+export class FixedSizeListVector extends MixinFixedSizeListVector {}
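For readers tracing the hunk above: the patch drops the per-class anonymous mixin subclasses (each cast `as any`) in favor of mixin combinations composed once at module scope, exported under a name, and then extended by a named class. A minimal sketch of that composition pattern, using simplified stand-ins rather than the real Arrow JS classes:

```ts
// Illustrative stand-ins only; none of these are the Arrow JS implementations.
type Ctor<T = {}> = new (...args: any[]) => T;

class BaseVector {
    constructor(public data: number[] = []) {}
    get(i: number): number { return this.data[i]; }
}

// A mixin is a function from a constructor to a subclass constructor.
const nullableMixin = <T extends Ctor<BaseVector>>(superclass: T) =>
    class extends superclass {
        nullBitmap = new Uint8Array(0);
        isValid(i: number): boolean {
            return (this.nullBitmap[i >> 3] & (1 << (i & 7))) !== 0;
        }
    };

const fieldMixin = <T extends Ctor<BaseVector>>(superclass: T) =>
    class extends superclass {
        name = '';
    };

// Compose once, name the composition, then extend it with a named class.
export const NullableFieldBase = nullableMixin(fieldMixin(BaseVector));
export class SketchVector extends NullableFieldBase {}

const v = new SketchVector([1, 2, 3]);  // has get(), isValid(), and name
```

Naming each composed constructor (here `NullableFieldBase`; in the patch `MixinListVector` and friends) gives the compiler a stable declaration to emit, which is presumably also friendlier to the closure-compiler build that the tsconfig change later in this patch alludes to.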
diff --git a/js/src/vector/table.ts b/js/src/vector/table.ts
index ca19984bb9b..d77c8ed24e2 100644
--- a/js/src/vector/table.ts
+++ b/js/src/vector/table.ts
@@ -20,24 +20,28 @@ import { StructVector, StructRow } from './struct';
 import { readVectors, readVectorsAsync } from '../reader/arrow';
 import { readJSON } from '../reader/json';
 
+function concatVectors(tableVectors: Vector[], batchVectors: Vector[]) {
+    return tableVectors.length === 0 ?
+        batchVectors : batchVectors.map((vec, i, _vs, col = tableVectors[i]) =>
+            vec && col && col.concat(vec) || col || vec
+        ) as Vector[]
+}
+
 export class Table extends StructVector {
-    static from(buffers?: Iterable) {
+    static from(buffersOrJSON?: Iterable | object | string) {
+        let input: any = buffersOrJSON;
         let columns: Vector[] = [];
-        if (buffers) {
-            for (let vectors of readVectors(buffers)) {
-                columns = columns.length === 0 ? vectors : vectors.map((vec, i, _vs, col = columns[i]) =>
-                    vec && col && col.concat(vec) || col || vec
-                ) as Vector[];
-            }
+        let batches: Iterable;
+        if (typeof input === 'string') {
+            try { input = JSON.parse(input); }
+            catch (e) { input = buffersOrJSON; }
         }
-        return new Table({ columns });
-    }
-    static fromJSON(jsonString: string) {
-        let columns: Vector[] = [];
-        for (let vectors of readJSON(jsonString)) {
-            columns = columns.length === 0 ? vectors : vectors.map((vec, i, _vs, col = columns[i]) =>
-                vec && col && col.concat(vec) || col || vec
-            ) as Vector[];
+        if (!input || typeof input !== 'object') {
+            batches = (typeof input === 'string') ? readVectors([input]) : [];
+        } else {
+            batches = (typeof input[Symbol.iterator] === 'function') ? readVectors(input) : readJSON(input);
+        }
+        for (let vectors of batches) {
+            columns = concatVectors(columns, vectors);
         }
         return new Table({ columns });
     }
@@ -45,9 +49,7 @@ export class Table extends StructVector {
         let columns: Vector[] = [];
         if (buffers) {
             for await (let vectors of readVectorsAsync(buffers)) {
-                columns = columns.length === 0 ? vectors : vectors.map((vec, i, _vs, col = columns[i]) =>
-                    vec && col && col.concat(vec) || col || vec
-                ) as Vector[];
+                columns = concatVectors(columns, vectors);
             }
         }
         return new Table({ columns });
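The `concatVectors` helper factored out above is the fold that `from` and `fromAsync` now share: on the first batch it adopts the batch's columns wholesale; on later batches it concatenates per column index, keeping whichever side exists when the other is missing. A self-contained sketch of the same fold over a toy column type (all names here are illustrative, not Arrow JS API):

```ts
// Toy column with the one capability the fold relies on: concat.
interface Col { values: number[]; concat(other: Col): Col; }
const col = (values: number[]): Col => ({
    values,
    concat(other: Col): Col { return col([...this.values, ...other.values]); },
});

// Same shape as the patch's concatVectors: adopt on first batch,
// otherwise concat per index, falling back to whichever side exists.
function concatCols(tableCols: Col[], batchCols: Col[]): Col[] {
    return tableCols.length === 0 ? batchCols
        : batchCols.map((vec, i) => {
            const existing = tableCols[i];
            return (vec && existing && existing.concat(vec)) || existing || vec;
        });
}

// Folding two single-column "batches" yields one concatenated column.
let columns: Col[] = [];
for (const batch of [[col([1, 2])], [col([3, 4])]]) {
    columns = concatCols(columns, batch);
}
// columns[0].values is now [1, 2, 3, 4]
```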
diff --git a/js/src/vector/traits/field.ts b/js/src/vector/traits/field.ts
new file mode 100644
index 00000000000..cf5305e02a4
--- /dev/null
+++ b/js/src/vector/traits/field.ts
@@ -0,0 +1,59 @@
+import { Vector } from '../vector';
+import * as vectors from './vectors';
+import { fieldMixin } from './mixins';
+import { FieldBuilder, FieldNodeBuilder } from '../../format/arrow';
+
+export { Vector, FieldBuilder, FieldNodeBuilder };
+import * as Schema_ from '../../format/fb/Schema';
+import * as Message_ from '../../format/fb/Message';
+export import Field = Schema_.org.apache.arrow.flatbuf.Field;
+export import FieldNode = Message_.org.apache.arrow.flatbuf.FieldNode;
+
+export const FieldListVector = fieldMixin(vectors.ListVector);
+export class ListVector extends FieldListVector {}
+export const FieldBinaryVector = fieldMixin(vectors.BinaryVector);
+export class BinaryVector extends FieldBinaryVector {}
+export const FieldUtf8Vector = fieldMixin(vectors.Utf8Vector);
+export class Utf8Vector extends FieldUtf8Vector {}
+export const FieldBoolVector = fieldMixin(vectors.BoolVector);
+export class BoolVector extends FieldBoolVector {}
+export const FieldInt8Vector = fieldMixin(vectors.Int8Vector);
+export class Int8Vector extends FieldInt8Vector {}
+export const FieldInt16Vector = fieldMixin(vectors.Int16Vector);
+export class Int16Vector extends FieldInt16Vector {}
+export const FieldInt32Vector = fieldMixin(vectors.Int32Vector);
+export class Int32Vector extends FieldInt32Vector {}
+export const FieldInt64Vector = fieldMixin(vectors.Int64Vector);
+export class Int64Vector extends FieldInt64Vector {}
+export const FieldUint8Vector = fieldMixin(vectors.Uint8Vector);
+export class Uint8Vector extends FieldUint8Vector {}
+export const FieldUint16Vector = fieldMixin(vectors.Uint16Vector);
+export class Uint16Vector extends FieldUint16Vector {}
+export const FieldUint32Vector = fieldMixin(vectors.Uint32Vector);
+export class Uint32Vector extends FieldUint32Vector {}
+export const FieldUint64Vector = fieldMixin(vectors.Uint64Vector);
+export class Uint64Vector extends FieldUint64Vector {}
+export const FieldDate32Vector = fieldMixin(vectors.Date32Vector);
+export class Date32Vector extends FieldDate32Vector {}
+export const FieldDate64Vector = fieldMixin(vectors.Date64Vector);
+export class Date64Vector extends FieldDate64Vector {}
+export const FieldTime32Vector = fieldMixin(vectors.Time32Vector);
+export class Time32Vector extends FieldTime32Vector {}
+export const FieldTime64Vector = fieldMixin(vectors.Time64Vector);
+export class Time64Vector extends FieldTime64Vector {}
+export const FieldFloat16Vector = fieldMixin(vectors.Float16Vector);
+export class Float16Vector extends FieldFloat16Vector {}
+export const FieldFloat32Vector = fieldMixin(vectors.Float32Vector);
+export class Float32Vector extends FieldFloat32Vector {}
+export const FieldFloat64Vector = fieldMixin(vectors.Float64Vector);
+export class Float64Vector extends FieldFloat64Vector {}
+export const FieldStructVector = fieldMixin(vectors.StructVector);
+export class StructVector extends FieldStructVector {}
+export const FieldDecimalVector = fieldMixin(vectors.DecimalVector);
+export class DecimalVector extends FieldDecimalVector {}
+export const FieldTimestampVector = fieldMixin(vectors.TimestampVector);
+export class TimestampVector extends FieldTimestampVector {}
+export const FieldDictionaryVector = fieldMixin(vectors.DictionaryVector);
+export class DictionaryVector extends FieldDictionaryVector {}
+export const FieldFixedSizeListVector = fieldMixin(vectors.FixedSizeListVector);
+export class FixedSizeListVector extends FieldFixedSizeListVector {}
\ No newline at end of file
diff --git a/js/src/vector/traits.ts b/js/src/vector/traits/mixins.ts
similarity index 80%
rename from js/src/vector/traits.ts
rename to js/src/vector/traits/mixins.ts
index b911823a3c7..bb431dc40f6 100644
--- a/js/src/vector/traits.ts
+++ b/js/src/vector/traits/mixins.ts
@@ -15,19 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-import { Vector } from './vector';
-import { BoolVector } from './numeric';
-import { fb, FieldBuilder, FieldNodeBuilder } from '../format/arrow';
+import { Vector } from '../vector';
+import { BoolVector } from '../numeric';
+import { FieldBuilder, FieldNodeBuilder } from '../../format/arrow';
 
-export type Field = ( fb.Schema.Field | FieldBuilder );
-export type FieldNode = ( fb.Message.FieldNode | FieldNodeBuilder );
+import * as Schema_ from '../../format/fb/Schema';
+import * as Message_ from '../../format/fb/Message';
+
+import Type = Schema_.org.apache.arrow.flatbuf.Type;
+import Field_ = Schema_.org.apache.arrow.flatbuf.Field;
+import FieldNode_ = Message_.org.apache.arrow.flatbuf.FieldNode;
+
+export type Field = Field_ | FieldBuilder;
+export type FieldNode = FieldNode_ | FieldNodeBuilder;
 
 function isField(x: any): x is Field {
-    return x instanceof fb.Schema.Field || x instanceof FieldBuilder;
+    return x instanceof Field_ || x instanceof FieldBuilder;
 }
 
 function isFieldNode(x: any): x is FieldNode {
-    return x instanceof fb.Message.FieldNode || x instanceof FieldNodeBuilder;
+    return x instanceof FieldNode_ || x instanceof FieldNodeBuilder;
 }
 
 export function isFieldArgv(x: any): x is { field: Field, fieldNode: FieldNode } {
@@ -67,7 +74,7 @@ export const fieldMixin = (superclass: new (argv: TArgv
         this.field = field;
         this.fieldNode = fieldNode;
         this.nullable = field.nullable();
-        this.type = fb.Schema.Type[field.typeType()];
+        this.type = Type[field.typeType()];
         this.length = fieldNode.length().low | 0;
         this.nullCount = fieldNode.nullCount().low;
     }
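A note on the guard functions above: `isField`, `isFieldNode`, and `isFieldArgv` are user-defined type guards (functions whose return type is `x is T`), so inside a guarded branch the compiler narrows the constructor argument without casts. A minimal sketch of guard-based constructor dispatch, with hypothetical argument shapes standing in for the real flatbuffers types:

```ts
// Hypothetical shapes; the real code tests for flatbuffers Field /
// FieldBuilder instances instead of duck-typing like this.
interface FieldArgv { field: { name: string }; fieldNode: { length: number }; }

function isFieldArgv(x: any): x is FieldArgv {
    return x && typeof x === 'object' && 'field' in x && 'fieldNode' in x;
}

class VectorSketch {
    name = '';
    length = 0;
    constructor(argv: FieldArgv | { length: number }) {
        if (isFieldArgv(argv)) {
            // Narrowed: argv is known to carry `field` and `fieldNode` here.
            this.name = argv.field.name;
            this.length = argv.fieldNode.length;
        } else {
            this.length = argv.length;
        }
    }
}

const a = new VectorSketch({ length: 3 });
const b = new VectorSketch({ field: { name: 'x' }, fieldNode: { length: 3 } });
```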
diff --git a/js/src/vector/traits/nullable.ts b/js/src/vector/traits/nullable.ts
new file mode 100644
index 00000000000..3d6b7863afa
--- /dev/null
+++ b/js/src/vector/traits/nullable.ts
@@ -0,0 +1,53 @@
+import { Vector } from '../vector';
+import * as vectors from './vectors';
+import { nullableMixin } from './mixins';
+
+export { Vector };
+export const NullableListVector = nullableMixin(vectors.ListVector);
+export class ListVector extends NullableListVector {}
+export const NullableBinaryVector = nullableMixin(vectors.BinaryVector);
+export class BinaryVector extends NullableBinaryVector {}
+export const NullableUtf8Vector = nullableMixin(vectors.Utf8Vector);
+export class Utf8Vector extends NullableUtf8Vector {}
+export const NullableBoolVector = nullableMixin(vectors.BoolVector);
+export class BoolVector extends NullableBoolVector {}
+export const NullableInt8Vector = nullableMixin(vectors.Int8Vector);
+export class Int8Vector extends NullableInt8Vector {}
+export const NullableInt16Vector = nullableMixin(vectors.Int16Vector);
+export class Int16Vector extends NullableInt16Vector {}
+export const NullableInt32Vector = nullableMixin(vectors.Int32Vector);
+export class Int32Vector extends NullableInt32Vector {}
+export const NullableInt64Vector = nullableMixin(vectors.Int64Vector);
+export class Int64Vector extends NullableInt64Vector {}
+export const NullableUint8Vector = nullableMixin(vectors.Uint8Vector);
+export class Uint8Vector extends NullableUint8Vector {}
+export const NullableUint16Vector = nullableMixin(vectors.Uint16Vector);
+export class Uint16Vector extends NullableUint16Vector {}
+export const NullableUint32Vector = nullableMixin(vectors.Uint32Vector);
+export class Uint32Vector extends NullableUint32Vector {}
+export const NullableUint64Vector = nullableMixin(vectors.Uint64Vector);
+export class Uint64Vector extends NullableUint64Vector {}
+export const NullableDate32Vector = nullableMixin(vectors.Date32Vector);
+export class Date32Vector extends NullableDate32Vector {}
+export const NullableDate64Vector = nullableMixin(vectors.Date64Vector);
+export class Date64Vector extends NullableDate64Vector {}
+export const NullableTime32Vector = nullableMixin(vectors.Time32Vector);
+export class Time32Vector extends NullableTime32Vector {}
+export const NullableTime64Vector = nullableMixin(vectors.Time64Vector);
+export class Time64Vector extends NullableTime64Vector {}
+export const NullableFloat16Vector = nullableMixin(vectors.Float16Vector);
+export class Float16Vector extends NullableFloat16Vector {}
+export const NullableFloat32Vector = nullableMixin(vectors.Float32Vector);
+export class Float32Vector extends NullableFloat32Vector {}
+export const NullableFloat64Vector = nullableMixin(vectors.Float64Vector);
+export class Float64Vector extends NullableFloat64Vector {}
+export const NullableStructVector = nullableMixin(vectors.StructVector);
+export class StructVector extends NullableStructVector {}
+export const NullableDecimalVector = nullableMixin(vectors.DecimalVector);
+export class DecimalVector extends NullableDecimalVector {}
+export const NullableTimestampVector = nullableMixin(vectors.TimestampVector);
+export class TimestampVector extends NullableTimestampVector {}
+export const NullableDictionaryVector = nullableMixin(vectors.DictionaryVector);
+export class DictionaryVector extends NullableDictionaryVector {}
+export const NullableFixedSizeListVector = nullableMixin(vectors.FixedSizeListVector);
+export class FixedSizeListVector extends NullableFixedSizeListVector {}
\ No newline at end of file
diff --git a/js/src/vector/traits/nullablefield.ts b/js/src/vector/traits/nullablefield.ts
new file mode 100644
index 00000000000..1035399d8cf
--- /dev/null
+++ b/js/src/vector/traits/nullablefield.ts
@@ -0,0 +1,59 @@
+import { Vector } from '../vector';
+import * as vectors from './vectors';
+import { nullableMixin, fieldMixin } from './mixins';
+import { FieldBuilder, FieldNodeBuilder } from '../../format/arrow';
+
+export { Vector, FieldBuilder, FieldNodeBuilder };
+import * as Schema_ from '../../format/fb/Schema';
+import * as Message_ from '../../format/fb/Message';
+export import Field = Schema_.org.apache.arrow.flatbuf.Field;
+export import FieldNode = Message_.org.apache.arrow.flatbuf.FieldNode;
+
+export const NullableFieldListVector = nullableMixin(fieldMixin(vectors.ListVector));
+export class ListVector extends NullableFieldListVector {}
+export const NullableFieldBinaryVector = nullableMixin(fieldMixin(vectors.BinaryVector));
+export class BinaryVector extends NullableFieldBinaryVector {}
+export const NullableFieldUtf8Vector = nullableMixin(fieldMixin(vectors.Utf8Vector));
+export class Utf8Vector extends NullableFieldUtf8Vector {}
+export const NullableFieldBoolVector = nullableMixin(fieldMixin(vectors.BoolVector));
+export class BoolVector extends NullableFieldBoolVector {}
+export const NullableFieldInt8Vector = nullableMixin(fieldMixin(vectors.Int8Vector));
+export class Int8Vector extends NullableFieldInt8Vector {}
+export const NullableFieldInt16Vector = nullableMixin(fieldMixin(vectors.Int16Vector));
+export class Int16Vector extends NullableFieldInt16Vector {}
+export const NullableFieldInt32Vector = nullableMixin(fieldMixin(vectors.Int32Vector));
+export class Int32Vector extends NullableFieldInt32Vector {}
+export const NullableFieldInt64Vector = nullableMixin(fieldMixin(vectors.Int64Vector));
+export class Int64Vector extends NullableFieldInt64Vector {}
+export const NullableFieldUint8Vector = nullableMixin(fieldMixin(vectors.Uint8Vector));
+export class Uint8Vector extends NullableFieldUint8Vector {}
+export const NullableFieldUint16Vector = nullableMixin(fieldMixin(vectors.Uint16Vector));
+export class Uint16Vector extends NullableFieldUint16Vector {}
+export const NullableFieldUint32Vector = nullableMixin(fieldMixin(vectors.Uint32Vector));
+export class Uint32Vector extends NullableFieldUint32Vector {}
+export const NullableFieldUint64Vector = nullableMixin(fieldMixin(vectors.Uint64Vector));
+export class Uint64Vector extends NullableFieldUint64Vector {}
+export const NullableFieldDate32Vector = nullableMixin(fieldMixin(vectors.Date32Vector));
+export class Date32Vector extends NullableFieldDate32Vector {}
+export const NullableFieldDate64Vector = nullableMixin(fieldMixin(vectors.Date64Vector));
+export class Date64Vector extends NullableFieldDate64Vector {}
+export const NullableFieldTime32Vector = nullableMixin(fieldMixin(vectors.Time32Vector));
+export class Time32Vector extends NullableFieldTime32Vector {}
+export const NullableFieldTime64Vector = nullableMixin(fieldMixin(vectors.Time64Vector));
+export class Time64Vector extends NullableFieldTime64Vector {}
+export const NullableFieldFloat16Vector = nullableMixin(fieldMixin(vectors.Float16Vector));
+export class Float16Vector extends NullableFieldFloat16Vector {}
+export const NullableFieldFloat32Vector = nullableMixin(fieldMixin(vectors.Float32Vector));
+export class Float32Vector extends NullableFieldFloat32Vector {}
+export const NullableFieldFloat64Vector = nullableMixin(fieldMixin(vectors.Float64Vector));
+export class Float64Vector extends NullableFieldFloat64Vector {}
+export const NullableFieldStructVector = nullableMixin(fieldMixin(vectors.StructVector));
+export class StructVector extends NullableFieldStructVector {}
+export const NullableFieldDecimalVector = nullableMixin(fieldMixin(vectors.DecimalVector));
+export class DecimalVector extends NullableFieldDecimalVector {}
+export const NullableFieldTimestampVector = nullableMixin(fieldMixin(vectors.TimestampVector));
+export class TimestampVector extends NullableFieldTimestampVector {}
+export const NullableFieldDictionaryVector = nullableMixin(fieldMixin(vectors.DictionaryVector));
+export class DictionaryVector extends NullableFieldDictionaryVector {}
+export const NullableFieldFixedSizeListVector = nullableMixin(fieldMixin(vectors.FixedSizeListVector));
+export class FixedSizeListVector extends NullableFieldFixedSizeListVector {}
\ No newline at end of file
diff --git a/js/src/vector/traits/vectors.ts b/js/src/vector/traits/vectors.ts
new file mode 100644
index 00000000000..558efa62a94
--- /dev/null
+++ b/js/src/vector/traits/vectors.ts
@@ -0,0 +1,58 @@
+import { Vector } from '../vector';
+import { Utf8Vector } from '../utf8';
+import { StructVector } from '../struct';
+import { DictionaryVector } from '../dictionary';
+import {
+    ListVector,
+    BinaryVector,
+    FixedSizeListVector
+} from '../list';
+
+import {
+    BoolVector,
+    Int8Vector,
+    Int16Vector,
+    Int32Vector,
+    Int64Vector,
+    Uint8Vector,
+    Uint16Vector,
+    Uint32Vector,
+    Uint64Vector,
+    Float16Vector,
+    Float32Vector,
+    Float64Vector,
+    Date32Vector,
+    Date64Vector,
+    Time32Vector,
+    Time64Vector,
+    DecimalVector,
+    TimestampVector,
+} from '../numeric';
+
+export {
+    Vector,
+    BoolVector,
+    ListVector,
+    Utf8Vector,
+    Int8Vector,
+    Int16Vector,
+    Int32Vector,
+    Int64Vector,
+    Uint8Vector,
+    Uint16Vector,
+    Uint32Vector,
+    Uint64Vector,
+    Date32Vector,
+    Date64Vector,
+    Time32Vector,
+    Time64Vector,
+    BinaryVector,
+    StructVector,
+    Float16Vector,
+    Float32Vector,
+    Float64Vector,
+    DecimalVector,
+    TimestampVector,
+    DictionaryVector,
+    FixedSizeListVector,
+};
diff --git a/js/src/vector/vector.ts b/js/src/vector/vector.ts
index 8047c899232..0810f5cd80d 100644
--- a/js/src/vector/vector.ts
+++ b/js/src/vector/vector.ts
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-import * as Schema_ from '../format/Schema';
+import * as Schema_ from '../format/fb/Schema';
 import Type = Schema_.org.apache.arrow.flatbuf.Type;
 
 export interface Vector extends Iterable {
diff --git a/js/tsconfig/tsconfig.es5.cls.json b/js/tsconfig/tsconfig.es5.cls.json
index 502432da043..e244914ad10 100644
--- a/js/tsconfig/tsconfig.es5.cls.json
+++ b/js/tsconfig/tsconfig.es5.cls.json
@@ -3,6 +3,8 @@
   "extends": "./tsconfig.base.json",
   "compilerOptions": {
     "target": "ES5",
+    // uncomment when google-closure-compiler releases a new version
+    // "target": "es2015",
     "module": "es2015",
     "declaration": false
   }
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index dbfd89cc378..73e34c7b210 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -403,8 +403,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CChunkedArray] data()
 
     cdef cppclass CRecordBatch" arrow::RecordBatch":
-        CRecordBatch(const shared_ptr[CSchema]& schema, int64_t num_rows,
-                     const vector[shared_ptr[CArray]]& columns)
+        @staticmethod
+        shared_ptr[CRecordBatch] Make(
+            const shared_ptr[CSchema]& schema, int64_t num_rows,
+            const vector[shared_ptr[CArray]]& columns)
 
         c_bool Equals(const CRecordBatch& other)
 
@@ -427,6 +429,11 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         CTable(const shared_ptr[CSchema]& schema,
                const vector[shared_ptr[CColumn]]& columns)
 
+        @staticmethod
+        shared_ptr[CTable] Make(
+            const shared_ptr[CSchema]& schema,
+            const vector[shared_ptr[CColumn]]& columns)
+
         @staticmethod
         CStatus FromRecordBatches(
             const vector[shared_ptr[CRecordBatch]]& batches,
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 591f3297587..8c5b8bbc343 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -724,7 +724,6 @@ cdef class RecordBatch:
         Array arr
         c_string c_name
         shared_ptr[CSchema] schema
-        shared_ptr[CRecordBatch] batch
        vector[shared_ptr[CArray]] c_arrays
         int64_t num_rows
         int64_t i
@@ -740,8 +739,8 @@
         for arr in arrays:
             c_arrays.push_back(arr.sp_array)
 
-        batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
-        return pyarrow_wrap_batch(batch)
+        return pyarrow_wrap_batch(
+            CRecordBatch.Make(schema, num_rows, c_arrays))
 
 
 def table_to_blocks(PandasOptions options, Table table, int nthreads,
@@ -946,8 +945,7 @@ cdef class Table:
         else:
             raise ValueError(type(arrays[i]))
 
-        table.reset(new CTable(c_schema, columns))
-        return pyarrow_wrap_table(table)
+        return pyarrow_wrap_table(CTable.Make(c_schema, columns))
 
     @staticmethod
     def from_batches(batches):
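The pyarrow changes above track the C++ API's move from public `RecordBatch`/`Table` constructors to `Make` static factories that return a `shared_ptr`, so the Cython wrappers no longer call `new` and `reset` by hand. Purely to illustrate the general shape of that pattern, here is a sketch in this document's TypeScript, with invented names throughout (the validation shown is illustrative, not a claim about what `arrow::RecordBatch::Make` checks):

```ts
class RecordBatchSketch {
    // Private constructor: instances come only from the factory below.
    private constructor(readonly numRows: number,
                        readonly columns: ReadonlyArray<number[]>) {}

    // The static factory is a natural seam for checks before construction.
    static make(numRows: number, columns: number[][]): RecordBatchSketch {
        for (const c of columns) {
            if (c.length !== numRows) {
                throw new Error(`column length ${c.length} != numRows ${numRows}`);
            }
        }
        return new RecordBatchSketch(numRows, columns);
    }
}

const batch = RecordBatchSketch.make(2, [[1, 2], [3, 4]]);  // ok
// RecordBatchSketch.make(2, [[1]]) would throw instead of constructing.
```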
diff --git a/site/_posts/2017-07-26-spark-arrow.md b/site/_posts/2017-07-26-spark-arrow.md
index c4b16c0738c..211e5a481b4 100644
--- a/site/_posts/2017-07-26-spark-arrow.md
+++ b/site/_posts/2017-07-26-spark-arrow.md
@@ -57,7 +57,7 @@ the conversion to Arrow data can be done on the JVM and pushed back for the Spar
 executors to perform in parallel, drastically reducing the load on the driver.
 
 As of the merging of [SPARK-13534][5], the use of Arrow when calling `toPandas()`
-needs to be enabled by setting the SQLConf "spark.sql.execution.arrow.enable" to
+needs to be enabled by setting the SQLConf "spark.sql.execution.arrow.enabled" to
 "true". Let's look at a simple usage example.
 
 ```
@@ -84,7 +84,7 @@ In [2]: %time pdf = df.toPandas()
 CPU times: user 17.4 s, sys: 792 ms, total: 18.1 s
 Wall time: 20.7 s
 
-In [3]: spark.conf.set("spark.sql.execution.arrow.enable", "true")
+In [3]: spark.conf.set("spark.sql.execution.arrow.enabled", "true")
 
 In [4]: %time pdf = df.toPandas()
 CPU times: user 40 ms, sys: 32 ms, total: 72 ms
@@ -118,7 +118,7 @@ It is planned to add pyarrow as a pyspark dependency so that
 Currently, the controlling SQLConf is disabled by default. This can be enabled
 programmatically as in the example above or by adding the line
-"spark.sql.execution.arrow.enable=true" to `SPARK_HOME/conf/spark-defaults.conf`.
+"spark.sql.execution.arrow.enabled=true" to `SPARK_HOME/conf/spark-defaults.conf`.
 
 Also, not all Spark data types are currently supported and limited to primitive
 types. Expanded type support is in the works and expected to also be in the Spark
diff --git a/site/install.md b/site/install.md
index 0ef2008db90..67b26983b46 100644
--- a/site/install.md
+++ b/site/install.md
@@ -53,7 +53,7 @@ Install them with:
 
 ```shell
 conda install arrow-cpp=0.7.* -c conda-forge
-conda install pyarrow==0.7.* -c conda-forge
+conda install pyarrow=0.7.* -c conda-forge
 ```
 
 ### Python Wheels on PyPI (Unofficial)