Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
570355d
Implement crc in reading and writing crc
mapleFU Oct 8, 2022
7888183
adding config for crc, which default are all false
mapleFU Oct 8, 2022
464b8b1
Trying to fixing compile: add `use_page_checksum_verification` in Pag…
mapleFU Oct 8, 2022
6ff0aaa
Merge branch 'master' into crc32-for-data-page
mapleFU Oct 21, 2022
4b2734a
[Update] vendor crc32 implements from
mapleFU Oct 21, 2022
07bcec0
[Update] crc32: remove HAVE_DECLARE_OPTIMIZE macro
mapleFU Oct 22, 2022
93a616a
Merge branch 'master' into crc32-for-data-page
mapleFU Oct 22, 2022
02d37ad
[Update] Change crc32.h visibility
mapleFU Oct 22, 2022
fea3733
[Update] fmt: using clang-format to format the library
mapleFU Oct 22, 2022
dfe16c6
[update] fmt the whole updates
mapleFU Oct 22, 2022
619debc
[Update] change swap to arrow::bit_util::ByteSwap
mapleFU Oct 22, 2022
c14610e
[fmt] fmt file_writer
mapleFU Oct 22, 2022
3a47a74
[add] add arrow license for crc32.cc
mapleFU Oct 22, 2022
a20241a
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 4, 2022
e3415a3
[Update] Update page test
mapleFU Dec 4, 2022
b72509e
[Update] Finish page-level testing
mapleFU Dec 4, 2022
891ce6e
[Update] add test in properties
mapleFU Dec 4, 2022
523799f
[ADD] add checksum failed cases
mapleFU Dec 4, 2022
6024d36
[ADD] Add SerializedPageWriter test
mapleFU Dec 4, 2022
01c04da
[Fix] using actual_size instead of buffer.size()
mapleFU Dec 4, 2022
6d83725
[FIX] fix some previous argument passing problem
mapleFU Dec 4, 2022
8165730
[ADD] adding testing for checking crc
mapleFU Dec 4, 2022
6900946
remove iostream for debug, and clang-format the code
mapleFU Dec 4, 2022
c6cac1d
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 5, 2022
809ae81
[Update] update submodule to latest
mapleFU Dec 5, 2022
b5ef742
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 13, 2022
4c609cd
[Update] Resolve comment for code
mapleFU Dec 13, 2022
ae692fe
[Update] crc verification: check exception message
mapleFU Dec 13, 2022
a3da96e
[FIX] Fix compile caused by api change
mapleFU Dec 13, 2022
be32e3a
[Update] Update ColumnWriterTest for checksum
mapleFU Dec 13, 2022
7952923
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 14, 2022
b26c86a
Merge branch 'master' into crc32-for-data-page
mapleFU Dec 14, 2022
eddf8ef
[Update] Trying to eliminate duplicate code in testing
mapleFU Dec 14, 2022
dd71ec6
Mention checksums in supported features
pitrou Dec 15, 2022
b8fecfc
Move test around, check exception message
pitrou Dec 15, 2022
9e3703e
Improve file reader tests
pitrou Dec 15, 2022
f14ad46
Merge branch 'master' into crc32-for-data-page
mapleFU Jan 4, 2023
e786f8b
Merge branch 'master' into crc32-for-data-page
mapleFU Jan 12, 2023
951df14
rename variable: change data_page on checksum to page
mapleFU Jan 12, 2023
f3da672
Merge branch 'master' into crc32-for-data-page
mapleFU Jan 13, 2023
9fd4ce0
Merge branch 'master' into crc32-for-data-page
mapleFU Jan 31, 2023
6d3b686
Merge branch 'master' into crc32-for-data-page
mapleFU Feb 2, 2023
8f57334
address comment
mapleFU Feb 2, 2023
72a0108
Merge branch 'master' into crc32-for-data-page
mapleFU Feb 5, 2023
300b680
add crc test
mapleFU Feb 5, 2023
ac9413a
add boost crc32 testing
mapleFU Feb 5, 2023
17e13bd
change to use random u32
mapleFU Feb 5, 2023
0fd3cdd
using vector to avoid stack overflow
mapleFU Feb 5, 2023
b014cb1
Merge branch 'master' into crc32-for-data-page
mapleFU Feb 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ set(ARROW_SRCS
util/compression.cc
util/counting_semaphore.cc
util/cpu_info.cc
util/crc32.cc
util/debug.cc
util/decimal.cc
util/delimiting.cc
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ add_arrow_test(threading-utility-test
test_common.cc
thread_pool_test.cc)

add_arrow_test(crc32-test SOURCES crc32_test.cc)

add_arrow_benchmark(bit_block_counter_benchmark)
add_arrow_benchmark(bit_util_benchmark)
add_arrow_benchmark(bitmap_reader_benchmark)
Expand Down
966 changes: 966 additions & 0 deletions cpp/src/arrow/util/crc32.cc

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions cpp/src/arrow/util/crc32.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstddef>
#include <cstdint>

#include "arrow/util/visibility.h"

namespace arrow {
namespace internal {

/// \brief Compute the CRC32 checksum of the given data
///
/// This function computes CRC32 with the polynomial 0x04C11DB7,
/// as used in zlib and others (note this is different from CRC32C).
/// To compute a running CRC32, pass the previous value in `prev`,
/// otherwise `prev` should be 0.
ARROW_EXPORT
uint32_t crc32(uint32_t prev, const void* data, size_t length);

} // namespace internal
} // namespace arrow
110 changes: 110 additions & 0 deletions cpp/src/arrow/util/crc32_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <array>
#include <cstdint>
#include <limits>
#include <memory>
#include <random>

#include <gtest/gtest.h>
#include <boost/crc.hpp>

#include "arrow/util/crc32.h"

namespace arrow {

TEST(Crc32Test, Basic) {
// use the string "123456789" in ASCII as test data.
constexpr uint32_t TEST_CRC32_RESULT = 0xCBF43926;
constexpr size_t TEST_CRC32_LENGTH = 9;
std::array<unsigned char, TEST_CRC32_LENGTH> std_data = {0x31, 0x32, 0x33, 0x34, 0x35,
0x36, 0x37, 0x38, 0x39};
size_t const std_data_len = sizeof(std_data) / sizeof(std_data[0]);
EXPECT_EQ(TEST_CRC32_RESULT, internal::crc32(0, &std_data[0], std_data_len));

for (size_t i = 1; i < std_data_len - 1; ++i) {
uint32_t crc1 = internal::crc32(0, &std_data[0], i);
EXPECT_EQ(TEST_CRC32_RESULT, internal::crc32(crc1, &std_data[i], std_data_len - i));
}
}

TEST(Crc32Test, matchesBoost32Type) {
const size_t BUFFER_SIZE = 512 * 1024 * sizeof(uint64_t);
std::vector<uint8_t> buffer;
buffer.resize(BUFFER_SIZE, 0);

// Populate a buffer with a deterministic pattern
// on which to compute checksums
std::random_device r;
std::seed_seq seed{r(), r(), r(), r(), r(), r(), r(), r()};
std::mt19937 gen(seed);
// N4659 29.6.1.1 [rand.req.genl]/1e requires one of short, int, long, long long,
// unsigned short, unsigned int, unsigned long, or unsigned long long
std::uniform_int_distribution<uint32_t> dist;

for (size_t i = 0; i < BUFFER_SIZE; ++i) {
buffer[i] = static_cast<uint8_t>(dist(gen));
}

struct TestCrcGroup {
size_t offset;
size_t length;
};

// NOLINTNEXTLINE
TestCrcGroup testCrcGroups[] = {
// Zero-byte input
{0, 0},
{8, 1},
{8, 2},
{8, 3},
{8, 4},
{8, 5},
{8, 6},
{8, 7},
{9, 1},
{10, 2},
{11, 3},
{12, 4},
{13, 5},
{14, 6},
{15, 7},
{8, 8},
{8, 9},
{8, 10},
{8, 11},
{8, 12},
{8, 13},
{8, 14},
{8, 15},
{8, 16},
{8, 17},
// Much larger inputs
{0, BUFFER_SIZE},
{1, BUFFER_SIZE / 2},
};

for (TestCrcGroup group : testCrcGroups) {
uint32_t crc = internal::crc32(0, &buffer[group.offset], group.length);
boost::crc_32_type boost_crc;
boost_crc.process_bytes(&buffer[group.offset], group.length);
EXPECT_EQ(boost_crc.checksum(), crc);
}
}

} // namespace arrow
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/path_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ struct PathWriteContext {

// Incorporates |range| into visited elements. If the |range| is contiguous
// with the last range, extend the last range, otherwise add |range| separately
// tot he list.
// to the list.
void RecordPostListVisit(const ElementRange& range) {
if (!visited_elements.empty() && range.start == visited_elements.back().end) {
visited_elements.back().end = range.end;
Expand Down
17 changes: 15 additions & 2 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/crc32.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
Expand Down Expand Up @@ -471,6 +472,20 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
ParquetException::EofException(ss.str());
}

const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);

// TODO(PARQUET-594) crc checksum for DATA_PAGE_V2 and DICT_PAGE
if (properties_.page_checksum_verification() && page_type == PageType::DATA_PAGE &&
current_page_header_.__isset.crc) {
// verify crc
uint32_t checksum =
::arrow::internal::crc32(/* prev */ 0, page_buffer->data(), compressed_len);
if (static_cast<int32_t>(checksum) != current_page_header_.crc) {
throw ParquetException(
"could not verify page integrity, CRC checksum verification failed");
}
}

// Decrypt it if we need to
if (crypto_ctx_.data_decryptor != nullptr) {
PARQUET_THROW_NOT_OK(decryption_buffer_->Resize(
Expand All @@ -482,8 +497,6 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer = decryption_buffer_;
}

// Uncompress and construct the pages to return.
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
if (page_type == PageType::DICTIONARY_PAGE) {
crypto_ctx_.start_decrypt_with_dictionary_page = false;
const format::DictionaryPageHeader& dict_header =
Expand Down
33 changes: 24 additions & 9 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/crc32.h"
#include "arrow/util/endian.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
Expand Down Expand Up @@ -248,6 +249,7 @@ class SerializedPageWriter : public PageWriter {
SerializedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t column_chunk_ordinal,
bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
std::shared_ptr<Encryptor> data_encryptor = nullptr)
Expand All @@ -262,6 +264,7 @@ class SerializedPageWriter : public PageWriter {
page_ordinal_(0),
row_group_ordinal_(row_group_ordinal),
column_ordinal_(column_chunk_ordinal),
page_checksum_verification_(use_page_checksum_verification),
meta_encryptor_(std::move(meta_encryptor)),
data_encryptor_(std::move(data_encryptor)),
encryption_buffer_(AllocateBuffer(pool, 0)) {
Expand Down Expand Up @@ -379,7 +382,13 @@ class SerializedPageWriter : public PageWriter {
format::PageHeader page_header;
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
// TODO(PARQUET-594) crc checksum

// TODO(PARQUET-594) crc checksum for DATA_PAGE_V2 and DICT_PAGE
if (page_checksum_verification_ && page.type() == PageType::DATA_PAGE) {
uint32_t crc32 =
::arrow::internal::crc32(/* prev */ 0, output_data_buffer, output_data_len);
page_header.__set_crc(static_cast<int32_t>(crc32));
}

if (page.type() == PageType::DATA_PAGE) {
const DataPageV1& v1_page = checked_cast<const DataPageV1&>(page);
Expand Down Expand Up @@ -425,7 +434,7 @@ class SerializedPageWriter : public PageWriter {
page_header.__set_data_page_header(data_page_header);
}

void SetDataPageV2Header(format::PageHeader& page_header, const DataPageV2 page) {
void SetDataPageV2Header(format::PageHeader& page_header, const DataPageV2& page) {
format::DataPageHeaderV2 data_page_header;
data_page_header.__set_num_values(page.num_values());
data_page_header.__set_num_nulls(page.num_nulls());
Expand Down Expand Up @@ -456,6 +465,8 @@ class SerializedPageWriter : public PageWriter {

int64_t total_uncompressed_size() { return total_uncompressed_size_; }

bool page_checksum_verification() { return page_checksum_verification_; }

private:
// To allow UpdateEncryption on Close
friend class BufferedPageWriter;
Expand Down Expand Up @@ -520,6 +531,7 @@ class SerializedPageWriter : public PageWriter {
int32_t page_ordinal_;
int16_t row_group_ordinal_;
int16_t column_ordinal_;
bool page_checksum_verification_;

std::unique_ptr<ThriftSerializer> thrift_serializer_;

Expand All @@ -544,15 +556,16 @@ class BufferedPageWriter : public PageWriter {
BufferedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t current_column_ordinal,
bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
std::shared_ptr<Encryptor> data_encryptor = nullptr)
: final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) {
in_memory_sink_ = CreateOutputStream(pool);
pager_ = std::make_unique<SerializedPageWriter>(
in_memory_sink_, codec, compression_level, metadata, row_group_ordinal,
current_column_ordinal, pool, std::move(meta_encryptor),
std::move(data_encryptor));
current_column_ordinal, use_page_checksum_verification, pool,
std::move(meta_encryptor), std::move(data_encryptor));
}

int64_t WriteDictionaryPage(const DictionaryPage& page) override {
Expand Down Expand Up @@ -606,15 +619,17 @@ std::unique_ptr<PageWriter> PageWriter::Open(
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
std::shared_ptr<Encryptor> data_encryptor) {
std::shared_ptr<Encryptor> data_encryptor, bool page_write_checksum_enabled) {
if (buffered_row_group) {
return std::make_unique<BufferedPageWriter>(
return std::unique_ptr<PageWriter>(new BufferedPageWriter(
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
column_chunk_ordinal, pool, std::move(meta_encryptor), std::move(data_encryptor));
column_chunk_ordinal, page_write_checksum_enabled, pool,
std::move(meta_encryptor), std::move(data_encryptor)));
} else {
return std::make_unique<SerializedPageWriter>(
return std::unique_ptr<PageWriter>(new SerializedPageWriter(
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
column_chunk_ordinal, pool, std::move(meta_encryptor), std::move(data_encryptor));
column_chunk_ordinal, page_write_checksum_enabled, pool,
std::move(meta_encryptor), std::move(data_encryptor)));
}
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class PARQUET_EXPORT PageWriter {
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
bool buffered_row_group = false,
std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
std::shared_ptr<Encryptor> data_encryptor = NULLPTR);
std::shared_ptr<Encryptor> data_encryptor = NULLPTR,
bool page_write_checksum_enabled = false);

// The Column Writer decides if dictionary encoding is used if set and
// if the dictionary encoding has fallen back to default encoding on reaching dictionary
Expand Down
Loading