Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions cpp/src/parquet/column_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1073,5 +1073,186 @@ TEST(RecordReaderByteArrayTest, SkipByteArray) {
}
}

// Test random combination of ReadRecords and SkipRecords.
class RecordReaderStressTest
: public ::testing::TestWithParam<Repetition::type> {};

TEST_P(RecordReaderStressTest, StressTest) {
internal::LevelInfo level_info;
// Define these boolean variables for improving readability below.
bool repeated = false, required = false;
if (GetParam() == Repetition::REQUIRED) {
level_info.def_level = 0;
level_info.rep_level = 0;
required = true;
} else if (GetParam() == Repetition::OPTIONAL) {
level_info.def_level = 1;
level_info.rep_level = 0;
} else {
level_info.def_level = 1;
level_info.rep_level = 1;
repeated = true;
}

NodePtr type = schema::Int32("b", GetParam());
const ColumnDescriptor descr(type, level_info.def_level,
level_info.rep_level);

std::default_random_engine gen(/*seed=*/time(0));
// Generate random number of pages with random number of values per page.
std::uniform_int_distribution<int> d(0, 2000);
const int num_pages = d(gen);
const int levels_per_page = d(gen);
std::vector<int32_t> values;
std::vector<int16_t> def_levels;
std::vector<int16_t> rep_levels;
std::vector<uint8_t> data_buffer;
std::vector<std::shared_ptr<Page>> pages;
// Uses time(0) as seed so it would run a different test every time it is
// run.
MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels,
rep_levels, values, data_buffer, pages, Encoding::PLAIN,
/*seed=*/time(0));
std::unique_ptr<PageReader> pager;
pager.reset(new test::MockPageReader(pages));

// Set up the RecordReader.
std::shared_ptr<internal::RecordReader> record_reader =
internal::RecordReader::Make(&descr, level_info);
record_reader->SetPageReader(std::move(pager));

// Figure out how many total records.
int total_records = 0;
if (repeated) {
for (int16_t rep : rep_levels) {
if (rep == 0) {
++total_records;
}
}
} else {
total_records = def_levels.size();
}

// Generate a sequence of reads and skips.
int records_left = total_records;
// The first element of the pair is 1 if SkipRecords and 0 if ReadRecords.
// The second element indicates the number of records for reading or
// skipping.
std::vector<std::pair<bool, int>> sequence;
while (records_left > 0) {
std::uniform_int_distribution<int> d(0, records_left);
// Generate a number to decide if this is a skip or read.
bool is_skip = d(gen) < records_left / 2;
int num_records = d(gen);

sequence.emplace_back(is_skip, num_records);
records_left -= num_records;
}

// Prepare the expected result and do the SkipRecords and ReadRecords.
std::vector<int32_t> expected_values;
std::vector<int16_t> expected_def_levels;
std::vector<int16_t> expected_rep_levels;
std::set<int16_t> null_indexes;
size_t levels_index = 0;
size_t values_index = 0;
bool inside_repeated_field = false;

for (const auto& [is_skip, num_records] : sequence) {
int read_records = 0;
while (read_records < num_records || inside_repeated_field) {
if (!repeated || (repeated && rep_levels[levels_index] == 0)) {
++read_records;
}

bool has_value = required || (!required && def_levels[levels_index] ==
level_info.def_level);

// If we are not skipping, we need to update the expected values and
// rep/defs. If we are skipping, we just keep going.
if (!is_skip) {
if (!required) {
expected_def_levels.push_back(def_levels[levels_index]);
if (!has_value) {
null_indexes.insert(expected_values.size());
expected_values.push_back(-1);
}
}
if (repeated) {
expected_rep_levels.push_back(rep_levels[levels_index]);
}
if (has_value) {
expected_values.push_back(values[values_index]);
}
}

if (has_value) {
++values_index;
}

// If we are in the middle of a repeated field, we should keep going
// until we consume it all.
if (repeated && levels_index + 1 < rep_levels.size() &&
rep_levels[levels_index + 1] == 1) {
inside_repeated_field = true;
} else {
inside_repeated_field = false;
}

++levels_index;
}

// Perform the actual read/skip.
if (is_skip) {
record_reader->SkipRecords(num_records);
} else {
record_reader->ReadRecords(num_records);
}
}

if (required) {
ASSERT_EQ(total_records, record_reader->values_written());
} else {
// We should have consumed all the column chunk.
ASSERT_EQ(record_reader->levels_position(),
record_reader->levels_written());
// values_written() includes null count.
ASSERT_EQ(record_reader->levels_position(),
record_reader->values_written());
}

const auto read_values =
reinterpret_cast<const int32_t*>(record_reader->values());
if (required) {
ASSERT_EQ(record_reader->null_count(), 0);
}
std::vector<int32_t> read_vals(read_values,
read_values + record_reader->values_written());
for (size_t i = 0; i < expected_values.size(); ++i) {
if (null_indexes.find(i) == null_indexes.end()) {
ASSERT_EQ(read_vals[i], expected_values[i]);
}
}

if (!required) {
std::vector<int16_t> read_def_levels(
record_reader->def_levels(),
record_reader->def_levels() + record_reader->levels_written());
ASSERT_TRUE(vector_equal(read_def_levels, expected_def_levels));
}

if (repeated) {
std::vector<int16_t> read_rep_levels(
record_reader->rep_levels(),
record_reader->rep_levels() + record_reader->levels_written());
ASSERT_TRUE(vector_equal(read_rep_levels, expected_rep_levels));
}
}

INSTANTIATE_TEST_SUITE_P(Repetition_type, RecordReaderStressTest,
::testing::Values(Repetition::REQUIRED,
Repetition::OPTIONAL,
Repetition::REPEATED));

} // namespace test
} // namespace parquet
20 changes: 16 additions & 4 deletions cpp/src/parquet/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,10 +521,10 @@ static inline int MakePages(const ColumnDescriptor* d, int num_pages, int levels
std::vector<typename Type::c_type>& values,
std::vector<uint8_t>& buffer,
std::vector<std::shared_ptr<Page>>& pages,
Encoding::type encoding = Encoding::PLAIN) {
Encoding::type encoding = Encoding::PLAIN,
uint32_t seed = 0) {
int num_levels = levels_per_page * num_pages;
int num_values = 0;
uint32_t seed = 0;
int16_t zero = 0;
int16_t max_def_level = d->max_definition_level();
int16_t max_rep_level = d->max_repetition_level();
Expand All @@ -546,10 +546,22 @@ static inline int MakePages(const ColumnDescriptor* d, int num_pages, int levels
} else {
num_values = num_levels;
}
// Create repetition levels
// Create repitition levels
if (max_rep_level > 0) {
rep_levels.resize(num_levels);
random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data());
// Using a different seed so that def_levels and rep_levels are different.
random_numbers(num_levels, seed + 789, zero, max_rep_level, rep_levels.data());
// The generated levels are random. Force the very first page to start with a new
// record.
rep_levels[0] = 0;
// For a null value, rep_levels and def_levels are both 0.
// If we have a repeated value right after this, it needs to start with
// rep_level = 0 to indicate a new record.
for (int i = 0; i < num_levels - 1; ++i) {
if (rep_levels[i] == 0 && def_levels[i] == 0) {
rep_levels[i + 1] = 0;
}
}
}
// Create values
values.resize(num_values);
Expand Down