PARQUET-1508: [C++] Read ByteArray data directly into arrow::BinaryBuilder and BinaryDictionaryBuilder. Refactor encoders/decoders to use cleaner virtual interfaces

This patch ended up being a bit of a bloodbath, but it sorted out a number of technical debt problems.

Summary:

* Add type-specific virtual encoder and decoder interfaces such as `ByteArrayEncoder` and `ByteArrayDecoder` -- this enables adding new encoder or decoder methods without conflicting with the other types. This was very hard to do before because all types shared a common template such as `PlainDecoder<ByteArrayType>`
* Encoder and decoder implementations now all live in a single `encoding.cc` compilation unit; performance should be unchanged (I will check to make sure)
* Add BYTE_ARRAY decoder methods that write into `ChunkedBinaryBuilder` or `BinaryDictionaryBuilder`. This unblocks the long-desired direct-to-categorical Parquet reads
* Altered RecordReader to decode BYTE_ARRAY values directly into `ChunkedBinaryBuilder`. More work will be required to expose DictionaryArray reads in a sane way

Along the way I've decided I want to eradicate all instances of `extern template class` from the codebase. It's insanely brittle with different visibility rules in MSVC, gcc, AND clang (no kidding, gcc and clang do different things). I'll refactor the other parts of the codebase that use them later.

Author: Wes McKinney <[email protected]>
Author: Uwe L. Korn <[email protected]>

Closes #3492 from wesm/PARQUET-1508 and squashes the following commits:

df1bfc0 <Wes McKinney> lint
f3fadcb <Uwe L. Korn> Update cpp/src/parquet/arrow/record_reader.cc
4bafc55 <Wes McKinney> Fix public compile definition on windows
c4fcf74 <Wes McKinney> verbose makefile
06e2c23 <Wes McKinney> lint
daac8a6 <Wes McKinney> Delete a couple commented-out methods, add code comments about unimplemented DecodeArrowNonNull method for DictionaryBuilder
453ecbd <Wes McKinney> Refactor encoder and decoder classes to facilitate type-level extensibility
wesm committed Jan 29, 2019
1 parent eedda2f commit 3d435e4
Showing 42 changed files with 1,918 additions and 1,189 deletions.
1 change: 1 addition & 0 deletions ci/cpp-msvc-build-main.bat
@@ -44,6 +44,7 @@ mkdir cpp\build
pushd cpp\build

cmake -G "%GENERATOR%" %CMAKE_ARGS% ^
-DCMAKE_VERBOSE_MAKEFILE=OFF ^
-DCMAKE_INSTALL_PREFIX=%CONDA_PREFIX%\Library ^
-DARROW_BOOST_USE_SHARED=OFF ^
-DCMAKE_BUILD_TYPE=%CONFIGURATION% ^
2 changes: 1 addition & 1 deletion cpp/build-support/iwyu/mappings/arrow-misc.imp
@@ -49,7 +49,7 @@
{ symbol: ["shared_ptr", private, "<memory>", public ] },
{ symbol: ["_Node_const_iterator", private, "<flatbuffers/flatbuffers.h>", public ] },
{ symbol: ["unordered_map<>::mapped_type", private, "<flatbuffers/flatbuffers.h>", public ] },
{ symbol: ["move", private, "<utility>", public ] },
{ symbol: ["std::move", private, "<utility>", public ] },
{ symbol: ["pair", private, "<utility>", public ] },
{ symbol: ["errno", private, "<cerrno>", public ] },
{ symbol: ["posix_memalign", private, "<cstdlib>", public ] }
2 changes: 2 additions & 0 deletions cpp/src/arrow/array/builder_binary.h
@@ -281,6 +281,8 @@ class ARROW_EXPORT ChunkedBinaryBuilder {
return builder_->AppendNull();
}

Status Reserve(int64_t values) { return builder_->Reserve(values); }

virtual Status Finish(ArrayVector* out);

protected:
8 changes: 8 additions & 0 deletions cpp/src/parquet/CMakeLists.txt
@@ -172,6 +172,7 @@ set(PARQUET_SRCS
column_reader.cc
column_scanner.cc
column_writer.cc
encoding.cc
file_reader.cc
file_writer.cc
metadata.cc
@@ -269,6 +270,13 @@ foreach(LIB_TARGET ${PARQUET_LIBRARIES})
endif()
endforeach()

# We always build the Parquet static libraries (see PARQUET-1420) so we add the
# PARQUET_STATIC public compile definition if we are building the unit tests OR
# if we are building the static library
if (WIN32 AND (NOT NO_TESTS OR ARROW_BUILD_STATIC))
target_compile_definitions(parquet_static PUBLIC PARQUET_STATIC)
endif()

add_subdirectory(api)
add_subdirectory(arrow)
add_subdirectory(util)
1 change: 1 addition & 0 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -21,6 +21,7 @@
#include "gtest/gtest.h"

#include "parquet/arrow/schema.h"
#include "parquet/schema.h"

#include "arrow/api.h"
#include "arrow/test-util.h"
11 changes: 8 additions & 3 deletions cpp/src/parquet/arrow/reader.cc
@@ -21,13 +21,18 @@
#include <climits>
#include <cstring>
#include <future>
#include <ostream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "arrow/api.h"
#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/builder.h"
#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/int-util.h"
#include "arrow/util/logging.h"
94 changes: 44 additions & 50 deletions cpp/src/parquet/arrow/record_reader.cc
@@ -1,4 +1,4 @@
// licensed to the Apache Software Foundation (ASF) under one
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -22,25 +22,20 @@
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream>
#include <unordered_map>
#include <utility>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/builder.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle-encoding.h"

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/encoding-internal.h"
#include "parquet/encoding.h"
#include "parquet/exception.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/types.h"

@@ -51,9 +46,6 @@ namespace internal {

namespace BitUtil = ::arrow::BitUtil;

template <typename DType>
class TypedRecordReader;

// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
// encoding.
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
@@ -80,9 +72,12 @@ class RecordReader::RecordReaderImpl {
null_count_(0),
levels_written_(0),
levels_position_(0),
levels_capacity_(0) {
levels_capacity_(0),
uses_values_(!(descr->physical_type() == Type::BYTE_ARRAY)) {
nullable_values_ = internal::HasSpacedValues(descr);
values_ = AllocateBuffer(pool);
if (uses_values_) {
values_ = AllocateBuffer(pool);
}
valid_bits_ = AllocateBuffer(pool);
def_levels_ = AllocateBuffer(pool);
rep_levels_ = AllocateBuffer(pool);
@@ -210,9 +205,13 @@ class RecordReader::RecordReaderImpl {
bool nullable_values() const { return nullable_values_; }

std::shared_ptr<ResizableBuffer> ReleaseValues() {
auto result = values_;
values_ = AllocateBuffer(pool_);
return result;
if (uses_values_) {
auto result = values_;
values_ = AllocateBuffer(pool_);
return result;
} else {
return nullptr;
}
}

std::shared_ptr<ResizableBuffer> ReleaseIsValid() {
@@ -324,7 +323,13 @@ }
}

int type_size = GetTypeByteSize(descr_->physical_type());
PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false));

// XXX(wesm): A hack to avoid memory allocation when reading directly
// into builder classes
if (uses_values_) {
PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false));
}

values_capacity_ = new_values_capacity;
}
if (nullable_values_) {
@@ -371,7 +376,9 @@ class RecordReader::RecordReaderImpl {
void ResetValues() {
if (values_written_ > 0) {
// Resize to 0, but do not shrink to fit
PARQUET_THROW_NOT_OK(values_->Resize(0, false));
if (uses_values_) {
PARQUET_THROW_NOT_OK(values_->Resize(0, false));
}
PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
values_written_ = 0;
values_capacity_ = 0;
@@ -427,6 +434,9 @@ class RecordReader::RecordReaderImpl {
int64_t levels_capacity_;

std::shared_ptr<::arrow::ResizableBuffer> values_;
// In the case of false, don't allocate the values buffer (when we directly read into
// builder classes).
bool uses_values_;

template <typename T>
T* ValuesHead() {
@@ -559,12 +569,12 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
}

private:
typedef Decoder<DType> DecoderType;
using DecoderType = typename EncodingTraits<DType>::Decoder;

// Map of encoding type to the respective decoder object. For example, a
// column chunk's data pages may include both dictionary-encoded and
// plain-encoded data.
std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;
std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;

std::unique_ptr<BuilderType> builder_;

@@ -620,15 +630,9 @@ ::arrow::ArrayVector TypedRecordReader<FLBAType>::GetBuilderChunks() {

template <>
inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
auto values = ValuesHead<ByteArray>();
int64_t num_decoded =
current_decoder_->Decode(values, static_cast<int>(values_to_read));
int64_t num_decoded = current_decoder_->DecodeArrowNonNull(
static_cast<int>(values_to_read), builder_.get());
DCHECK_EQ(num_decoded, values_to_read);

for (int64_t i = 0; i < num_decoded; i++) {
PARQUET_THROW_NOT_OK(
builder_->Append(values[i].ptr, static_cast<int32_t>(values[i].len)));
}
ResetValues();
}

@@ -648,23 +652,10 @@ inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t values_to_read)
template <>
inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t values_to_read,
int64_t null_count) {
uint8_t* valid_bits = valid_bits_->mutable_data();
const int64_t valid_bits_offset = values_written_;
auto values = ValuesHead<ByteArray>();

int64_t num_decoded = current_decoder_->DecodeSpaced(
values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits,
valid_bits_offset);
int64_t num_decoded = current_decoder_->DecodeArrow(
static_cast<int>(values_to_read), static_cast<int>(null_count),
valid_bits_->mutable_data(), values_written_, builder_.get());
DCHECK_EQ(num_decoded, values_to_read);

for (int64_t i = 0; i < num_decoded; i++) {
if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
PARQUET_THROW_NOT_OK(
builder_->Append(values[i].ptr, static_cast<int32_t>(values[i].len)));
} else {
PARQUET_THROW_NOT_OK(builder_->AppendNull());
}
}
ResetValues();
}

@@ -705,23 +696,25 @@ inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage*

if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
page->encoding() == Encoding::PLAIN) {
PlainDecoder<DType> dictionary(descr_);
dictionary.SetData(page->num_values(), page->data(), page->size());
auto dictionary = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
dictionary->SetData(page->num_values(), page->data(), page->size());

// The dictionary is fully decoded during DictionaryDecoder::Init, so the
// DictionaryPage buffer is no longer required after this step
//
// TODO(wesm): investigate whether this all-or-nothing decoding of the
// dictionary makes sense and whether performance can be improved

auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
decoder->SetDict(&dictionary);
decoders_[encoding] = decoder;
std::unique_ptr<DictDecoder<DType>> decoder = MakeDictDecoder<DType>(descr_, pool_);
decoder->SetDict(dictionary.get());
decoders_[encoding] =
std::unique_ptr<DecoderType>(dynamic_cast<DecoderType*>(decoder.release()));
} else {
ParquetException::NYI("only plain dictionary encoding has been implemented");
}

current_decoder_ = decoders_[encoding].get();
DCHECK(current_decoder_);
}

template <typename DType>
@@ -787,16 +780,17 @@ bool TypedRecordReader<DType>::ReadNewPage() {

auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
DCHECK(it->second.get() != nullptr);
if (encoding == Encoding::RLE_DICTIONARY) {
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
}
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
decoders_[static_cast<int>(encoding)] = decoder;
auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
1 change: 0 additions & 1 deletion cpp/src/parquet/arrow/record_reader.h
@@ -24,7 +24,6 @@

#include "arrow/memory_pool.h"

#include "parquet/util/macros.h"
#include "parquet/util/memory.h"

namespace arrow {
14 changes: 10 additions & 4 deletions cpp/src/parquet/arrow/schema.cc
@@ -19,14 +19,20 @@

#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "parquet/api/schema.h"
#include "parquet/util/schema-util.h"

#include "arrow/api.h"
#include "arrow/array.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/logging.h"

#include "parquet/arrow/writer.h"
#include "parquet/exception.h"
#include "parquet/properties.h"
#include "parquet/types.h"
#include "parquet/util/schema-util.h"

using arrow::Field;
using arrow::Status;

5 changes: 2 additions & 3 deletions cpp/src/parquet/arrow/schema.h
@@ -22,15 +22,14 @@
#include <memory>
#include <vector>

#include "arrow/api.h"

#include "parquet/arrow/writer.h"
#include "parquet/metadata.h"
#include "parquet/schema.h"
#include "parquet/util/visibility.h"

namespace arrow {

class Field;
class Schema;
class Status;

} // namespace arrow
16 changes: 14 additions & 2 deletions cpp/src/parquet/arrow/writer.cc
@@ -18,17 +18,29 @@
#include "parquet/arrow/writer.h"

#include <algorithm>
#include <string>
#include <cstddef>
#include <type_traits>
#include <utility>
#include <vector>

#include "arrow/api.h"
#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/builder.h"
#include "arrow/compute/api.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/visitor_inline.h"

#include "arrow/util/logging.h"

#include "parquet/arrow/schema.h"
#include "parquet/column_writer.h"
#include "parquet/exception.h"
#include "parquet/file_writer.h"
#include "parquet/schema.h"
#include "parquet/util/memory.h"

using arrow::Array;
using arrow::BinaryArray;