diff --git a/cpp/src/parquet/stream_reader.cc b/cpp/src/parquet/stream_reader.cc index 48fd6a87589..6e63fc8c20b 100644 --- a/cpp/src/parquet/stream_reader.cc +++ b/cpp/src/parquet/stream_reader.cc @@ -22,7 +22,7 @@ namespace parquet { StreamReader::StreamReader(std::unique_ptr reader) - : file_reader_{std::move(reader)} { + : file_reader_{std::move(reader)}, eof_{false} { file_metadata_ = file_reader_->metadata(); auto schema = file_metadata_->schema(); @@ -36,6 +36,22 @@ StreamReader::StreamReader(std::unique_ptr reader) NextRowGroup(); } +int StreamReader::num_columns() const { + // Check for file metadata i.e. object is not default constructed. + if (file_metadata_) { + return file_metadata_->num_columns(); + } + return 0; +} + +int64_t StreamReader::num_rows() const { + // Check for file metadata i.e. object is not default constructed. + if (file_metadata_) { + return file_metadata_->num_rows(); + } + return 0; +} + StreamReader& StreamReader::operator>>(bool& v) { CheckColumn(Type::BOOLEAN, ConvertedType::NONE); Read(&v); @@ -183,11 +199,11 @@ void StreamReader::EndRow() { " of " + std::to_string(nodes_.size()) + " columns read"); } column_index_ = 0; + ++current_row_; - if (column_readers_[0]->HasNext()) { - return; + if (!column_readers_[0]->HasNext()) { + NextRowGroup(); } - NextRowGroup(); } void StreamReader::NextRowGroup() { @@ -202,23 +218,106 @@ void StreamReader::NextRowGroup() { column_readers_[i] = row_group_reader_->Column(i); } if (column_readers_[0]->HasNext()) { + row_group_row_offset_ = current_row_; return; } } // No more row groups found. + SetEof(); +} + +void StreamReader::SetEof() { + // Do not reset file_metadata_ to ensure queries on the number of + // rows/columns still function. eof_ = true; file_reader_.reset(); - file_metadata_.reset(); row_group_reader_.reset(); column_readers_.clear(); nodes_.clear(); } +int64_t StreamReader::SkipRows(int64_t num_rows_to_skip) { + if (0 != column_index_) { + throw ParquetException("Must finish reading current row before skipping rows."); + } + int64_t num_rows_remaining_to_skip = num_rows_to_skip; + + while (!eof_ && (num_rows_remaining_to_skip > 0)) { + int64_t num_rows_in_row_group = row_group_reader_->metadata()->num_rows(); + int64_t num_rows_remaining_in_row_group = + num_rows_in_row_group - current_row_ - row_group_row_offset_; + + if (num_rows_remaining_in_row_group > num_rows_remaining_to_skip) { + for (auto reader : column_readers_) { + SkipRowsInColumn(reader.get(), num_rows_remaining_to_skip); + } + current_row_ += num_rows_remaining_to_skip; + num_rows_remaining_to_skip = 0; + } else { + num_rows_remaining_to_skip -= num_rows_remaining_in_row_group; + current_row_ += num_rows_remaining_in_row_group; + NextRowGroup(); + } + } + return num_rows_to_skip - num_rows_remaining_to_skip; +} + +int64_t StreamReader::SkipColumns(int64_t num_columns_to_skip) { + int64_t num_columns_skipped = 0; + + if (!eof_) { + for (; (num_columns_to_skip > num_columns_skipped) && + static_cast(column_index_) < nodes_.size(); + ++column_index_) { + SkipRowsInColumn(column_readers_[column_index_].get(), 1); + ++num_columns_skipped; + } + } + return num_columns_skipped; +} + +void StreamReader::SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip) { + int64_t num_skipped = 0; + + switch (reader->type()) { + case Type::BOOLEAN: + num_skipped = static_cast(reader)->Skip(num_rows_to_skip); + break; + case Type::INT32: + num_skipped = static_cast(reader)->Skip(num_rows_to_skip); + break; + case Type::INT64: + num_skipped = static_cast(reader)->Skip(num_rows_to_skip); + break; + case Type::BYTE_ARRAY: + num_skipped = static_cast(reader)->Skip(num_rows_to_skip); + break; + case Type::FIXED_LEN_BYTE_ARRAY: + num_skipped = static_cast(reader)->Skip(num_rows_to_skip); + break; + case Type::FLOAT: + num_skipped = static_cast(reader)->Skip(num_rows_to_skip); + break; + case Type::DOUBLE: + num_skipped = static_cast(reader)->Skip(num_rows_to_skip); + break; + case Type::INT96: + case Type::UNDEFINED: + throw ParquetException("Unexpected type: " + TypeToString(reader->type())); + break; + } + if (num_rows_to_skip != num_skipped) { + throw ParquetException("Skipped " + std::to_string(num_skipped) + "/" + + std::to_string(num_rows_to_skip) + " rows in column " + + reader->descr()->name()); + } +} + void StreamReader::CheckColumn(Type::type physical_type, ConvertedType::type converted_type, int length) { if (static_cast(column_index_) >= nodes_.size()) { if (eof_) { - throw ParquetException("EOF reached"); + ParquetException::EofException(); } throw ParquetException("Column index out-of-bounds. Index " + std::to_string(column_index_) + " is invalid for " + diff --git a/cpp/src/parquet/stream_reader.h b/cpp/src/parquet/stream_reader.h index cc80e507faa..9d1296e938d 100644 --- a/cpp/src/parquet/stream_reader.h +++ b/cpp/src/parquet/stream_reader.h @@ -56,6 +56,14 @@ class PARQUET_EXPORT StreamReader { bool eof() const { return eof_; } + int current_column() const { return column_index_; } + + int64_t current_row() const { return current_row_; } + + int num_columns() const; + + int64_t num_rows() const; + // Moving is possible. StreamReader(StreamReader&&) = default; StreamReader& operator=(StreamReader&&) = default; @@ -114,6 +122,23 @@ class PARQUET_EXPORT StreamReader { // Terminate current row and advance to next one. void EndRow(); + /// \brief Skip the data in the next columns. + /// If the number of columns exceeds the columns remaining on the + /// current row then skipping is terminated - it does _not_ continue + /// skipping columns on the next row. + /// Skipping of columns still requires the use 'EndRow' even if all + /// remaining columns were skipped. + /// \return Number of columns actually skipped. + int64_t SkipColumns(int64_t num_columns_to_skip); + + /// \brief Skip the data in the next rows. + /// Skipping of rows is not allowed if reading of data for the + /// current row is not finished. + /// Skipping of rows will be terminated if the end of file is + /// reached. + /// \return Number of rows actually skipped. + int64_t SkipRows(int64_t num_rows_to_skip); + protected: template void Read(T* v) { @@ -140,6 +165,10 @@ class PARQUET_EXPORT StreamReader { void CheckColumn(Type::type physical_type, ConvertedType::type converted_type, int length = 0); + void SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip); + + void SetEof(); + private: using node_ptr_type = std::shared_ptr; @@ -149,9 +178,11 @@ class PARQUET_EXPORT StreamReader { std::vector> column_readers_; std::vector nodes_; - bool eof_{false}; + bool eof_{true}; int row_group_index_{0}; int column_index_{0}; + int64_t current_row_{0}; + int64_t row_group_row_offset_{0}; }; PARQUET_EXPORT diff --git a/cpp/src/parquet/stream_reader_test.cc b/cpp/src/parquet/stream_reader_test.cc index 75e11038893..b2f57309272 100644 --- a/cpp/src/parquet/stream_reader_test.cc +++ b/cpp/src/parquet/stream_reader_test.cc @@ -176,6 +176,18 @@ TEST_F(TestStreamReader, DefaultConstructed) { ASSERT_THROW(os >> i, ParquetException); ASSERT_THROW(os >> s, ParquetException); ASSERT_THROW(os >> EndRow, ParquetException); + + ASSERT_EQ(true, os.eof()); + ASSERT_EQ(0, os.current_column()); + ASSERT_EQ(0, os.current_row()); + + ASSERT_EQ(0, os.num_columns()); + ASSERT_EQ(0, os.num_rows()); + + // Skipping columns and rows is allowed. + // + ASSERT_EQ(0, os.SkipColumns(100)); + ASSERT_EQ(0, os.SkipRows(100)); } TEST_F(TestStreamReader, TypeChecking) { @@ -236,6 +248,8 @@ TEST_F(TestStreamReader, ValueChecking) { int i; for (i = 0; !reader_.eof(); ++i) { + ASSERT_EQ(i, reader_.current_row()); + reader_ >> b; reader_ >> s; reader_ >> c; @@ -261,8 +275,195 @@ TEST_F(TestStreamReader, ValueChecking) { ASSERT_FLOAT_EQ(f, TestData::GetFloat(i)); ASSERT_DOUBLE_EQ(d, TestData::GetDouble(i)); } + ASSERT_EQ(reader_.current_row(), TestData::num_rows); ASSERT_EQ(i, TestData::num_rows); } +TEST_F(TestStreamReader, SkipRows) { + // Skipping zero and negative number of rows is ok. + // + ASSERT_EQ(0, reader_.SkipRows(0)); + ASSERT_EQ(0, reader_.SkipRows(-100)); + + ASSERT_EQ(false, reader_.eof()); + ASSERT_EQ(0, reader_.current_row()); + ASSERT_EQ(TestData::num_rows, reader_.num_rows()); + + const int iter_num_rows_to_read = 3; + const int iter_num_rows_to_skip = 13; + int num_rows_read = 0; + int i = 0; + int num_iterations; + + for (num_iterations = 0; !reader_.eof(); ++num_iterations) { + // Each iteration of this loop reads some rows (iter_num_rows_to_read + // are read) and then skips some rows (iter_num_rows_to_skip will be + // skipped). + // The loop variable i is the current row being read. + // Loop variable j is used just to count the number of rows to + // read. + bool b; + std::string s; + std::array char_array; + char c; + int8_t int8; + uint16_t uint16; + int32_t int32; + uint64_t uint64; + std::chrono::microseconds ts_us; + float f; + double d; + std::string str; + + for (int j = 0; !reader_.eof() && (j < iter_num_rows_to_read); ++i, ++j) { + ASSERT_EQ(i, reader_.current_row()); + + reader_ >> b; + reader_ >> s; + reader_ >> c; + reader_ >> char_array; + reader_ >> int8; + reader_ >> uint16; + + // Not allowed to skip row once reading columns has started. + ASSERT_THROW(reader_.SkipRows(1), ParquetException); + + reader_ >> int32; + reader_ >> uint64; + reader_ >> ts_us; + reader_ >> f; + reader_ >> d; + reader_ >> EndRow; + num_rows_read += 1; + + ASSERT_EQ(b, TestData::GetBool(i)); + ASSERT_EQ(s, TestData::GetString(i)); + ASSERT_EQ(c, TestData::GetChar(i)); + ASSERT_EQ(char_array, TestData::GetCharArray(i)); + ASSERT_EQ(int8, TestData::GetInt8(i)); + ASSERT_EQ(uint16, TestData::GetUInt16(i)); + ASSERT_EQ(int32, TestData::GetInt32(i)); + ASSERT_EQ(uint64, TestData::GetUInt64(i)); + ASSERT_EQ(ts_us, TestData::GetChronoMicroseconds(i)); + ASSERT_FLOAT_EQ(f, TestData::GetFloat(i)); + ASSERT_DOUBLE_EQ(d, TestData::GetDouble(i)); + } + ASSERT_EQ(iter_num_rows_to_skip, reader_.SkipRows(iter_num_rows_to_skip)); + i += iter_num_rows_to_skip; + } + ASSERT_EQ(TestData::num_rows, reader_.current_row()); + + ASSERT_EQ(num_rows_read, num_iterations * iter_num_rows_to_read); + + // Skipping rows at eof is allowed. + // + ASSERT_EQ(0, reader_.SkipRows(100)); +} + +TEST_F(TestStreamReader, SkipAllRows) { + ASSERT_EQ(false, reader_.eof()); + ASSERT_EQ(0, reader_.current_row()); + + ASSERT_EQ(reader_.num_rows(), reader_.SkipRows(2 * reader_.num_rows())); + + ASSERT_EQ(true, reader_.eof()); + ASSERT_EQ(reader_.num_rows(), reader_.current_row()); +} + +TEST_F(TestStreamReader, SkipColumns) { + bool b; + std::string s; + std::array char_array; + char c; + int8_t int8; + uint16_t uint16; + int32_t int32; + uint64_t uint64; + std::chrono::microseconds ts_us; + float f; + double d; + std::string str; + + int i; + + // Skipping zero and negative number of columns is ok. + // + ASSERT_EQ(0, reader_.SkipColumns(0)); + ASSERT_EQ(0, reader_.SkipColumns(-100)); + + for (i = 0; !reader_.eof(); ++i) { + ASSERT_EQ(i, reader_.current_row()); + ASSERT_EQ(0, reader_.current_column()); + + // Skip all columns every 31 rows. + if (i % 31 == 0) { + ASSERT_EQ(reader_.num_columns(), reader_.SkipColumns(reader_.num_columns())); + ASSERT_EQ(reader_.num_columns(), reader_.current_column()); + reader_ >> EndRow; + continue; + } + reader_ >> b; + ASSERT_EQ(b, TestData::GetBool(i)); + ASSERT_EQ(1, reader_.current_column()); + + // Skip the next column every 3 rows. + if (i % 3 == 0) { + ASSERT_EQ(1, reader_.SkipColumns(1)); + } else { + reader_ >> s; + ASSERT_EQ(s, TestData::GetString(i)); + } + ASSERT_EQ(2, reader_.current_column()); + + reader_ >> c; + ASSERT_EQ(c, TestData::GetChar(i)); + ASSERT_EQ(3, reader_.current_column()); + reader_ >> char_array; + ASSERT_EQ(char_array, TestData::GetCharArray(i)); + ASSERT_EQ(4, reader_.current_column()); + reader_ >> int8; + ASSERT_EQ(int8, TestData::GetInt8(i)); + ASSERT_EQ(5, reader_.current_column()); + + // Skip the next 3 columns every 7 rows. + if (i % 7 == 0) { + ASSERT_EQ(3, reader_.SkipColumns(3)); + } else { + reader_ >> uint16; + ASSERT_EQ(uint16, TestData::GetUInt16(i)); + ASSERT_EQ(6, reader_.current_column()); + reader_ >> int32; + ASSERT_EQ(int32, TestData::GetInt32(i)); + ASSERT_EQ(7, reader_.current_column()); + reader_ >> uint64; + ASSERT_EQ(uint64, TestData::GetUInt64(i)); + } + ASSERT_EQ(8, reader_.current_column()); + + reader_ >> ts_us; + ASSERT_EQ(ts_us, TestData::GetChronoMicroseconds(i)); + ASSERT_EQ(9, reader_.current_column()); + + // Skip 301 columns (i.e. all remaining) every 11 rows. + if (i % 11 == 0) { + ASSERT_EQ(2, reader_.SkipColumns(301)); + } else { + reader_ >> f; + ASSERT_FLOAT_EQ(f, TestData::GetFloat(i)); + ASSERT_EQ(10, reader_.current_column()); + reader_ >> d; + ASSERT_DOUBLE_EQ(d, TestData::GetDouble(i)); + } + ASSERT_EQ(11, reader_.current_column()); + reader_ >> EndRow; + } + ASSERT_EQ(i, TestData::num_rows); + ASSERT_EQ(reader_.current_row(), TestData::num_rows); + + // Skipping columns at eof is allowed. + // + ASSERT_EQ(0, reader_.SkipColumns(100)); +} + } // namespace test } // namespace parquet