Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
570355d
Implement crc in reading and writing crc
mapleFU Oct 8, 2022
7888183
adding config for crc, which default are all false
mapleFU Oct 8, 2022
464b8b1
Trying to fixing compile: add `use_page_checksum_verification` in Pag…
mapleFU Oct 8, 2022
6ff0aaa
Merge branch 'master' into crc32-for-data-page
mapleFU Oct 21, 2022
4b2734a
[Update] vendor crc32 implements from
mapleFU Oct 21, 2022
07bcec0
[Update] crc32: remove HAVE_DECLARE_OPTIMIZE macro
mapleFU Oct 22, 2022
93a616a
Merge branch 'master' into crc32-for-data-page
mapleFU Oct 22, 2022
02d37ad
[Update] Change crc32.h visibility
mapleFU Oct 22, 2022
fea3733
[Update] fmt: using clang-format to format the library
mapleFU Oct 22, 2022
dfe16c6
[update] fmt the whole updates
mapleFU Oct 22, 2022
619debc
[Update] change swap to arrow::bit_util::ByteSwap
mapleFU Oct 22, 2022
c14610e
[fmt] fmt file_writer
mapleFU Oct 22, 2022
3a47a74
[add] add arrow license for crc32.cc
mapleFU Oct 22, 2022
a20241a
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 4, 2022
e3415a3
[Update] Update page test
mapleFU Dec 4, 2022
b72509e
[Update] Finish page-level testing
mapleFU Dec 4, 2022
891ce6e
[Update] add test in properties
mapleFU Dec 4, 2022
523799f
[ADD] add checksum failed cases
mapleFU Dec 4, 2022
6024d36
[ADD] Add SerializedPageWriter test
mapleFU Dec 4, 2022
01c04da
[Fix] using actual_size instead of buffer.size()
mapleFU Dec 4, 2022
6d83725
[FIX] fix some previous argument passing problem
mapleFU Dec 4, 2022
8165730
[ADD] adding testing for checking crc
mapleFU Dec 4, 2022
6900946
remove iostream for debug, and clang-format the code
mapleFU Dec 4, 2022
c6cac1d
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 5, 2022
809ae81
[Update] update submodule to latest
mapleFU Dec 5, 2022
b5ef742
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 13, 2022
4c609cd
[Update] Resolve comment for code
mapleFU Dec 13, 2022
ae692fe
[Update] crc verification: check exception message
mapleFU Dec 13, 2022
a3da96e
[FIX] Fix compile caused by api change
mapleFU Dec 13, 2022
be32e3a
[Update] Update ColumnWriterTest for checksum
mapleFU Dec 13, 2022
7952923
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 14, 2022
b26c86a
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 14, 2022
eddf8ef
[Update] Trying to eliminate duplicate code in testing
mapleFU Dec 14, 2022
dd71ec6
Mention checksums in supported features
pitrou Dec 15, 2022
b8fecfc
Move test around, check exception message
pitrou Dec 15, 2022
9e3703e
Improve file reader tests
pitrou Dec 15, 2022
f14ad46
Merge branch 'master' into crc32-for-data-page
mapleFU Jan 4, 2023
e786f8b
Merge branch 'master' into crc32-for-data-page
mapleFU Jan 12, 2023
951df14
rename variable: change data_page on checksum to page
mapleFU Jan 12, 2023
f3da672
Merge branch 'master' into crc32-for-data-page
mapleFU Jan 13, 2023
9fd4ce0
Merge branch 'master' into crc32-for-data-page
mapleFU Jan 31, 2023
6d3b686
Merge branch 'master' into crc32-for-data-page
mapleFU Feb 2, 2023
8f57334
address comment
mapleFU Feb 2, 2023
72a0108
Merge branch 'master' into crc32-for-data-page
mapleFU Feb 5, 2023
300b680
add crc test
mapleFU Feb 5, 2023
ac9413a
add boost crc32 testing
mapleFU Feb 5, 2023
17e13bd
change to use random u32
mapleFU Feb 5, 2023
0fd3cdd
using vector to avoid stack overflow
mapleFU Feb 5, 2023
b014cb1
Merge branch 'master' into crc32-for-data-page
mapleFU Feb 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ set(ARROW_SRCS
util/compression.cc
util/counting_semaphore.cc
util/cpu_info.cc
util/crc32.cc
util/debug.cc
util/decimal.cc
util/delimiting.cc
Expand Down
961 changes: 961 additions & 0 deletions cpp/src/arrow/util/crc32.cc

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions cpp/src/arrow/util/crc32.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstddef>
#include <cstdint>

#include "arrow/util/visibility.h"

namespace arrow {
namespace internal {

ARROW_EXPORT
uint32_t crc32(uint32_t prev, const void* data, size_t length);

} // namespace internal
} // namespace arrow
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/path_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ struct PathWriteContext {

// Incorporates |range| into visited elements. If the |range| is contiguous
// with the last range, extend the last range, otherwise add |range| separately
// tot he list.
// to the list.
void RecordPostListVisit(const ElementRange& range) {
if (!visited_elements.empty() && range.start == visited_elements.back().end) {
visited_elements.back().end = range.end;
Expand Down
16 changes: 14 additions & 2 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/crc32.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
Expand Down Expand Up @@ -393,6 +394,19 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
ParquetException::EofException(ss.str());
}

const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);

if (properties_.use_page_checksum_verification() &&
page_type == PageType::DATA_PAGE && current_page_header_.__isset.crc) {
// verify crc
uint32_t checksum =
::arrow::internal::crc32(/* prev */ 0, page_buffer->data(), compressed_len);
if (static_cast<int32_t>(checksum) != current_page_header_.crc) {
throw ParquetException(
"could not verify page integrity, CRC checksum verification failed");
}
}

// Decrypt it if we need to
if (crypto_ctx_.data_decryptor != nullptr) {
PARQUET_THROW_NOT_OK(decryption_buffer_->Resize(
Expand All @@ -403,8 +417,6 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer = decryption_buffer_;
}

const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);

if (page_type == PageType::DICTIONARY_PAGE) {
crypto_ctx_.start_decrypt_with_dictionary_page = false;
const format::DictionaryPageHeader& dict_header =
Expand Down
41 changes: 27 additions & 14 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/crc32.h"
#include "arrow/util/endian.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
Expand Down Expand Up @@ -247,6 +248,7 @@ class SerializedPageWriter : public PageWriter {
SerializedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t column_chunk_ordinal,
bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
std::shared_ptr<Encryptor> data_encryptor = nullptr)
Expand All @@ -261,6 +263,7 @@ class SerializedPageWriter : public PageWriter {
page_ordinal_(0),
row_group_ordinal_(row_group_ordinal),
column_ordinal_(column_chunk_ordinal),
use_page_checksum_verification_(use_page_checksum_verification),
meta_encryptor_(std::move(meta_encryptor)),
data_encryptor_(std::move(data_encryptor)),
encryption_buffer_(AllocateBuffer(pool, 0)) {
Expand Down Expand Up @@ -378,7 +381,12 @@ class SerializedPageWriter : public PageWriter {
format::PageHeader page_header;
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
// TODO(PARQUET-594) crc checksum

if (use_page_checksum_verification_ && page.type() == PageType::DATA_PAGE) {
uint32_t crc32 =
::arrow::internal::crc32(/* prev */ 0, output_data_buffer, output_data_len);
page_header.__set_crc(static_cast<int32_t>(crc32));
}

if (page.type() == PageType::DATA_PAGE) {
const DataPageV1& v1_page = checked_cast<const DataPageV1&>(page);
Expand Down Expand Up @@ -455,6 +463,8 @@ class SerializedPageWriter : public PageWriter {

int64_t total_uncompressed_size() { return total_uncompressed_size_; }

bool use_page_checksum_verification() { return use_page_checksum_verification_; }

private:
// To allow UpdateEncryption on Close
friend class BufferedPageWriter;
Expand Down Expand Up @@ -519,6 +529,7 @@ class SerializedPageWriter : public PageWriter {
int16_t page_ordinal_;
int16_t row_group_ordinal_;
int16_t column_ordinal_;
bool use_page_checksum_verification_;

std::unique_ptr<ThriftSerializer> thrift_serializer_;

Expand All @@ -543,15 +554,16 @@ class BufferedPageWriter : public PageWriter {
BufferedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t current_column_ordinal,
bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
std::shared_ptr<Encryptor> data_encryptor = nullptr)
: final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) {
in_memory_sink_ = CreateOutputStream(pool);
pager_ = std::unique_ptr<SerializedPageWriter>(
new SerializedPageWriter(in_memory_sink_, codec, compression_level, metadata,
row_group_ordinal, current_column_ordinal, pool,
std::move(meta_encryptor), std::move(data_encryptor)));
pager_ = std::make_unique<SerializedPageWriter>(
in_memory_sink_, codec, compression_level, metadata, row_group_ordinal,
current_column_ordinal, use_page_checksum_verification, pool,
std::move(meta_encryptor), std::move(data_encryptor));
}

int64_t WriteDictionaryPage(const DictionaryPage& page) override {
Expand Down Expand Up @@ -604,18 +616,19 @@ std::unique_ptr<PageWriter> PageWriter::Open(
std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
bool buffered_row_group, bool page_write_checksum_enabled,
std::shared_ptr<Encryptor> meta_encryptor,
std::shared_ptr<Encryptor> data_encryptor) {
if (buffered_row_group) {
return std::unique_ptr<PageWriter>(
new BufferedPageWriter(std::move(sink), codec, compression_level, metadata,
row_group_ordinal, column_chunk_ordinal, pool,
std::move(meta_encryptor), std::move(data_encryptor)));
return std::unique_ptr<PageWriter>(new BufferedPageWriter(
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
column_chunk_ordinal, page_write_checksum_enabled, pool,
std::move(meta_encryptor), std::move(data_encryptor)));
} else {
return std::unique_ptr<PageWriter>(
new SerializedPageWriter(std::move(sink), codec, compression_level, metadata,
row_group_ordinal, column_chunk_ordinal, pool,
std::move(meta_encryptor), std::move(data_encryptor)));
return std::unique_ptr<PageWriter>(new SerializedPageWriter(
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
column_chunk_ordinal, page_write_checksum_enabled, pool,
std::move(meta_encryptor), std::move(data_encryptor)));
}
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class PARQUET_EXPORT PageWriter {
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal = -1, int16_t column_chunk_ordinal = -1,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
bool buffered_row_group = false,
bool buffered_row_group = false, bool page_write_checksum_enabled = false,
std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
std::shared_ptr<Encryptor> data_encryptor = NULLPTR);

Expand Down
6 changes: 4 additions & 2 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
std::unique_ptr<PageWriter> pager = PageWriter::Open(
sink_, properties_->compression(path), properties_->compression_level(path),
col_meta, row_group_ordinal_, static_cast<int16_t>(next_column_index_ - 1),
properties_->memory_pool(), false, meta_encryptor, data_encryptor);
properties_->memory_pool(), false, properties_->page_write_checksum_enabled(),
meta_encryptor, data_encryptor);
column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_);
return column_writers_[0].get();
}
Expand Down Expand Up @@ -244,7 +245,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
sink_, properties_->compression(path), properties_->compression_level(path),
col_meta, static_cast<int16_t>(row_group_ordinal_),
static_cast<int16_t>(next_column_index_++), properties_->memory_pool(),
buffered_row_group_, meta_encryptor, data_encryptor);
buffered_row_group_, properties_->page_write_checksum_enabled(), meta_encryptor,
data_encryptor);
column_writers_.push_back(
ColumnWriter::Make(col_meta, std::move(pager), properties_));
}
Expand Down
26 changes: 22 additions & 4 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,18 @@ class PARQUET_EXPORT ReaderProperties {
return file_decryption_properties_;
}

bool use_page_checksum_verification() const { return use_page_checksum_verification_; }
void set_use_page_checksum_verification(bool check_crc) {
use_page_checksum_verification_ = check_crc;
}

private:
MemoryPool* pool_;
int64_t buffer_size_ = kDefaultBufferSize;
int32_t thrift_string_size_limit_ = kDefaultThriftStringSizeLimit;
int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
bool buffered_stream_enabled_ = false;
bool use_page_checksum_verification_ = false;
std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
};

Expand Down Expand Up @@ -184,7 +190,8 @@ class PARQUET_EXPORT WriterProperties {
pagesize_(kDefaultDataPageSize),
version_(ParquetVersion::PARQUET_2_4),
data_page_version_(ParquetDataPageVersion::V1),
created_by_(DEFAULT_CREATED_BY) {}
created_by_(DEFAULT_CREATED_BY),
page_write_checksum_enabled_(false) {}
virtual ~Builder() {}

/// Specify the memory pool for the writer. Default default_memory_pool.
Expand Down Expand Up @@ -273,6 +280,11 @@ class PARQUET_EXPORT WriterProperties {
return this;
}

Builder* page_write_checksum_enabled(bool enable_checksum) {
page_write_checksum_enabled_ = enable_checksum;
return this;
}

/// \brief Define the encoding that is used when we don't utilise dictionary encoding.
//
/// This either apply if dictionary encoding is disabled or if we fallback
Expand Down Expand Up @@ -458,8 +470,9 @@ class PARQUET_EXPORT WriterProperties {

return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
pagesize_, version_, created_by_, std::move(file_encryption_properties_),
default_column_properties_, column_properties, data_page_version_));
pagesize_, version_, created_by_, page_write_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
column_properties, data_page_version_));
}

private:
Expand All @@ -471,6 +484,7 @@ class PARQUET_EXPORT WriterProperties {
ParquetVersion::type version_;
ParquetDataPageVersion data_page_version_;
std::string created_by_;
bool page_write_checksum_enabled_;

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

Expand Down Expand Up @@ -501,6 +515,8 @@ class PARQUET_EXPORT WriterProperties {

inline std::string created_by() const { return parquet_created_by_; }

inline bool page_write_checksum_enabled() const { return page_write_checksum_enabled_; }

inline Encoding::type dictionary_index_encoding() const {
if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
return Encoding::PLAIN_DICTIONARY;
Expand Down Expand Up @@ -565,7 +581,7 @@ class PARQUET_EXPORT WriterProperties {
explicit WriterProperties(
MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
const std::string& created_by,
const std::string& created_by, bool page_write_checksum_enabled,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties,
Expand All @@ -578,6 +594,7 @@ class PARQUET_EXPORT WriterProperties {
parquet_data_page_version_(data_page_version),
parquet_version_(version),
parquet_created_by_(created_by),
page_write_checksum_enabled_(page_write_checksum_enabled),
file_encryption_properties_(file_encryption_properties),
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}
Expand All @@ -590,6 +607,7 @@ class PARQUET_EXPORT WriterProperties {
ParquetDataPageVersion parquet_data_page_version_;
ParquetVersion::type parquet_version_;
std::string parquet_created_by_;
bool page_write_checksum_enabled_;

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

Expand Down