Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 105 additions & 6 deletions cpp/src/parquet/stream_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace parquet {

StreamReader::StreamReader(std::unique_ptr<ParquetFileReader> reader)
: file_reader_{std::move(reader)} {
: file_reader_{std::move(reader)}, eof_{false} {
file_metadata_ = file_reader_->metadata();

auto schema = file_metadata_->schema();
Expand All @@ -36,6 +36,22 @@ StreamReader::StreamReader(std::unique_ptr<ParquetFileReader> reader)
NextRowGroup();
}

// Number of columns reported by the file metadata.
// When no file metadata is held (e.g. the object was default constructed)
// there is nothing to query, so zero is reported.
int StreamReader::num_columns() const {
  return file_metadata_ ? file_metadata_->num_columns() : 0;
}

// Number of rows reported by the file metadata.
// When no file metadata is held (e.g. the object was default constructed)
// there is nothing to query, so zero is reported.
int64_t StreamReader::num_rows() const {
  return file_metadata_ ? file_metadata_->num_rows() : 0;
}

StreamReader& StreamReader::operator>>(bool& v) {
CheckColumn(Type::BOOLEAN, ConvertedType::NONE);
Read<BoolReader>(&v);
Expand Down Expand Up @@ -183,11 +199,11 @@ void StreamReader::EndRow() {
" of " + std::to_string(nodes_.size()) + " columns read");
}
column_index_ = 0;
++current_row_;

if (column_readers_[0]->HasNext()) {
return;
if (!column_readers_[0]->HasNext()) {
NextRowGroup();
}
NextRowGroup();
}

void StreamReader::NextRowGroup() {
Expand All @@ -202,23 +218,106 @@ void StreamReader::NextRowGroup() {
column_readers_[i] = row_group_reader_->Column(i);
}
if (column_readers_[0]->HasNext()) {
row_group_row_offset_ = current_row_;
return;
}
}
// No more row groups found.
SetEof();
}

// Marks the reader as being at end-of-file and releases the state that is
// only needed while reading: the file reader, the current row group reader,
// the per-column readers and the schema nodes.
void StreamReader::SetEof() {
  // Do not reset file_metadata_ to ensure queries on the number of
  // rows/columns still function.
  eof_ = true;
  file_reader_.reset();
  row_group_reader_.reset();
  column_readers_.clear();
  nodes_.clear();
}

// Skips up to num_rows_to_skip whole rows, crossing row group boundaries
// as required, and stops early if the end of file is reached.
//
// Throws ParquetException if called part-way through a row, i.e. when some
// columns of the current row were read or skipped without calling EndRow.
//
// Returns the number of rows actually skipped. Zero is returned for a
// non-positive request or when already at end of file.
int64_t StreamReader::SkipRows(int64_t num_rows_to_skip) {
  if (0 != column_index_) {
    throw ParquetException("Must finish reading current row before skipping rows.");
  }
  int64_t num_rows_remaining_to_skip = num_rows_to_skip;

  while (!eof_ && (num_rows_remaining_to_skip > 0)) {
    const int64_t num_rows_in_row_group = row_group_reader_->metadata()->num_rows();
    // current_row_ counts rows across all row groups, while
    // row_group_row_offset_ holds the value current_row_ had when the
    // current row group was entered. Their difference is therefore the
    // number of rows already consumed from this row group.
    const int64_t num_rows_consumed_in_row_group = current_row_ - row_group_row_offset_;
    const int64_t num_rows_remaining_in_row_group =
        num_rows_in_row_group - num_rows_consumed_in_row_group;

    if (num_rows_remaining_in_row_group > num_rows_remaining_to_skip) {
      // The skip ends inside this row group: advance every column reader
      // by the same amount so that the columns stay aligned.
      for (const auto& reader : column_readers_) {
        SkipRowsInColumn(reader.get(), num_rows_remaining_to_skip);
      }
      current_row_ += num_rows_remaining_to_skip;
      num_rows_remaining_to_skip = 0;
    } else {
      // The rest of this row group is skipped entirely, so there is no need
      // to touch the column readers; just move on to the next row group.
      num_rows_remaining_to_skip -= num_rows_remaining_in_row_group;
      current_row_ += num_rows_remaining_in_row_group;
      NextRowGroup();
    }
  }
  return num_rows_to_skip - num_rows_remaining_to_skip;
}

// Skips the value of up to num_columns_to_skip columns in the current row,
// starting at the current column. Skipping stops at the last column of the
// row; it never continues onto the next row. Nothing is skipped once the
// end of file has been reached.
//
// Returns the number of columns actually skipped.
int64_t StreamReader::SkipColumns(int64_t num_columns_to_skip) {
  if (eof_) {
    return 0;
  }
  int64_t skipped = 0;

  while ((skipped < num_columns_to_skip) &&
         (static_cast<std::size_t>(column_index_) < nodes_.size())) {
    // One row's worth of data is dropped from the column under the cursor.
    SkipRowsInColumn(column_readers_[column_index_].get(), 1);
    ++skipped;
    ++column_index_;
  }
  return skipped;
}

// Skips num_rows_to_skip values in a single column by dispatching to the
// typed reader that matches the column's physical type.
//
// Throws ParquetException if the physical type is INT96 or UNDEFINED, or if
// the typed reader reports skipping a different number of values than was
// requested.
void StreamReader::SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip) {
  int64_t skipped_count = 0;

  switch (reader->type()) {
    case Type::INT96:
    case Type::UNDEFINED:
      throw ParquetException("Unexpected type: " + TypeToString(reader->type()));
    case Type::BOOLEAN:
      skipped_count = static_cast<BoolReader*>(reader)->Skip(num_rows_to_skip);
      break;
    case Type::INT32:
      skipped_count = static_cast<Int32Reader*>(reader)->Skip(num_rows_to_skip);
      break;
    case Type::INT64:
      skipped_count = static_cast<Int64Reader*>(reader)->Skip(num_rows_to_skip);
      break;
    case Type::FLOAT:
      skipped_count = static_cast<FloatReader*>(reader)->Skip(num_rows_to_skip);
      break;
    case Type::DOUBLE:
      skipped_count = static_cast<DoubleReader*>(reader)->Skip(num_rows_to_skip);
      break;
    case Type::BYTE_ARRAY:
      skipped_count = static_cast<ByteArrayReader*>(reader)->Skip(num_rows_to_skip);
      break;
    case Type::FIXED_LEN_BYTE_ARRAY:
      skipped_count =
          static_cast<FixedLenByteArrayReader*>(reader)->Skip(num_rows_to_skip);
      break;
  }
  if (skipped_count == num_rows_to_skip) {
    return;
  }
  // A short skip would leave this column out of step with the others.
  throw ParquetException("Skipped " + std::to_string(skipped_count) + "/" +
                         std::to_string(num_rows_to_skip) + " rows in column " +
                         reader->descr()->name());
}

void StreamReader::CheckColumn(Type::type physical_type,
ConvertedType::type converted_type, int length) {
if (static_cast<std::size_t>(column_index_) >= nodes_.size()) {
if (eof_) {
throw ParquetException("EOF reached");
ParquetException::EofException();
}
throw ParquetException("Column index out-of-bounds. Index " +
std::to_string(column_index_) + " is invalid for " +
Expand Down
33 changes: 32 additions & 1 deletion cpp/src/parquet/stream_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ class PARQUET_EXPORT StreamReader {

/// \brief Whether the reader is at end of file.
/// \return The value of the eof_ flag, which is set once no further row
/// group containing data can be found.
bool eof() const { return eof_; }

/// \brief Index of the column within the current row that the next read
/// or skip will act on.
/// \return Zero-based column index; it returns to zero when a row is ended.
int current_column() const { return column_index_; }

/// \brief Running row counter for the reader.
/// \return Number of rows that have been ended with 'EndRow' or passed
/// over with 'SkipRows' so far.
int64_t current_row() const { return current_row_; }

/// \brief Number of columns according to the file metadata.
/// \return Column count, or zero if no file metadata is held (e.g. the
/// object was default constructed).
int num_columns() const;

/// \brief Number of rows according to the file metadata.
/// \return Row count, or zero if no file metadata is held (e.g. the
/// object was default constructed).
int64_t num_rows() const;

// Moving is possible.
StreamReader(StreamReader&&) = default;
StreamReader& operator=(StreamReader&&) = default;
Expand Down Expand Up @@ -114,6 +122,23 @@ class PARQUET_EXPORT StreamReader {
// Terminate current row and advance to next one.
void EndRow();

/// \brief Skip the data in the next columns.
/// If the number of columns exceeds the columns remaining on the
/// current row then skipping is terminated - it does _not_ continue
/// skipping columns on the next row.
/// Skipping of columns still requires the use of 'EndRow' even if all
/// remaining columns were skipped.
/// No columns are skipped once the end of file has been reached.
/// \param num_columns_to_skip Maximum number of columns to skip.
/// \return Number of columns actually skipped.
int64_t SkipColumns(int64_t num_columns_to_skip);

/// \brief Skip the data in the next rows.
/// Skipping of rows is not allowed if reading of data for the
/// current row is not finished; a ParquetException is thrown in
/// that case.
/// Skipping of rows will be terminated if the end of file is
/// reached.
/// \param num_rows_to_skip Maximum number of rows to skip.
/// \return Number of rows actually skipped.
int64_t SkipRows(int64_t num_rows_to_skip);

protected:
template <typename ReaderType, typename T>
void Read(T* v) {
Expand All @@ -140,6 +165,10 @@ class PARQUET_EXPORT StreamReader {
/// \brief Validate the column at the current column index before it is read.
/// Throws if the current column index is past the last column of the row;
/// the failure is reported as an end-of-file condition when the reader is
/// at EOF.
/// NOTE(review): presumably also verifies that the column's physical type,
/// converted type and length match the arguments - confirm against the
/// definition.
void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
int length = 0);

/// \brief Skip values in a single column reader.
/// Throws ParquetException if the column's physical type is INT96 or
/// UNDEFINED, or if the number of values skipped differs from
/// num_rows_to_skip.
void SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip);

/// \brief Flag end of file and release the file reader, the row group
/// reader, the column readers and the schema nodes.
void SetEof();

private:
using node_ptr_type = std::shared_ptr<schema::PrimitiveNode>;

Expand All @@ -149,9 +178,11 @@ class PARQUET_EXPORT StreamReader {
std::vector<std::shared_ptr<ColumnReader>> column_readers_;
std::vector<node_ptr_type> nodes_;

// Starts out true; the constructor that is handed a file reader sets the
// flag to false explicitly before loading the first row group.
bool eof_{true};
int row_group_index_{0};
int column_index_{0};
// Running row counter across all row groups.
int64_t current_row_{0};
// Value of current_row_ at the point the current row group was entered.
int64_t row_group_row_offset_{0};
};

PARQUET_EXPORT
Expand Down
Loading