Skip to content

Commit 5a61849

Browse files
authored
GH-33115: [C++] Parquet Implement crc in reading and writing Page for DATA_PAGE (v1) (#14351)
This patch add crc in writing and reading DATA_PAGE. And crc for dictionary, DATA_PAGE_V2 will be added in comming patches. * [x] Implement crc in writing DATA_PAGE * [x] Implement crc in reading DATA_PAGE * [x] Adding config for write crc page and checking * [x] Testing DATA_PAGE with crc, the testing maybe borrowed from `parquet-mr` * [x] Using crc library in https://issues.apache.org/jira/browse/ARROW-17904 And there is some questions, I found that in thirdparty, arrow imports `crc32c`, which is extracted from leveldb's crc library. But seems that our standard uses crc32, which has a different magic number. So I vendor implementions mentioned in https://issues.apache.org/jira/browse/ARROW-17904 . The default config of `enable crc` in parquet-mr for writer is true, but here I use `false`, because set it true may slow down writer. * Closes: #33115 Authored-by: mwish <[email protected]> Signed-off-by: Will Jones <[email protected]>
1 parent 717d4fb commit 5a61849

19 files changed

+1566
-97
lines changed

cpp/src/arrow/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ set(ARROW_SRCS
206206
util/compression.cc
207207
util/counting_semaphore.cc
208208
util/cpu_info.cc
209+
util/crc32.cc
209210
util/debug.cc
210211
util/decimal.cc
211212
util/delimiting.cc

cpp/src/arrow/util/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ add_arrow_test(threading-utility-test
9090
test_common.cc
9191
thread_pool_test.cc)
9292

93+
add_arrow_test(crc32-test SOURCES crc32_test.cc)
94+
9395
add_arrow_benchmark(bit_block_counter_benchmark)
9496
add_arrow_benchmark(bit_util_benchmark)
9597
add_arrow_benchmark(bitmap_reader_benchmark)

cpp/src/arrow/util/crc32.cc

Lines changed: 966 additions & 0 deletions
Large diffs are not rendered by default.

cpp/src/arrow/util/crc32.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <cstddef>
19+
#include <cstdint>
20+
21+
#include "arrow/util/visibility.h"
22+
23+
namespace arrow {
24+
namespace internal {
25+
26+
/// \brief Compute the CRC32 checksum of the given data
27+
///
28+
/// This function computes CRC32 with the polynomial 0x04C11DB7,
29+
/// as used in zlib and others (note this is different from CRC32C).
30+
/// To compute a running CRC32, pass the previous value in `prev`,
31+
/// otherwise `prev` should be 0.
32+
ARROW_EXPORT
33+
uint32_t crc32(uint32_t prev, const void* data, size_t length);
34+
35+
} // namespace internal
36+
} // namespace arrow

cpp/src/arrow/util/crc32_test.cc

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <array>
19+
#include <cstdint>
20+
#include <limits>
21+
#include <memory>
22+
#include <random>
23+
24+
#include <gtest/gtest.h>
25+
#include <boost/crc.hpp>
26+
27+
#include "arrow/util/crc32.h"
28+
29+
namespace arrow {
30+
31+
TEST(Crc32Test, Basic) {
32+
// use the string "123456789" in ASCII as test data.
33+
constexpr uint32_t TEST_CRC32_RESULT = 0xCBF43926;
34+
constexpr size_t TEST_CRC32_LENGTH = 9;
35+
std::array<unsigned char, TEST_CRC32_LENGTH> std_data = {0x31, 0x32, 0x33, 0x34, 0x35,
36+
0x36, 0x37, 0x38, 0x39};
37+
size_t const std_data_len = sizeof(std_data) / sizeof(std_data[0]);
38+
EXPECT_EQ(TEST_CRC32_RESULT, internal::crc32(0, &std_data[0], std_data_len));
39+
40+
for (size_t i = 1; i < std_data_len - 1; ++i) {
41+
uint32_t crc1 = internal::crc32(0, &std_data[0], i);
42+
EXPECT_EQ(TEST_CRC32_RESULT, internal::crc32(crc1, &std_data[i], std_data_len - i));
43+
}
44+
}
45+
46+
TEST(Crc32Test, matchesBoost32Type) {
47+
const size_t BUFFER_SIZE = 512 * 1024 * sizeof(uint64_t);
48+
std::vector<uint8_t> buffer;
49+
buffer.resize(BUFFER_SIZE, 0);
50+
51+
// Populate a buffer with a deterministic pattern
52+
// on which to compute checksums
53+
std::random_device r;
54+
std::seed_seq seed{r(), r(), r(), r(), r(), r(), r(), r()};
55+
std::mt19937 gen(seed);
56+
// N4659 29.6.1.1 [rand.req.genl]/1e requires one of short, int, long, long long,
57+
// unsigned short, unsigned int, unsigned long, or unsigned long long
58+
std::uniform_int_distribution<uint32_t> dist;
59+
60+
for (size_t i = 0; i < BUFFER_SIZE; ++i) {
61+
buffer[i] = static_cast<uint8_t>(dist(gen));
62+
}
63+
64+
struct TestCrcGroup {
65+
size_t offset;
66+
size_t length;
67+
};
68+
69+
// NOLINTNEXTLINE
70+
TestCrcGroup testCrcGroups[] = {
71+
// Zero-byte input
72+
{0, 0},
73+
{8, 1},
74+
{8, 2},
75+
{8, 3},
76+
{8, 4},
77+
{8, 5},
78+
{8, 6},
79+
{8, 7},
80+
{9, 1},
81+
{10, 2},
82+
{11, 3},
83+
{12, 4},
84+
{13, 5},
85+
{14, 6},
86+
{15, 7},
87+
{8, 8},
88+
{8, 9},
89+
{8, 10},
90+
{8, 11},
91+
{8, 12},
92+
{8, 13},
93+
{8, 14},
94+
{8, 15},
95+
{8, 16},
96+
{8, 17},
97+
// Much larger inputs
98+
{0, BUFFER_SIZE},
99+
{1, BUFFER_SIZE / 2},
100+
};
101+
102+
for (TestCrcGroup group : testCrcGroups) {
103+
uint32_t crc = internal::crc32(0, &buffer[group.offset], group.length);
104+
boost::crc_32_type boost_crc;
105+
boost_crc.process_bytes(&buffer[group.offset], group.length);
106+
EXPECT_EQ(boost_crc.checksum(), crc);
107+
}
108+
}
109+
110+
} // namespace arrow

cpp/src/parquet/arrow/path_internal.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ struct PathWriteContext {
201201

202202
// Incorporates |range| into visited elements. If the |range| is contiguous
203203
// with the last range, extend the last range, otherwise add |range| separately
204-
// tot he list.
204+
// to the list.
205205
void RecordPostListVisit(const ElementRange& range) {
206206
if (!visited_elements.empty() && range.start == visited_elements.back().end) {
207207
visited_elements.back().end = range.end;

cpp/src/parquet/column_reader.cc

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "arrow/util/bit_util.h"
4040
#include "arrow/util/checked_cast.h"
4141
#include "arrow/util/compression.h"
42+
#include "arrow/util/crc32.h"
4243
#include "arrow/util/int_util_overflow.h"
4344
#include "arrow/util/logging.h"
4445
#include "arrow/util/rle_encoding.h"
@@ -471,6 +472,20 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
471472
ParquetException::EofException(ss.str());
472473
}
473474

475+
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
476+
477+
// TODO(PARQUET-594) crc checksum for DATA_PAGE_V2 and DICT_PAGE
478+
if (properties_.page_checksum_verification() && page_type == PageType::DATA_PAGE &&
479+
current_page_header_.__isset.crc) {
480+
// verify crc
481+
uint32_t checksum =
482+
::arrow::internal::crc32(/* prev */ 0, page_buffer->data(), compressed_len);
483+
if (static_cast<int32_t>(checksum) != current_page_header_.crc) {
484+
throw ParquetException(
485+
"could not verify page integrity, CRC checksum verification failed");
486+
}
487+
}
488+
474489
// Decrypt it if we need to
475490
if (crypto_ctx_.data_decryptor != nullptr) {
476491
PARQUET_THROW_NOT_OK(decryption_buffer_->Resize(
@@ -482,8 +497,6 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
482497
page_buffer = decryption_buffer_;
483498
}
484499

485-
// Uncompress and construct the pages to return.
486-
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
487500
if (page_type == PageType::DICTIONARY_PAGE) {
488501
crypto_ctx_.start_decrypt_with_dictionary_page = false;
489502
const format::DictionaryPageHeader& dict_header =

cpp/src/parquet/column_writer.cc

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "arrow/util/bitmap_ops.h"
3838
#include "arrow/util/checked_cast.h"
3939
#include "arrow/util/compression.h"
40+
#include "arrow/util/crc32.h"
4041
#include "arrow/util/endian.h"
4142
#include "arrow/util/logging.h"
4243
#include "arrow/util/rle_encoding.h"
@@ -248,6 +249,7 @@ class SerializedPageWriter : public PageWriter {
248249
SerializedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
249250
int compression_level, ColumnChunkMetaDataBuilder* metadata,
250251
int16_t row_group_ordinal, int16_t column_chunk_ordinal,
252+
bool use_page_checksum_verification,
251253
MemoryPool* pool = ::arrow::default_memory_pool(),
252254
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
253255
std::shared_ptr<Encryptor> data_encryptor = nullptr)
@@ -262,6 +264,7 @@ class SerializedPageWriter : public PageWriter {
262264
page_ordinal_(0),
263265
row_group_ordinal_(row_group_ordinal),
264266
column_ordinal_(column_chunk_ordinal),
267+
page_checksum_verification_(use_page_checksum_verification),
265268
meta_encryptor_(std::move(meta_encryptor)),
266269
data_encryptor_(std::move(data_encryptor)),
267270
encryption_buffer_(AllocateBuffer(pool, 0)) {
@@ -379,7 +382,13 @@ class SerializedPageWriter : public PageWriter {
379382
format::PageHeader page_header;
380383
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
381384
page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
382-
// TODO(PARQUET-594) crc checksum
385+
386+
// TODO(PARQUET-594) crc checksum for DATA_PAGE_V2 and DICT_PAGE
387+
if (page_checksum_verification_ && page.type() == PageType::DATA_PAGE) {
388+
uint32_t crc32 =
389+
::arrow::internal::crc32(/* prev */ 0, output_data_buffer, output_data_len);
390+
page_header.__set_crc(static_cast<int32_t>(crc32));
391+
}
383392

384393
if (page.type() == PageType::DATA_PAGE) {
385394
const DataPageV1& v1_page = checked_cast<const DataPageV1&>(page);
@@ -425,7 +434,7 @@ class SerializedPageWriter : public PageWriter {
425434
page_header.__set_data_page_header(data_page_header);
426435
}
427436

428-
void SetDataPageV2Header(format::PageHeader& page_header, const DataPageV2 page) {
437+
void SetDataPageV2Header(format::PageHeader& page_header, const DataPageV2& page) {
429438
format::DataPageHeaderV2 data_page_header;
430439
data_page_header.__set_num_values(page.num_values());
431440
data_page_header.__set_num_nulls(page.num_nulls());
@@ -456,6 +465,8 @@ class SerializedPageWriter : public PageWriter {
456465

457466
int64_t total_uncompressed_size() { return total_uncompressed_size_; }
458467

468+
bool page_checksum_verification() { return page_checksum_verification_; }
469+
459470
private:
460471
// To allow UpdateEncryption on Close
461472
friend class BufferedPageWriter;
@@ -520,6 +531,7 @@ class SerializedPageWriter : public PageWriter {
520531
int32_t page_ordinal_;
521532
int16_t row_group_ordinal_;
522533
int16_t column_ordinal_;
534+
bool page_checksum_verification_;
523535

524536
std::unique_ptr<ThriftSerializer> thrift_serializer_;
525537

@@ -544,15 +556,16 @@ class BufferedPageWriter : public PageWriter {
544556
BufferedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
545557
int compression_level, ColumnChunkMetaDataBuilder* metadata,
546558
int16_t row_group_ordinal, int16_t current_column_ordinal,
559+
bool use_page_checksum_verification,
547560
MemoryPool* pool = ::arrow::default_memory_pool(),
548561
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
549562
std::shared_ptr<Encryptor> data_encryptor = nullptr)
550563
: final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) {
551564
in_memory_sink_ = CreateOutputStream(pool);
552565
pager_ = std::make_unique<SerializedPageWriter>(
553566
in_memory_sink_, codec, compression_level, metadata, row_group_ordinal,
554-
current_column_ordinal, pool, std::move(meta_encryptor),
555-
std::move(data_encryptor));
567+
current_column_ordinal, use_page_checksum_verification, pool,
568+
std::move(meta_encryptor), std::move(data_encryptor));
556569
}
557570

558571
int64_t WriteDictionaryPage(const DictionaryPage& page) override {
@@ -606,15 +619,17 @@ std::unique_ptr<PageWriter> PageWriter::Open(
606619
int compression_level, ColumnChunkMetaDataBuilder* metadata,
607620
int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
608621
bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
609-
std::shared_ptr<Encryptor> data_encryptor) {
622+
std::shared_ptr<Encryptor> data_encryptor, bool page_write_checksum_enabled) {
610623
if (buffered_row_group) {
611-
return std::make_unique<BufferedPageWriter>(
624+
return std::unique_ptr<PageWriter>(new BufferedPageWriter(
612625
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
613-
column_chunk_ordinal, pool, std::move(meta_encryptor), std::move(data_encryptor));
626+
column_chunk_ordinal, page_write_checksum_enabled, pool,
627+
std::move(meta_encryptor), std::move(data_encryptor)));
614628
} else {
615-
return std::make_unique<SerializedPageWriter>(
629+
return std::unique_ptr<PageWriter>(new SerializedPageWriter(
616630
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
617-
column_chunk_ordinal, pool, std::move(meta_encryptor), std::move(data_encryptor));
631+
column_chunk_ordinal, page_write_checksum_enabled, pool,
632+
std::move(meta_encryptor), std::move(data_encryptor)));
618633
}
619634
}
620635

cpp/src/parquet/column_writer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ class PARQUET_EXPORT PageWriter {
9090
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
9191
bool buffered_row_group = false,
9292
std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
93-
std::shared_ptr<Encryptor> data_encryptor = NULLPTR);
93+
std::shared_ptr<Encryptor> data_encryptor = NULLPTR,
94+
bool page_write_checksum_enabled = false);
9495

9596
// The Column Writer decides if dictionary encoding is used if set and
9697
// if the dictionary encoding has fallen back to default encoding on reaching dictionary

0 commit comments

Comments
 (0)