From a17afd60dba9fa6b2ff98cb55eb18f9b40cc1f55 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Mon, 10 Oct 2022 16:46:45 +0000 Subject: [PATCH] Add stress test for RecordReader ReadRecords and SkipRecords. --- cpp/src/parquet/column_reader_test.cc | 181 ++++++++++++++++++++++++++ cpp/src/parquet/test_util.h | 20 ++- 2 files changed, 197 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index 369f6c07d05..1a939ae5e6d 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -1073,5 +1073,186 @@ TEST(RecordReaderByteArrayTest, SkipByteArray) { } } +// Test random combination of ReadRecords and SkipRecords. +class RecordReaderStressTest + : public ::testing::TestWithParam {}; + +TEST_P(RecordReaderStressTest, StressTest) { + internal::LevelInfo level_info; + // Define these boolean variables for improving readability below. + bool repeated = false, required = false; + if (GetParam() == Repetition::REQUIRED) { + level_info.def_level = 0; + level_info.rep_level = 0; + required = true; + } else if (GetParam() == Repetition::OPTIONAL) { + level_info.def_level = 1; + level_info.rep_level = 0; + } else { + level_info.def_level = 1; + level_info.rep_level = 1; + repeated = true; + } + + NodePtr type = schema::Int32("b", GetParam()); + const ColumnDescriptor descr(type, level_info.def_level, + level_info.rep_level); + + std::default_random_engine gen(/*seed=*/time(0)); + // Generate random number of pages with random number of values per page. + std::uniform_int_distribution d(0, 2000); + const int num_pages = d(gen); + const int levels_per_page = d(gen); + std::vector values; + std::vector def_levels; + std::vector rep_levels; + std::vector data_buffer; + std::vector> pages; + // Uses time(0) as seed so it would run a different test every time it is + // run. + MakePages(&descr, num_pages, levels_per_page, def_levels, + rep_levels, values, data_buffer, pages, Encoding::PLAIN, + /*seed=*/time(0)); + std::unique_ptr pager; + pager.reset(new test::MockPageReader(pages)); + + // Set up the RecordReader. + std::shared_ptr record_reader = + internal::RecordReader::Make(&descr, level_info); + record_reader->SetPageReader(std::move(pager)); + + // Figure out how many total records. + int total_records = 0; + if (repeated) { + for (int16_t rep : rep_levels) { + if (rep == 0) { + ++total_records; + } + } + } else { + total_records = def_levels.size(); + } + + // Generate a sequence of reads and skips. + int records_left = total_records; + // The first element of the pair is 1 if SkipRecords and 0 if ReadRecords. + // The second element indicates the number of records for reading or + // skipping. + std::vector> sequence; + while (records_left > 0) { + std::uniform_int_distribution d(0, records_left); + // Generate a number to decide if this is a skip or read. + bool is_skip = d(gen) < records_left / 2; + int num_records = d(gen); + + sequence.emplace_back(is_skip, num_records); + records_left -= num_records; + } + + // Prepare the expected result and do the SkipRecords and ReadRecords. + std::vector expected_values; + std::vector expected_def_levels; + std::vector expected_rep_levels; + std::set null_indexes; + size_t levels_index = 0; + size_t values_index = 0; + bool inside_repeated_field = false; + + for (const auto& [is_skip, num_records] : sequence) { + int read_records = 0; + while (read_records < num_records || inside_repeated_field) { + if (!repeated || (repeated && rep_levels[levels_index] == 0)) { + ++read_records; + } + + bool has_value = required || (!required && def_levels[levels_index] == + level_info.def_level); + + // If we are not skipping, we need to update the expected values and + // rep/defs. If we are skipping, we just keep going. + if (!is_skip) { + if (!required) { + expected_def_levels.push_back(def_levels[levels_index]); + if (!has_value) { + null_indexes.insert(expected_values.size()); + expected_values.push_back(-1); + } + } + if (repeated) { + expected_rep_levels.push_back(rep_levels[levels_index]); + } + if (has_value) { + expected_values.push_back(values[values_index]); + } + } + + if (has_value) { + ++values_index; + } + + // If we are in the middle of a repeated field, we should keep going + // until we consume it all. + if (repeated && levels_index + 1 < rep_levels.size() && + rep_levels[levels_index + 1] == 1) { + inside_repeated_field = true; + } else { + inside_repeated_field = false; + } + + ++levels_index; + } + + // Perform the actual read/skip. + if (is_skip) { + record_reader->SkipRecords(num_records); + } else { + record_reader->ReadRecords(num_records); + } + } + + if (required) { + ASSERT_EQ(total_records, record_reader->values_written()); + } else { + // We should have consumed all the column chunk. + ASSERT_EQ(record_reader->levels_position(), + record_reader->levels_written()); + // values_written() includes null count. + ASSERT_EQ(record_reader->levels_position(), + record_reader->values_written()); + } + + const auto read_values = + reinterpret_cast(record_reader->values()); + if (required) { + ASSERT_EQ(record_reader->null_count(), 0); + } + std::vector read_vals(read_values, + read_values + record_reader->values_written()); + for (size_t i = 0; i < expected_values.size(); ++i) { + if (null_indexes.find(i) == null_indexes.end()) { + ASSERT_EQ(read_vals[i], expected_values[i]); + } + } + + if (!required) { + std::vector read_def_levels( + record_reader->def_levels(), + record_reader->def_levels() + record_reader->levels_written()); + ASSERT_TRUE(vector_equal(read_def_levels, expected_def_levels)); + } + + if (repeated) { + std::vector read_rep_levels( + record_reader->rep_levels(), + record_reader->rep_levels() + record_reader->levels_written()); + ASSERT_TRUE(vector_equal(read_rep_levels, expected_rep_levels)); + } +} + +INSTANTIATE_TEST_SUITE_P(Repetition_type, RecordReaderStressTest, + ::testing::Values(Repetition::REQUIRED, + Repetition::OPTIONAL, + Repetition::REPEATED)); + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/test_util.h b/cpp/src/parquet/test_util.h index 400ed643125..2617e72ac38 100644 --- a/cpp/src/parquet/test_util.h +++ b/cpp/src/parquet/test_util.h @@ -521,10 +521,10 @@ static inline int MakePages(const ColumnDescriptor* d, int num_pages, int levels std::vector& values, std::vector& buffer, std::vector>& pages, - Encoding::type encoding = Encoding::PLAIN) { + Encoding::type encoding = Encoding::PLAIN, + uint32_t seed = 0) { int num_levels = levels_per_page * num_pages; int num_values = 0; - uint32_t seed = 0; int16_t zero = 0; int16_t max_def_level = d->max_definition_level(); int16_t max_rep_level = d->max_repetition_level(); @@ -546,10 +546,22 @@ static inline int MakePages(const ColumnDescriptor* d, int num_pages, int levels } else { num_values = num_levels; } - // Create repetition levels + // Create repitition levels if (max_rep_level > 0) { rep_levels.resize(num_levels); - random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data()); + // Using a different seed so that def_levels and rep_levels are different. + random_numbers(num_levels, seed + 789, zero, max_rep_level, rep_levels.data()); + // The generated levels are random. Force the very first page to start with a new + // record. + rep_levels[0] = 0; + // For a null value, rep_levels and def_levels are both 0. + // If we have a repeated value right after this, it needs to start with + // rep_level = 0 to indicate a new record. + for (int i = 0; i < num_levels - 1; ++i) { + if (rep_levels[i] == 0 && def_levels[i] == 0) { + rep_levels[i + 1] = 0; + } + } } // Create values values.resize(num_values);