diff --git a/cpp/src/arrow/io/hdfs-io-test.cc b/cpp/src/arrow/io/hdfs-io-test.cc
index d1bf140ae68..e48a28142fa 100644
--- a/cpp/src/arrow/io/hdfs-io-test.cc
+++ b/cpp/src/arrow/io/hdfs-io-test.cc
@@ -266,7 +266,7 @@ TEST_F(TestHdfsClient, ReadableMethods) {
   ASSERT_EQ(size, file_size);
 
   uint8_t buffer[50];
-  int32_t bytes_read = 0;
+  int64_t bytes_read = 0;
 
   ASSERT_OK(file->Read(50, &bytes_read, buffer));
   ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 6da6ea4e71b..800c3edf4f3 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -100,7 +100,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
     return Status::OK();
   }
 
-  Status ReadAt(int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+  Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
     tSize ret = hdfsPread(fs_, file_, static_cast<tOffset>(position),
         reinterpret_cast<void*>(buffer), nbytes);
     RETURN_NOT_OK(CheckReadResult(ret));
@@ -108,7 +108,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
     return Status::OK();
   }
 
-  Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
     tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
     RETURN_NOT_OK(CheckReadResult(ret));
     *bytes_read = ret;
@@ -138,11 +138,11 @@ Status HdfsReadableFile::Close() {
 }
 
 Status HdfsReadableFile::ReadAt(
-    int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
   return impl_->ReadAt(position, nbytes, bytes_read, buffer);
 }
 
-Status HdfsReadableFile::Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
   return impl_->Read(nbytes, bytes_read, buffer);
 }
 
@@ -177,7 +177,7 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
     return Status::OK();
   }
 
-  Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written) {
+  Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
     tSize ret = hdfsWrite(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes);
     CHECK_FAILURE(ret, "Write");
     *bytes_written = ret;
@@ -198,12 +198,12 @@ Status HdfsWriteableFile::Close() {
 }
 
 Status HdfsWriteableFile::Write(
-    const uint8_t* buffer, int32_t nbytes, int32_t* bytes_read) {
+    const uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
   return impl_->Write(buffer, nbytes, bytes_read);
 }
 
-Status HdfsWriteableFile::Write(const uint8_t* buffer, int32_t nbytes) {
-  int32_t bytes_written_dummy = 0;
+Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) {
+  int64_t bytes_written_dummy = 0;
   return Write(buffer, nbytes, &bytes_written_dummy);
 }
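
Reviewer note (illustrative, not part of this patch): widening the signatures to int64_t is the right interface change, but libhdfs's hdfsRead()/hdfsPread()/hdfsWrite() still take a 32-bit tSize, so a single call cannot move more than INT32_MAX bytes and a larger nbytes is narrowed at the call site. A caller-side sketch of chunking large reads under that assumption (RETURN_NOT_OK and Status::IOError come from arrow/util/status.h):

    // Sketch only: read `total` bytes through the new 64-bit API in bounded chunks.
    #include <algorithm>
    #include <cstdint>

    #include "arrow/io/hdfs.h"
    #include "arrow/util/status.h"

    arrow::Status ReadFully(
        arrow::io::HdfsReadableFile* file, int64_t total, uint8_t* out) {
      // Stay comfortably below INT32_MAX, the limit of libhdfs's tSize
      constexpr int64_t kChunk = 64 << 20;  // 64 MB
      int64_t offset = 0;
      while (offset < total) {
        int64_t bytes_read = 0;
        RETURN_NOT_OK(
            file->Read(std::min(kChunk, total - offset), &bytes_read, out + offset));
        if (bytes_read == 0) { break; }  // EOF reached early
        offset += bytes_read;
      }
      return offset == total ? arrow::Status::OK()
                             : arrow::Status::IOError("Unexpected EOF");
    }
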
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index 532e3c536a1..b6449fcb88a 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -164,14 +164,14 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
   Status GetSize(int64_t* size) override;
 
   Status ReadAt(
-      int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
 
   Status Seek(int64_t position) override;
   Status Tell(int64_t* position) override;
 
   // NOTE: If you wish to read a particular range of a file in a multithreaded
   // context, you may prefer to use ReadAt to avoid locking issues
-  Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
 
  private:
   class ARROW_NO_EXPORT HdfsReadableFileImpl;
@@ -189,9 +189,9 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
 
   Status Close() override;
 
-  Status Write(const uint8_t* buffer, int32_t nbytes) override;
+  Status Write(const uint8_t* buffer, int64_t nbytes) override;
 
-  Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written);
+  Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written);
 
   Status Tell(int64_t* position) override;
 
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 4bd8a8ffc2f..25361d5633d 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_IO_INTERFACES
-#define ARROW_IO_INTERFACES
+#ifndef ARROW_IO_INTERFACES_H
+#define ARROW_IO_INTERFACES_H
 
 #include <cstdint>
 
@@ -40,17 +40,17 @@ class FileSystemClient {
 };
 
 class FileBase {
+ public:
   virtual Status Close() = 0;
-
   virtual Status Tell(int64_t* position) = 0;
 };
 
 class ReadableFile : public FileBase {
  public:
   virtual Status ReadAt(
-      int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
 
-  virtual Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
+  virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
 
   virtual Status GetSize(int64_t* size) = 0;
 };
@@ -62,10 +62,10 @@ class RandomAccessFile : public ReadableFile {
 
 class WriteableFile : public FileBase {
  public:
-  virtual Status Write(const uint8_t* buffer, int32_t nbytes) = 0;
+  virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0;
 };
 
 }  // namespace io
 }  // namespace arrow
 
-#endif  // ARROW_IO_INTERFACES
+#endif  // ARROW_IO_INTERFACES_H
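
Reviewer note (illustrative, not part of this patch): with 64-bit counts, a whole file can be requested in a single call through the abstract interface (subject to the backend's own limits, per the note above). A minimal sketch of consuming the revised ReadableFile API:

    #include <cstdint>
    #include <vector>

    #include "arrow/io/interfaces.h"
    #include "arrow/util/status.h"

    arrow::Status ReadEntireFile(
        arrow::io::ReadableFile* file, std::vector<uint8_t>* out) {
      int64_t size = 0;
      RETURN_NOT_OK(file->GetSize(&size));
      out->resize(size);
      int64_t bytes_read = 0;
      RETURN_NOT_OK(file->Read(size, &bytes_read, out->data()));
      out->resize(bytes_read);  // tolerate a short read
      return arrow::Status::OK();
    }
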
diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt
index 00f19b354e3..f2a90b71a49 100644
--- a/cpp/src/arrow/parquet/CMakeLists.txt
+++ b/cpp/src/arrow/parquet/CMakeLists.txt
@@ -19,6 +19,7 @@
 # arrow_parquet : Arrow <-> Parquet adapter
 
 set(PARQUET_SRCS
+  io.cc
   reader.cc
   schema.cc
   writer.cc
@@ -48,8 +49,12 @@ ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
 ADD_ARROW_TEST(parquet-io-test)
 ARROW_TEST_LINK_LIBRARIES(parquet-io-test arrow_parquet)
 
+ADD_ARROW_TEST(parquet-reader-writer-test)
+ARROW_TEST_LINK_LIBRARIES(parquet-reader-writer-test arrow_parquet)
+
 # Headers: top level
 install(FILES
+  io.h
   reader.h
   schema.h
   utils.h
diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc
new file mode 100644
index 00000000000..c81aa8c4da9
--- /dev/null
+++ b/cpp/src/arrow/parquet/io.cc
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/parquet/io.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+
+#include "arrow/parquet/utils.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace arrow {
+namespace parquet {
+
+// ----------------------------------------------------------------------
+// ParquetAllocator
+
+ParquetAllocator::ParquetAllocator() : pool_(default_memory_pool()) {}
+
+ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {}
+
+ParquetAllocator::~ParquetAllocator() {}
+
+uint8_t* ParquetAllocator::Malloc(int64_t size) {
+  uint8_t* result;
+  PARQUET_THROW_NOT_OK(pool_->Allocate(size, &result));
+  return result;
+}
+
+void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
+  // Does not report Status
+  pool_->Free(buffer, size);
+}
+
+// ----------------------------------------------------------------------
+// ParquetReadSource
+
+ParquetReadSource::ParquetReadSource(
+    const std::shared_ptr<ArrowROFile>& file, ParquetAllocator* allocator)
+    : file_(file), allocator_(allocator) {}
+
+void ParquetReadSource::Close() {
+  PARQUET_THROW_NOT_OK(file_->Close());
+}
+
+int64_t ParquetReadSource::Tell() const {
+  int64_t position;
+  PARQUET_THROW_NOT_OK(file_->Tell(&position));
+  return position;
+}
+
+void ParquetReadSource::Seek(int64_t position) {
+  PARQUET_THROW_NOT_OK(file_->Seek(position));
+}
+
+int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
+  int64_t bytes_read;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
+  return bytes_read;
+}
+
+std::shared_ptr<::parquet::Buffer> ParquetReadSource::Read(int64_t nbytes) {
+  // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
+  // that there should be more code sharing amongst file-like sources
+  auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, allocator_);
+  result->Resize(nbytes);
+
+  int64_t bytes_read = Read(nbytes, result->mutable_data());
+  if (bytes_read < nbytes) { result->Resize(bytes_read); }
+  return result;
+}
+
+}  // namespace parquet
+}  // namespace arrow
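
Reviewer note (illustrative, not part of this patch): the point of this bridge is that any arrow::io::RandomAccessFile (for example HdfsReadableFile) can now back a parquet-cpp reader, with all allocations routed through an Arrow MemoryPool. A sketch, assuming parquet-cpp's ParquetFileReader::Open(std::unique_ptr<RandomAccessSource>) entry point that the tests in this patch also use:

    #include <memory>
    #include <utility>

    #include "arrow/parquet/io.h"
    #include "parquet/api/reader.h"

    std::unique_ptr<::parquet::ParquetFileReader> OpenParquetFile(
        const std::shared_ptr<arrow::io::RandomAccessFile>& file,
        arrow::parquet::ParquetAllocator* allocator) {
      std::unique_ptr<::parquet::RandomAccessSource> source(
          new arrow::parquet::ParquetReadSource(file, allocator));
      // Throws ::parquet::ParquetException on malformed files
      return ::parquet::ParquetFileReader::Open(std::move(source));
    }
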
diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h
new file mode 100644
index 00000000000..ef8871da4df
--- /dev/null
+++ b/cpp/src/arrow/parquet/io.h
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces
+
+#ifndef ARROW_PARQUET_IO_H
+#define ARROW_PARQUET_IO_H
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class MemoryPool;
+
+namespace parquet {
+
+// An implementation of the Parquet MemoryAllocator API that plugs into an
+// existing Arrow memory pool. This way we can direct all allocations to a
+// single place rather than tracking allocations in different locations (for
+// example: without utilizing parquet-cpp's default allocator)
+class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator {
+ public:
+  // Uses the default memory pool
+  ParquetAllocator();
+
+  explicit ParquetAllocator(MemoryPool* pool);
+  virtual ~ParquetAllocator();
+
+  uint8_t* Malloc(int64_t size) override;
+  void Free(uint8_t* buffer, int64_t size) override;
+
+  MemoryPool* pool() { return pool_; }
+
+ private:
+  MemoryPool* pool_;
+};
+
+class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource {
+ public:
+  ParquetReadSource(
+      const std::shared_ptr<io::RandomAccessFile>& file, ParquetAllocator* allocator);
+
+  void Close() override;
+  int64_t Tell() const override;
+  void Seek(int64_t pos) override;
+  int64_t Read(int64_t nbytes, uint8_t* out) override;
+  std::shared_ptr<::parquet::Buffer> Read(int64_t nbytes) override;
+
+ private:
+  // An Arrow readable file of some kind
+  std::shared_ptr<io::RandomAccessFile> file_;
+
+  // The allocator is required for creating managed buffers
+  ParquetAllocator* allocator_;
+};
+
+}  // namespace parquet
+}  // namespace arrow
+
+#endif  // ARROW_PARQUET_IO_H
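
Reviewer note (illustrative, not part of this patch): because ParquetAllocator only forwards to the wrapped pool, allocations made by parquet-cpp become visible in Arrow's memory accounting. A minimal usage sketch against the default pool:

    #include <cstdint>

    #include "arrow/parquet/io.h"
    #include "arrow/util/memory-pool.h"

    int main() {
      arrow::MemoryPool* pool = arrow::default_memory_pool();
      arrow::parquet::ParquetAllocator allocator(pool);

      // Malloc throws ::parquet::ParquetException if the pool fails to allocate
      uint8_t* data = allocator.Malloc(4096);
      // pool->bytes_allocated() now reports 4096 with the default tracking pool
      allocator.Free(data, 4096);
      // ... and 0 again here
      return 0;
    }
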
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index bfc27d26d63..7e724b31e38 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -15,475 +15,164 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+
 #include "gtest/gtest.h"
 
-#include "arrow/test-util.h"
-#include "arrow/parquet/test-util.h"
-#include "arrow/parquet/reader.h"
-#include "arrow/parquet/writer.h"
-#include "arrow/types/construct.h"
-#include "arrow/types/primitive.h"
-#include "arrow/types/string.h"
+#include "arrow/parquet/io.h"
 #include "arrow/util/memory-pool.h"
 #include "arrow/util/status.h"
 
-#include "parquet/api/reader.h"
-#include "parquet/api/writer.h"
-
-using ParquetBuffer = parquet::Buffer;
-using parquet::BufferReader;
-using parquet::default_writer_properties;
-using parquet::InMemoryOutputStream;
-using parquet::LogicalType;
-using parquet::ParquetFileReader;
-using parquet::ParquetFileWriter;
-using parquet::RandomAccessSource;
-using parquet::Repetition;
-using parquet::SchemaDescriptor;
-using parquet::ParquetVersion;
-using ParquetType = parquet::Type;
-using parquet::schema::GroupNode;
-using parquet::schema::NodePtr;
-using parquet::schema::PrimitiveNode;
+#include "parquet/api/io.h"
 
 namespace arrow {
-
 namespace parquet {
 
-const int SMALL_SIZE = 100;
-const int LARGE_SIZE = 10000;
-
-template <typename TestType>
-struct test_traits {};
+// Allocator tests
 
-template <>
-struct test_traits<BooleanType> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
-  static uint8_t const value;
-};
+TEST(TestParquetAllocator, DefaultCtor) {
+  ParquetAllocator allocator;
 
-const uint8_t test_traits<BooleanType>::value(1);
+  const int buffer_size = 10;
 
-template <>
-struct test_traits<UInt8Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
-  static uint8_t const value;
-};
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size));
 
-const uint8_t test_traits<UInt8Type>::value(64);
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
 
-template <>
-struct test_traits<Int8Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
-  static int8_t const value;
-};
+  allocator.Free(buffer, buffer_size);
+}
 
-const int8_t test_traits<Int8Type>::value(-64);
-
-template <>
-struct test_traits<UInt16Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
-  static uint16_t const value;
-};
-
-const uint16_t test_traits<UInt16Type>::value(1024);
-
-template <>
-struct test_traits<Int16Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
-  static int16_t const value;
-};
-
-const int16_t test_traits<Int16Type>::value(-1024);
-
-template <>
-struct test_traits<UInt32Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
-  static uint32_t const value;
-};
-
-const uint32_t test_traits<UInt32Type>::value(1024);
+// Pass through to the default memory pool
+class TrackingPool : public MemoryPool {
+ public:
+  TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {}
 
-template <>
-struct test_traits<Int32Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
-  static int32_t const value;
-};
+  Status Allocate(int64_t size, uint8_t** out) override {
+    RETURN_NOT_OK(pool_->Allocate(size, out));
+    bytes_allocated_ += size;
+    return Status::OK();
+  }
 
-const int32_t test_traits<Int32Type>::value(-1024);
-
-template <>
-struct test_traits<UInt64Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
-  static uint64_t const value;
-};
+  void Free(uint8_t* buffer, int64_t size) override {
+    pool_->Free(buffer, size);
+    bytes_allocated_ -= size;
+  }
 
-const uint64_t test_traits<UInt64Type>::value(1024);
 
-template <>
-struct
test_traits { - static constexpr ParquetType::type parquet_enum = ParquetType::INT64; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; - static int64_t const value; -}; - -const int64_t test_traits::value(-1024); - -template <> -struct test_traits { - static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; - static float const value; -}; - -const float test_traits::value(2.1f); - -template <> -struct test_traits { - static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; - static double const value; -}; - -const double test_traits::value(4.2); - -template <> -struct test_traits { - static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; - static constexpr LogicalType::type logical_enum = LogicalType::UTF8; - static std::string const value; -}; - -const std::string test_traits::value("Test"); - -template -using ParquetDataType = ::parquet::DataType::parquet_enum>; - -template -using ParquetWriter = ::parquet::TypedColumnWriter>; + allocator.Free(buffer, buffer_size); +} -template -class TestParquetIO : public ::testing::Test { +// Pass through to the default memory pool +class TrackingPool : public MemoryPool { public: - virtual void SetUp() {} - - std::shared_ptr MakeSchema(Repetition::type repetition) { - auto pnode = PrimitiveNode::Make("column1", repetition, - test_traits::parquet_enum, test_traits::logical_enum); - NodePtr node_ = - GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); - return std::static_pointer_cast(node_); - } - - std::unique_ptr MakeWriter( - const std::shared_ptr& schema) { - sink_ = std::make_shared(); - return ParquetFileWriter::Open(sink_, schema); - } - - std::unique_ptr ReaderFromSink() { - std::shared_ptr buffer = sink_->GetBuffer(); - std::unique_ptr source(new BufferReader(buffer)); - return ParquetFileReader::Open(std::move(source)); - } - - void ReadSingleColumnFile( - std::unique_ptr file_reader, std::shared_ptr* out) { - arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); - std::unique_ptr column_reader; - ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader)); - ASSERT_NE(nullptr, column_reader.get()); - - ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out)); - ASSERT_NE(nullptr, out->get()); - } + TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {} - void ReadAndCheckSingleColumnFile(Array* values) { - std::shared_ptr out; - ReadSingleColumnFile(ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); + Status Allocate(int64_t size, uint8_t** out) override { + RETURN_NOT_OK(pool_->Allocate(size, out)); + bytes_allocated_ += size; + return Status::OK(); } - void ReadTableFromFile( - std::unique_ptr file_reader, std::shared_ptr* out) { - arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); - ASSERT_OK_NO_THROW(reader.ReadFlatTable(out)); - ASSERT_NE(nullptr, out->get()); + void Free(uint8_t* buffer, int64_t size) override { + pool_->Free(buffer, size); + bytes_allocated_ -= size; } - void ReadAndCheckSingleColumnTable(const std::shared_ptr& values) { - std::shared_ptr
out; - ReadTableFromFile(ReaderFromSink(), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(values->length(), out->num_rows()); - - std::shared_ptr chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); - } + int64_t bytes_allocated() const override { return bytes_allocated_; } - template - void WriteFlatColumn(const std::shared_ptr& schema, - const std::shared_ptr& values) { - FileWriter writer(default_memory_pool(), MakeWriter(schema)); - ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length())); - ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get())); - ASSERT_OK_NO_THROW(writer.Close()); - } - - std::shared_ptr sink_; + private: + MemoryPool* pool_; + int64_t bytes_allocated_; }; -// We habe separate tests for UInt32Type as this is currently the only type -// where a roundtrip does not yield the identical Array structure. -// There we write an UInt32 Array but receive an Int64 Array as result for -// Parquet version 1.0. +TEST(TestParquetAllocator, CustomPool) { + TrackingPool pool; -typedef ::testing::Types TestTypes; + ParquetAllocator allocator(&pool); -TYPED_TEST_CASE(TestParquetIO, TestTypes); + ASSERT_EQ(&pool, allocator.pool()); -TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { - auto values = NonNullArray(SMALL_SIZE); + const int buffer_size = 10; - std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); - this->WriteFlatColumn(schema, values); + uint8_t* buffer = nullptr; + ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size);); - this->ReadAndCheckSingleColumnFile(values.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { - auto values = NonNullArray(SMALL_SIZE); - std::shared_ptr
table = MakeSimpleTable(values, false); - this->sink_ = std::make_shared(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, - values->length(), default_writer_properties())); - - std::shared_ptr
out; - this->ReadTableFromFile(this->ReaderFromSink(), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(100, out->num_rows()); - - std::shared_ptr chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); -} + ASSERT_EQ(buffer_size, pool.bytes_allocated()); -TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { - // This also tests max_definition_level = 1 - auto values = NullableArray(SMALL_SIZE, 10); + // valgrind will complain if we write into nullptr + memset(buffer, 0, buffer_size); - std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); - this->WriteFlatColumn(schema, values); + allocator.Free(buffer, buffer_size); - this->ReadAndCheckSingleColumnFile(values.get()); + ASSERT_EQ(0, pool.bytes_allocated()); } -TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { - // This also tests max_definition_level = 1 - std::shared_ptr values = NullableArray(SMALL_SIZE, 10); - std::shared_ptr
table = MakeSimpleTable(values, true); - this->sink_ = std::make_shared(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, - values->length(), default_writer_properties())); +// ---------------------------------------------------------------------- +// Read source tests - this->ReadAndCheckSingleColumnTable(values); -} - -TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { - auto values = NonNullArray(SMALL_SIZE); - int64_t chunk_size = values->length() / 4; +class BufferReader : public io::RandomAccessFile { + public: + BufferReader(const uint8_t* buffer, int buffer_size) + : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} - std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); - FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - for (int i = 0; i < 4; i++) { - ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); - ASSERT_OK_NO_THROW( - writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); + Status Close() override { + // no-op + return Status::OK(); } - ASSERT_OK_NO_THROW(writer.Close()); - - this->ReadAndCheckSingleColumnFile(values.get()); -} -TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { - auto values = NonNullArray(LARGE_SIZE); - std::shared_ptr
table = MakeSimpleTable(values, false); - this->sink_ = std::make_shared(); - ASSERT_OK_NO_THROW(WriteFlatTable( - table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); - - this->ReadAndCheckSingleColumnTable(values); -} - -TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { - int64_t chunk_size = SMALL_SIZE / 4; - auto values = NullableArray(SMALL_SIZE, 10); - - std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); - FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - for (int i = 0; i < 4; i++) { - ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); - ASSERT_OK_NO_THROW( - writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); + Status Tell(int64_t* position) override { + *position = position_; + return Status::OK(); } - ASSERT_OK_NO_THROW(writer.Close()); - this->ReadAndCheckSingleColumnFile(values.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { - // This also tests max_definition_level = 1 - auto values = NullableArray(LARGE_SIZE, 100); - std::shared_ptr
table = MakeSimpleTable(values, true); - this->sink_ = std::make_shared(); - ASSERT_OK_NO_THROW(WriteFlatTable( - table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); - - this->ReadAndCheckSingleColumnTable(values); -} - -using TestUInt32ParquetIO = TestParquetIO; - -TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { - // This also tests max_definition_level = 1 - std::shared_ptr values = NullableArray(LARGE_SIZE, 100); - std::shared_ptr
table = MakeSimpleTable(values, true); - - // Parquet 2.0 roundtrip should yield an uint32_t column again - this->sink_ = std::make_shared(); - std::shared_ptr<::parquet::WriterProperties> properties = - ::parquet::WriterProperties::Builder() - .version(ParquetVersion::PARQUET_2_0) - ->build(); - ASSERT_OK_NO_THROW( - WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); - this->ReadAndCheckSingleColumnTable(values); -} + Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override { + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, buffer); + } -TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { - // This also tests max_definition_level = 1 - std::shared_ptr values = NullableArray(LARGE_SIZE, 100); - std::shared_ptr
table = MakeSimpleTable(values, true); - - // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0 - // reader that a column is unsigned. - this->sink_ = std::make_shared(); - std::shared_ptr<::parquet::WriterProperties> properties = - ::parquet::WriterProperties::Builder() - .version(ParquetVersion::PARQUET_1_0) - ->build(); - ASSERT_OK_NO_THROW( - WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); - - std::shared_ptr expected_values; - std::shared_ptr int64_data = - std::make_shared(default_memory_pool()); - { - ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length())); - int64_t* int64_data_ptr = reinterpret_cast(int64_data->mutable_data()); - const uint32_t* uint32_data_ptr = - reinterpret_cast(values->data()->data()); - // std::copy might be faster but this is explicit on the casts) - for (int64_t i = 0; i < values->length(); i++) { - int64_data_ptr[i] = static_cast(uint32_data_ptr[i]); - } + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override { + memcpy(buffer, buffer_ + position_, nbytes); + *bytes_read = std::min(nbytes, buffer_size_ - position_); + position_ += *bytes_read; + return Status::OK(); } - ASSERT_OK(MakePrimitiveArray(std::make_shared(), values->length(), - int64_data, values->null_count(), values->null_bitmap(), &expected_values)); - this->ReadAndCheckSingleColumnTable(expected_values); -} -template -using ParquetCDataType = typename ParquetDataType::c_type; + Status GetSize(int64_t* size) override { + *size = buffer_size_; + return Status::OK(); + } -template -class TestPrimitiveParquetIO : public TestParquetIO { - public: - typedef typename TestType::c_type T; - - void MakeTestFile(std::vector& values, int num_chunks, - std::unique_ptr* file_reader) { - std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); - std::unique_ptr file_writer = this->MakeWriter(schema); - size_t chunk_size = values.size() / num_chunks; - // Convert to Parquet's expected physical type - std::vector values_buffer( - sizeof(ParquetCDataType) * values.size()); - auto values_parquet = - reinterpret_cast*>(values_buffer.data()); - std::copy(values.cbegin(), values.cend(), values_parquet); - for (int i = 0; i < num_chunks; i++) { - auto row_group_writer = file_writer->AppendRowGroup(chunk_size); - auto column_writer = - static_cast*>(row_group_writer->NextColumn()); - ParquetCDataType* data = values_parquet + i * chunk_size; - column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); - column_writer->Close(); - row_group_writer->Close(); + Status Seek(int64_t position) override { + if (position < 0 || position >= buffer_size_) { + return Status::IOError("position out of bounds"); } - file_writer->Close(); - *file_reader = this->ReaderFromSink(); + + position_ = position; + return Status::OK(); } - void CheckSingleColumnRequiredTableRead(int num_chunks) { - std::vector values(SMALL_SIZE, test_traits::value); - std::unique_ptr file_reader; - ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader)); + private: + const uint8_t* buffer_; + int buffer_size_; + int64_t position_; +}; - std::shared_ptr
out; - this->ReadTableFromFile(std::move(file_reader), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(SMALL_SIZE, out->num_rows()); +TEST(TestParquetReadSource, Basics) { + std::string data = "this is the data"; + auto data_buffer = reinterpret_cast(data.c_str()); - std::shared_ptr chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ExpectArray(values.data(), chunked_array->chunk(0).get()); - } + ParquetAllocator allocator; + auto file = std::make_shared(data_buffer, data.size()); + auto source = std::make_shared(file, &allocator); - void CheckSingleColumnRequiredRead(int num_chunks) { - std::vector values(SMALL_SIZE, test_traits::value); - std::unique_ptr file_reader; - ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader)); + ASSERT_EQ(0, source->Tell()); + ASSERT_NO_THROW(source->Seek(5)); + ASSERT_EQ(5, source->Tell()); + ASSERT_NO_THROW(source->Seek(0)); - std::shared_ptr out; - this->ReadSingleColumnFile(std::move(file_reader), &out); - - ExpectArray(values.data(), out.get()); - } -}; + // Seek out of bounds + ASSERT_THROW(source->Seek(100), ::parquet::ParquetException); -typedef ::testing::Types PrimitiveTestTypes; + uint8_t buffer[50]; -TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); + ASSERT_NO_THROW(source->Read(4, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, "this", 4)); + ASSERT_EQ(4, source->Tell()); -TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) { - this->CheckSingleColumnRequiredRead(1); -} + std::shared_ptr<::parquet::Buffer> pq_buffer; -TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) { - this->CheckSingleColumnRequiredTableRead(1); -} + ASSERT_NO_THROW(pq_buffer = source->Read(7)); -TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) { - this->CheckSingleColumnRequiredRead(4); -} + auto expected_buffer = std::make_shared<::parquet::Buffer>(data_buffer + 4, 7); -TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { - this->CheckSingleColumnRequiredTableRead(4); + ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get())); } } // namespace parquet - } // namespace arrow diff --git a/cpp/src/arrow/parquet/parquet-reader-writer-test.cc b/cpp/src/arrow/parquet/parquet-reader-writer-test.cc new file mode 100644 index 00000000000..bfc27d26d63 --- /dev/null +++ b/cpp/src/arrow/parquet/parquet-reader-writer-test.cc @@ -0,0 +1,489 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "gtest/gtest.h" + +#include "arrow/test-util.h" +#include "arrow/parquet/test-util.h" +#include "arrow/parquet/reader.h" +#include "arrow/parquet/writer.h" +#include "arrow/types/construct.h" +#include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +#include "parquet/api/reader.h" +#include "parquet/api/writer.h" + +using ParquetBuffer = parquet::Buffer; +using parquet::BufferReader; +using parquet::default_writer_properties; +using parquet::InMemoryOutputStream; +using parquet::LogicalType; +using parquet::ParquetFileReader; +using parquet::ParquetFileWriter; +using parquet::RandomAccessSource; +using parquet::Repetition; +using parquet::SchemaDescriptor; +using parquet::ParquetVersion; +using ParquetType = parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace arrow { + +namespace parquet { + +const int SMALL_SIZE = 100; +const int LARGE_SIZE = 10000; + +template +struct test_traits {}; + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static uint8_t const value; +}; + +const uint8_t test_traits::value(1); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_8; + static uint8_t const value; +}; + +const uint8_t test_traits::value(64); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::INT_8; + static int8_t const value; +}; + +const int8_t test_traits::value(-64); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_16; + static uint16_t const value; +}; + +const uint16_t test_traits::value(1024); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::INT_16; + static int16_t const value; +}; + +const int16_t test_traits::value(-1024); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_32; + static uint32_t const value; +}; + +const uint32_t test_traits::value(1024); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static int32_t const value; +}; + +const int32_t test_traits::value(-1024); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT64; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_64; + static uint64_t const value; +}; + +const uint64_t test_traits::value(1024); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT64; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static int64_t const value; +}; + +const int64_t test_traits::value(-1024); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; + static constexpr LogicalType::type 
logical_enum = LogicalType::NONE; + static float const value; +}; + +const float test_traits::value(2.1f); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static double const value; +}; + +const double test_traits::value(4.2); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; + static constexpr LogicalType::type logical_enum = LogicalType::UTF8; + static std::string const value; +}; + +const std::string test_traits::value("Test"); + +template +using ParquetDataType = ::parquet::DataType::parquet_enum>; + +template +using ParquetWriter = ::parquet::TypedColumnWriter>; + +template +class TestParquetIO : public ::testing::Test { + public: + virtual void SetUp() {} + + std::shared_ptr MakeSchema(Repetition::type repetition) { + auto pnode = PrimitiveNode::Make("column1", repetition, + test_traits::parquet_enum, test_traits::logical_enum); + NodePtr node_ = + GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); + return std::static_pointer_cast(node_); + } + + std::unique_ptr MakeWriter( + const std::shared_ptr& schema) { + sink_ = std::make_shared(); + return ParquetFileWriter::Open(sink_, schema); + } + + std::unique_ptr ReaderFromSink() { + std::shared_ptr buffer = sink_->GetBuffer(); + std::unique_ptr source(new BufferReader(buffer)); + return ParquetFileReader::Open(std::move(source)); + } + + void ReadSingleColumnFile( + std::unique_ptr file_reader, std::shared_ptr* out) { + arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); + std::unique_ptr column_reader; + ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader)); + ASSERT_NE(nullptr, column_reader.get()); + + ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out)); + ASSERT_NE(nullptr, out->get()); + } + + void ReadAndCheckSingleColumnFile(Array* values) { + std::shared_ptr out; + ReadSingleColumnFile(ReaderFromSink(), &out); + ASSERT_TRUE(values->Equals(out)); + } + + void ReadTableFromFile( + std::unique_ptr file_reader, std::shared_ptr
* out) { + arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); + ASSERT_OK_NO_THROW(reader.ReadFlatTable(out)); + ASSERT_NE(nullptr, out->get()); + } + + void ReadAndCheckSingleColumnTable(const std::shared_ptr& values) { + std::shared_ptr
out; + ReadTableFromFile(ReaderFromSink(), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(values->length(), out->num_rows()); + + std::shared_ptr chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); + } + + template + void WriteFlatColumn(const std::shared_ptr& schema, + const std::shared_ptr& values) { + FileWriter writer(default_memory_pool(), MakeWriter(schema)); + ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length())); + ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get())); + ASSERT_OK_NO_THROW(writer.Close()); + } + + std::shared_ptr sink_; +}; + +// We habe separate tests for UInt32Type as this is currently the only type +// where a roundtrip does not yield the identical Array structure. +// There we write an UInt32 Array but receive an Int64 Array as result for +// Parquet version 1.0. + +typedef ::testing::Types TestTypes; + +TYPED_TEST_CASE(TestParquetIO, TestTypes); + +TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { + auto values = NonNullArray(SMALL_SIZE); + + std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); + this->WriteFlatColumn(schema, values); + + this->ReadAndCheckSingleColumnFile(values.get()); +} + +TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { + auto values = NonNullArray(SMALL_SIZE); + std::shared_ptr
table = MakeSimpleTable(values, false); + this->sink_ = std::make_shared(); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); + + std::shared_ptr
out; + this->ReadTableFromFile(this->ReaderFromSink(), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(100, out->num_rows()); + + std::shared_ptr chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +} + +TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { + // This also tests max_definition_level = 1 + auto values = NullableArray(SMALL_SIZE, 10); + + std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); + this->WriteFlatColumn(schema, values); + + this->ReadAndCheckSingleColumnFile(values.get()); +} + +TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { + // This also tests max_definition_level = 1 + std::shared_ptr values = NullableArray(SMALL_SIZE, 10); + std::shared_ptr
table = MakeSimpleTable(values, true); + this->sink_ = std::make_shared(); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); + + this->ReadAndCheckSingleColumnTable(values); +} + +TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { + auto values = NonNullArray(SMALL_SIZE); + int64_t chunk_size = values->length() / 4; + + std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); + FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); + for (int i = 0; i < 4; i++) { + ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); + ASSERT_OK_NO_THROW( + writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); + } + ASSERT_OK_NO_THROW(writer.Close()); + + this->ReadAndCheckSingleColumnFile(values.get()); +} + +TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { + auto values = NonNullArray(LARGE_SIZE); + std::shared_ptr
table = MakeSimpleTable(values, false); + this->sink_ = std::make_shared(); + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); + + this->ReadAndCheckSingleColumnTable(values); +} + +TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { + int64_t chunk_size = SMALL_SIZE / 4; + auto values = NullableArray(SMALL_SIZE, 10); + + std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); + FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); + for (int i = 0; i < 4; i++) { + ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); + ASSERT_OK_NO_THROW( + writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); + } + ASSERT_OK_NO_THROW(writer.Close()); + + this->ReadAndCheckSingleColumnFile(values.get()); +} + +TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { + // This also tests max_definition_level = 1 + auto values = NullableArray(LARGE_SIZE, 100); + std::shared_ptr
table = MakeSimpleTable(values, true); + this->sink_ = std::make_shared(); + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); + + this->ReadAndCheckSingleColumnTable(values); +} + +using TestUInt32ParquetIO = TestParquetIO; + +TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { + // This also tests max_definition_level = 1 + std::shared_ptr values = NullableArray(LARGE_SIZE, 100); + std::shared_ptr
table = MakeSimpleTable(values, true); + + // Parquet 2.0 roundtrip should yield an uint32_t column again + this->sink_ = std::make_shared(); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::WriterProperties::Builder() + .version(ParquetVersion::PARQUET_2_0) + ->build(); + ASSERT_OK_NO_THROW( + WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); + this->ReadAndCheckSingleColumnTable(values); +} + +TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { + // This also tests max_definition_level = 1 + std::shared_ptr values = NullableArray(LARGE_SIZE, 100); + std::shared_ptr
table = MakeSimpleTable(values, true); + + // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0 + // reader that a column is unsigned. + this->sink_ = std::make_shared(); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::WriterProperties::Builder() + .version(ParquetVersion::PARQUET_1_0) + ->build(); + ASSERT_OK_NO_THROW( + WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); + + std::shared_ptr expected_values; + std::shared_ptr int64_data = + std::make_shared(default_memory_pool()); + { + ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length())); + int64_t* int64_data_ptr = reinterpret_cast(int64_data->mutable_data()); + const uint32_t* uint32_data_ptr = + reinterpret_cast(values->data()->data()); + // std::copy might be faster but this is explicit on the casts) + for (int64_t i = 0; i < values->length(); i++) { + int64_data_ptr[i] = static_cast(uint32_data_ptr[i]); + } + } + ASSERT_OK(MakePrimitiveArray(std::make_shared(), values->length(), + int64_data, values->null_count(), values->null_bitmap(), &expected_values)); + this->ReadAndCheckSingleColumnTable(expected_values); +} + +template +using ParquetCDataType = typename ParquetDataType::c_type; + +template +class TestPrimitiveParquetIO : public TestParquetIO { + public: + typedef typename TestType::c_type T; + + void MakeTestFile(std::vector& values, int num_chunks, + std::unique_ptr* file_reader) { + std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); + std::unique_ptr file_writer = this->MakeWriter(schema); + size_t chunk_size = values.size() / num_chunks; + // Convert to Parquet's expected physical type + std::vector values_buffer( + sizeof(ParquetCDataType) * values.size()); + auto values_parquet = + reinterpret_cast*>(values_buffer.data()); + std::copy(values.cbegin(), values.cend(), values_parquet); + for (int i = 0; i < num_chunks; i++) { + auto row_group_writer = file_writer->AppendRowGroup(chunk_size); + auto column_writer = + static_cast*>(row_group_writer->NextColumn()); + ParquetCDataType* data = values_parquet + i * chunk_size; + column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); + column_writer->Close(); + row_group_writer->Close(); + } + file_writer->Close(); + *file_reader = this->ReaderFromSink(); + } + + void CheckSingleColumnRequiredTableRead(int num_chunks) { + std::vector values(SMALL_SIZE, test_traits::value); + std::unique_ptr file_reader; + ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader)); + + std::shared_ptr
+    std::shared_ptr<Table> out;
+    this->ReadTableFromFile(std::move(file_reader), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ExpectArray<T>(values.data(), chunked_array->chunk(0).get());
+  }
+
+  void CheckSingleColumnRequiredRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Array> out;
+    this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+    ExpectArray<T>(values.data(), out.get());
+  }
+};
+
+typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type,
+    UInt32Type, Int32Type, UInt64Type, Int64Type, FloatType, DoubleType>
+    PrimitiveTestTypes;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+  this->CheckSingleColumnRequiredRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+  this->CheckSingleColumnRequiredTableRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+  this->CheckSingleColumnRequiredRead(4);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+  this->CheckSingleColumnRequiredTableRead(4);
+}
+
+}  // namespace parquet
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/parquet/utils.h b/cpp/src/arrow/parquet/utils.h
index 409bcd9065c..bcc46be60e6 100644
--- a/cpp/src/arrow/parquet/utils.h
+++ b/cpp/src/arrow/parquet/utils.h
@@ -18,12 +18,12 @@
 #ifndef ARROW_PARQUET_UTILS_H
 #define ARROW_PARQUET_UTILS_H
 
-#include "arrow/util/status.h"
+#include <sstream>
 
+#include "arrow/util/status.h"
 #include "parquet/exception.h"
 
 namespace arrow {
-
 namespace parquet {
 
 #define PARQUET_CATCH_NOT_OK(s) \
@@ -36,8 +36,17 @@ namespace parquet {
     (s);                        \
   } catch (const ::parquet::ParquetException& e) {}
 
-}  // namespace parquet
+#define PARQUET_THROW_NOT_OK(s)                      \
+  do {                                               \
+    ::arrow::Status _s = (s);                        \
+    if (!_s.ok()) {                                  \
+      std::stringstream ss;                          \
+      ss << "Arrow error: " << _s.ToString();        \
+      throw ::parquet::ParquetException(ss.str());   \
+    }                                                \
+  } while (0)
 
+}  // namespace parquet
 }  // namespace arrow
 
 #endif  // ARROW_PARQUET_UTILS_H
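
Reviewer note (illustrative, not part of this patch): PARQUET_THROW_NOT_OK is the inverse of PARQUET_CATCH_NOT_OK: it moves errors from Arrow's Status world into parquet-cpp's exception world. The do/while(0) wrapper, with no trailing semicolon, keeps the macro usable as a single statement, e.g. inside an unbraced if. Typical use, with TellOrThrow as a hypothetical helper:

    #include <cstdint>

    #include "arrow/io/interfaces.h"
    #include "arrow/parquet/utils.h"

    // Hypothetical helper: convert a Status-returning call into the exception
    // style that ::parquet::RandomAccessSource implementations must use.
    int64_t TellOrThrow(arrow::io::ReadableFile* file) {
      int64_t position = 0;
      PARQUET_THROW_NOT_OK(file->Tell(&position));  // throws ::parquet::ParquetException
      return position;
    }
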
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index d874ba30912..d0fb8f9f000 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -51,17 +51,17 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
 
     cdef cppclass HdfsReadableFile(CHdfsFile):
         CStatus GetSize(int64_t* size)
 
-        CStatus Read(int32_t nbytes, int32_t* bytes_read,
+        CStatus Read(int64_t nbytes, int64_t* bytes_read,
                      uint8_t* buffer)
 
-        CStatus ReadAt(int64_t position, int32_t nbytes,
-                       int32_t* bytes_read, uint8_t* buffer)
+        CStatus ReadAt(int64_t position, int64_t nbytes,
+                       int64_t* bytes_read, uint8_t* buffer)
 
     cdef cppclass HdfsWriteableFile(CHdfsFile):
-        CStatus Write(const uint8_t* buffer, int32_t nbytes)
+        CStatus Write(const uint8_t* buffer, int64_t nbytes)
 
-        CStatus Write(const uint8_t* buffer, int32_t nbytes,
-                      int32_t* bytes_written)
+        CStatus Write(const uint8_t* buffer, int64_t nbytes,
+                      int64_t* bytes_written)
 
     cdef cppclass CHdfsClient" arrow::io::HdfsClient":
         @staticmethod
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 8b97671e453..071eea5ba6e 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -383,7 +383,7 @@ cdef class HdfsFile:
         Read indicated number of bytes from the file, up to EOF
         """
         cdef:
-            int32_t bytes_read = 0
+            int64_t bytes_read = 0
             uint8_t* buf
 
         self._assert_readable()
@@ -394,7 +394,7 @@ cdef class HdfsFile:
         if buf == NULL:
             raise MemoryError("Failed to allocate {0} bytes".format(nbytes))
 
-        cdef int32_t total_bytes = 0
+        cdef int64_t total_bytes = 0
 
         cdef int rpc_chunksize = min(self.buffer_size, nbytes)
 
@@ -423,7 +423,7 @@ cdef class HdfsFile:
         memory). First seeks to the beginning of the file.
         """
         cdef:
-            int32_t bytes_read = 0
+            int64_t bytes_read = 0
             uint8_t* buf
 
         self._assert_readable()
@@ -499,6 +499,6 @@ cdef class HdfsFile:
         data = tobytes(data)
 
         cdef const uint8_t* buf = cp.PyBytes_AS_STRING(data)
-        cdef int32_t bufsize = len(data)
+        cdef int64_t bufsize = len(data)
         with nogil:
             check_cstatus(self.wr_file.get().Write(buf, bufsize))
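
Reviewer note (illustrative, not part of this patch): the Cython write path above forwards len(data) as a single int64_t Write call; as with reads, the underlying hdfsWrite() may accept fewer bytes than requested, so C++ callers should check the out-parameter. A sketch, assuming short writes surface through bytes_written rather than an error Status:

    #include <cstdint>
    #include <string>

    #include "arrow/io/hdfs.h"
    #include "arrow/util/status.h"

    arrow::Status WriteAll(arrow::io::HdfsWriteableFile* file, const std::string& data) {
      int64_t bytes_written = 0;
      RETURN_NOT_OK(file->Write(reinterpret_cast<const uint8_t*>(data.data()),
          static_cast<int64_t>(data.size()), &bytes_written));
      if (bytes_written != static_cast<int64_t>(data.size())) {
        return arrow::Status::IOError("short write to HDFS");
      }
      return arrow::Status::OK();
    }
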