[FEAT] Remote parquet streaming #2620
src/daft-parquet/src/file.rs
Outdated
let mut arr_iters = Vec::with_capacity(self.arrow_schema.fields.len());
for field in self.arrow_schema.fields.iter() {
    let filtered_cols_idx = columns
Looks like this may be computed for every row group. We may want to compute it once and reuse it?
Will they always be the same for all row groups?
I believe so, but you should double check!
OK, managed to refactor it so that it doesn't need the indices; it now retrieves the column metadata directly for each field in each row group, which should be unique.
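For readers following the thread, here is a minimal sketch of the "look up column metadata per field, per row group" idea. `ColumnChunkMeta`, `RowGroupMeta`, and `columns_for_field` are hypothetical stand-ins invented for illustration, not the types or functions used in this PR.

```rust
/// Hypothetical stand-in for one column chunk's metadata.
#[derive(Debug, Clone, PartialEq)]
struct ColumnChunkMeta {
    /// Name of the top-level field this column chunk belongs to.
    field_name: String,
    /// Byte offset of the chunk within the file.
    offset: u64,
    /// Compressed length of the chunk in bytes.
    length: u64,
}

/// Hypothetical stand-in for one row group's metadata.
struct RowGroupMeta {
    columns: Vec<ColumnChunkMeta>,
}

/// Return the column chunks of `row_group` that belong to `field_name`.
/// No index list is precomputed; the lookup reads the row group's own metadata.
fn columns_for_field<'a>(
    row_group: &'a RowGroupMeta,
    field_name: &str,
) -> Vec<&'a ColumnChunkMeta> {
    row_group
        .columns
        .iter()
        .filter(|c| c.field_name == field_name)
        .collect()
}

/// Collect the (offset, length) byte ranges needed to read `fields`
/// from a single row group.
fn needed_byte_ranges(row_group: &RowGroupMeta, fields: &[&str]) -> Vec<(u64, u64)> {
    fields
        .iter()
        .flat_map(|f| columns_for_field(row_group, f))
        .map(|c| (c.offset, c.length))
        .collect()
}
```

Calling `needed_byte_ranges(&row_group, &["a", "b"])` once per row group yields the ranges for that row group only, so nothing is shared across row groups by index.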
src/daft-parquet/src/file.rs
Outdated
let mut range_readers = Vec::with_capacity(filtered_cols_idx.len());
for range in needed_byte_ranges.into_iter() {
    let range_reader = ranges.get_range_reader(range).await?;
This blocks on the first IO request completing, so you may want to not await here but join_all once you have all the range_readers.
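A small sketch of the "start every request, then wait" pattern this comment describes. To keep it runnable with only the standard library, it swaps async futures for OS threads; in async code the analogous step would be collecting the un-awaited futures and passing them to `futures::future::join_all`. `fetch_range` is a hypothetical stand-in for a remote range request, not the PR's `get_range_reader`.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical remote range request: sleeps to simulate network latency,
/// then returns placeholder bytes for `start..end`.
fn fetch_range(start: usize, end: usize) -> Vec<u8> {
    thread::sleep(Duration::from_millis(20));
    (start..end).map(|i| i as u8).collect()
}

/// Waits for each request to finish before issuing the next one,
/// so total latency is roughly the sum of all request latencies.
fn fetch_sequential(ranges: &[(usize, usize)]) -> Vec<Vec<u8>> {
    ranges.iter().map(|&(s, e)| fetch_range(s, e)).collect()
}

/// Issues every request first, then waits for all of them,
/// so total latency is roughly that of the slowest request.
fn fetch_concurrent(ranges: &[(usize, usize)]) -> Vec<Vec<u8>> {
    // Collecting into a Vec forces every thread to be spawned
    // before the first join is attempted.
    let handles: Vec<_> = ranges
        .iter()
        .map(|&(s, e)| thread::spawn(move || fetch_range(s, e)))
        .collect();

    // Joining in spawn order keeps results aligned with `ranges`.
    handles
        .into_iter()
        .map(|h| h.join().expect("fetch thread panicked"))
        .collect()
}
```

Both functions return the same bytes in the same order; only the point at which the caller starts waiting differs.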
src/daft-parquet/src/file.rs
Outdated
    .collect::<Vec<_>>();

let mut range_readers = Vec::with_capacity(filtered_cols_idx.len());
for range in needed_byte_ranges.into_iter() {
Instead of collecting range_readers into a Vec, you can probably do this in one loop, producing decompressed_iters, ptypes, num_values, etc.
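A rough sketch of the single-loop shape suggested here, assuming each column can be processed independently. `ColumnMeta`, `ColumnInputs`, `decompress`, and `build_inputs` are hypothetical names for illustration; the real code works with range readers and page iterators rather than plain byte vectors.

```rust
/// Hypothetical description of one column chunk to read.
struct ColumnMeta {
    /// Physical type name of the column.
    ptype: &'static str,
    /// Number of values stored in the chunk.
    num_values: usize,
    /// Raw (compressed) bytes of the chunk.
    compressed: Vec<u8>,
}

/// Per-column outputs, kept as parallel vectors indexed by column.
struct ColumnInputs {
    decompressed: Vec<Vec<u8>>,
    ptypes: Vec<&'static str>,
    num_values: Vec<usize>,
}

/// Placeholder for decompression: returns the bytes unchanged.
fn decompress(bytes: &[u8]) -> Vec<u8> {
    bytes.to_vec()
}

/// Single pass over the columns: every output vector is filled in the same
/// loop, so no intermediate Vec of readers is built and then re-iterated.
fn build_inputs(columns: &[ColumnMeta]) -> ColumnInputs {
    let mut out = ColumnInputs {
        decompressed: Vec::with_capacity(columns.len()),
        ptypes: Vec::with_capacity(columns.len()),
        num_values: Vec::with_capacity(columns.len()),
    };
    for col in columns {
        out.decompressed.push(decompress(&col.compressed));
        out.ptypes.push(col.ptype);
        out.num_values.push(col.num_values);
    }
    out
}
```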
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

Coverage Diff    main    #2620    +/-
Coverage            ?   63.88%
Files               ?      965
Lines               ?   111230
Branches            ?        0
Hits                ?    71060
Misses              ?    40170
Partials            ?        0
Adds streaming reads for remote Parquet files.
The algorithm is similar to the one for local Parquet files: read bytes into memory -> get an Arrow chunk iterator -> emit a table per chunk.
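The three steps can be sketched as a lazy iterator pipeline. This is a toy model under stated assumptions, not the PR's implementation: `Table`, `read_bytes`, and `stream_tables` are hypothetical, and the "file" is a buffer of little-endian `i32` values rather than real Parquet pages.

```rust
/// Hypothetical stand-in for a table holding one chunk of decoded rows.
#[derive(Debug, PartialEq)]
struct Table {
    rows: Vec<i32>,
}

/// Step 1, "read bytes into memory". Simulated here by encoding the
/// values as little-endian i32 instead of fetching from remote storage.
fn read_bytes(values: &[i32]) -> Vec<u8> {
    values.iter().flat_map(|v| v.to_le_bytes()).collect()
}

/// Steps 2 and 3: build a chunk iterator over the bytes and emit one
/// `Table` per chunk. Decoding is lazy, so only the chunk currently being
/// consumed is materialized as a table. `chunk_rows` must be non-zero.
fn stream_tables(bytes: &[u8], chunk_rows: usize) -> impl Iterator<Item = Table> + '_ {
    bytes.chunks(chunk_rows * 4).map(|chunk| Table {
        rows: chunk
            .chunks_exact(4)
            .map(|b| i32::from_le_bytes([b[0], b[1], b[2], b[3]]))
            .collect(),
    })
}
```

A consumer can pull tables one at a time with `for table in stream_tables(&bytes, 1024) { ... }` and drop each before requesting the next, which is the property that separates streaming from a bulk read that builds every table up front.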
Q6 Memory Profile: Streaming vs. Bulk (profile images not included)