
[FEAT] Streaming Local Parquet Reads #2592

Merged
colin-ho merged 9 commits into main from colin/streaming-parquet on Aug 3, 2024

Conversation

colin-ho
Contributor

@colin-ho colin-ho commented Jul 31, 2024

This PR implements streaming local file reads for parquet.

Memory profiling results on Q6 (native streaming vs. Python bulk): native streaming achieves almost 2x lower peak memory.
[Screenshots: memory profiles of the native streaming and Python bulk runs]

TPC-H results: overall achieves parity with the Python runner, with some exceptions such as Q1, which achieves a 1.75x speedup.
tpch_result.txt

To-dos for follow-up PRs:

  • Metadata only reads
  • Remote parquet reads

@github-actions github-actions bot added the enhancement New feature or request label Jul 31, 2024

codecov bot commented Jul 31, 2024

Codecov Report

Attention: Patch coverage is 0% with 394 lines in your changes missing coverage. Please review.

Please upload report for BASE (main@ddabd34). Learn more about missing BASE report.
Report is 4 commits behind head on main.

Files Patch % Lines
src/daft-parquet/src/stream_reader.rs 0.00% 205 Missing ⚠️
src/daft-parquet/src/read.rs 0.00% 130 Missing ⚠️
src/daft-micropartition/src/micropartition.rs 0.00% 53 Missing ⚠️
src/daft-parquet/src/file.rs 0.00% 5 Missing ⚠️
src/daft-physical-plan/src/translate.rs 0.00% 1 Missing ⚠️
Additional details and impacted files


@@           Coverage Diff           @@
##             main    #2592   +/-   ##
=======================================
  Coverage        ?   63.63%           
=======================================
  Files           ?      959           
  Lines           ?   109918           
  Branches        ?        0           
=======================================
  Hits            ?    69943           
  Misses          ?    39975           
  Partials        ?        0           
Files Coverage Δ
src/daft-parquet/src/lib.rs 50.00% <ø> (ø)
src/daft-physical-plan/src/translate.rs 0.00% <0.00%> (ø)
src/daft-parquet/src/file.rs 65.83% <0.00%> (ø)
src/daft-micropartition/src/micropartition.rs 76.35% <0.00%> (ø)
src/daft-parquet/src/read.rs 56.88% <0.00%> (ø)
src/daft-parquet/src/stream_reader.rs 49.19% <0.00%> (ø)

// Use block in place to read metadata as the current function is in an asynchronous context.
let metadata = match metadata {
Some(m) => m,
None => read::read_metadata(&mut reader)
Contributor Author

This is a blocking call in an asynchronous context. However, since it is a metadata read, I'm not sure it's worth the overhead of moving it onto a blocking thread, i.e. spawn_blocking / rayon.

Member

This should just be reading the metadata from the local filesystem, which should be pretty quick to do once.

@desmondcheongzx desmondcheongzx self-requested a review August 1, 2024 00:03
@@ -18,7 +18,7 @@ impl InMemorySource {

 impl Source for InMemorySource {
     #[instrument(name = "InMemorySource::get_data", level = "info", skip(self))]
-    fn get_data(&self) -> SourceStream {
+    fn get_data(&self, in_order: bool) -> SourceStream {
Collaborator

I know this already existed within MultiSender, but it's not very apparent from the code what this flag actually represents.

Does it mean:

A: the node does not change the output ordering
B: the data has been ordered
C: the node requires that the data is ordered

Contributor Author

Yeah, still trying to figure out the best abstractions for the new executor, so apologies for the confusion here.

To answer your question: it's supposed to indicate that the parent node requires that the data it receives is ordered.

Collaborator

What about using some enums and additional trait methods, just to make everything a bit more readable?

enum Ordering {
  Unordered,
  Ordered,
  Unknown
}


trait Source {
    fn get_data(&self, ordering: &Ordering) -> Data;
}

pub trait IntermediateOperator {
    fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>>;
    fn name(&self) -> &'static str;
    fn output_ordering(&self) -> Ordering;
    fn required_input_ordering(&self) -> Ordering;
}


impl MultiSender {
  fn required_input_ordering(&self) -> Ordering;
}
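To make the suggestion above concrete, here is a minimal, compilable sketch of the proposed `Ordering` enum with one hypothetical operator implementing the trait methods. `LimitOperator` and its ordering choices are illustrative assumptions, not Daft's actual operators.

```rust
// Sketch of the proposed ordering abstraction; names are illustrative.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Ordering {
    Unordered,
    Ordered,
    Unknown,
}

trait IntermediateOperator {
    fn name(&self) -> &'static str;
    // What ordering this node produces on its output.
    fn output_ordering(&self) -> Ordering;
    // What ordering this node requires from its input.
    fn required_input_ordering(&self) -> Ordering;
}

// Hypothetical example: a LIMIT needs ordered input so that "first N rows"
// is deterministic, but makes no claim about its own output ordering.
struct LimitOperator;

impl IntermediateOperator for LimitOperator {
    fn name(&self) -> &'static str {
        "limit"
    }
    fn output_ordering(&self) -> Ordering {
        Ordering::Unknown
    }
    fn required_input_ordering(&self) -> Ordering {
        Ordering::Ordered
    }
}

fn main() {
    let op = LimitOperator;
    println!("{} requires {:?} input", op.name(), op.required_input_ordering());
}
```

Separating "what I produce" from "what I require" answers all three readings (A, B, C) of the boolean flag explicitly, which is the readability win the enum buys.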

src/daft-micropartition/src/micropartition.rs — 5 outdated review threads (resolved)
// No limit, never early-terminate.
None => futures::future::ready(Ok(true)),
}
});
Contributor Author

Just cleaning up some of the streaming CSV code

// No limit, never early-terminate.
None => futures::future::ready(Ok(true)),
}
});
Contributor Author

Just cleaning up some of the streaming json code

rg_metadata,
schema.fields.clone(),
Some(chunk_size),
num_rows,
Member

I believe this should be the num_rows we need from that row group. Say you have 2 row groups of 10 rows each and we request 15 rows: the first request to the row group should be for 10 rows and the second for 5.

I believe row_ranges has this number for each row group.

Contributor Author

Ah, that's an important catch, thanks!
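The per-row-group split the reviewer describes can be sketched as a small helper: walk the row groups in order and hand each one at most the rows still remaining. `rows_per_group` and `row_group_sizes` are illustrative names, not Daft's actual API.

```rust
// Split a global row request across row groups in order.
// E.g. groups of [10, 10] rows with a request for 15 rows -> [10, 5].
fn rows_per_group(row_group_sizes: &[usize], num_rows: usize) -> Vec<usize> {
    let mut remaining = num_rows;
    row_group_sizes
        .iter()
        .map(|&size| {
            let take = size.min(remaining);
            remaining -= take;
            take
        })
        .collect()
}

fn main() {
    println!("{:?}", rows_per_group(&[10, 10], 15));
}
```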

.map(|rg_range| {
let expected_num_chunks =
f32::ceil(rg_range.num_rows as f32 / chunk_size as f32) as usize;
tokio::sync::mpsc::channel(expected_num_chunks)
Member

For channels, we should use the std channels rather than the tokio ones, which are async emulations of those abstractions.

For even better performance, we can use https://docs.rs/crossbeam/latest/crossbeam/channel/index.html

Member

One of the cool things in crossbeam is that we can use it as an iterator.
https://docs.rs/crossbeam/latest/crossbeam/channel/index.html#iteration
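A minimal sketch of the suggestion, using std's bounded `sync_channel` (crossbeam's `bounded` channel is a drop-in, faster alternative with the same iterator ergonomics). The channel is sized to `ceil(num_rows / chunk_size)` as in the snippet above; `stream_chunks` and the chunk payloads are illustrative.

```rust
use std::sync::mpsc;
use std::thread;

// Produce `num_rows` rows in `chunk_size`-row chunks through a bounded
// channel sized to the expected chunk count, and drain it on this thread.
fn stream_chunks(num_rows: usize, chunk_size: usize) -> usize {
    // Same ceiling computation as the snippet above.
    let expected_num_chunks = (num_rows + chunk_size - 1) / chunk_size;
    let (tx, rx) = mpsc::sync_channel::<Vec<u64>>(expected_num_chunks);

    let producer = thread::spawn(move || {
        let mut sent = 0;
        while sent < num_rows {
            let n = chunk_size.min(num_rows - sent);
            // In the real reader each message would be a decoded column chunk.
            tx.send(vec![0; n]).unwrap();
            sent += n;
        }
        // `tx` drops here, closing the channel and ending `rx.iter()` below.
    });

    // Receivers are iterable (std and crossbeam alike); no async runtime needed.
    let total_rows: usize = rx.iter().map(|chunk| chunk.len()).sum();
    producer.join().unwrap();
    total_rows
}

fn main() {
    println!("received {} rows", stream_chunks(10, 3));
}
```

Because the channel capacity equals the number of messages, the producer never blocks even if the consumer lags.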

tokio::sync::mpsc::channel(expected_num_chunks)
})
.unzip();
// Create a channel to send errors to the stream
Member

You shouldn't need a separate channel to send errors. Normally we can just send DaftResult into the output channel.
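A sketch of the single-channel approach: `Result` values travel through the same channel as data, so the consumer sees errors in-band and can stop. `ChunkError` and `demo` are hypothetical stand-ins for `DaftError` and the reader's plumbing.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
struct ChunkError(String);

// Drain the output channel, counting good chunks and stopping at the
// first error, which arrives in-band rather than on a side channel.
fn drain(rx: mpsc::Receiver<Result<Vec<u64>, ChunkError>>) -> (usize, Option<ChunkError>) {
    let mut ok_chunks = 0;
    for msg in rx.iter() {
        match msg {
            Ok(_) => ok_chunks += 1,
            Err(e) => return (ok_chunks, Some(e)),
        }
    }
    (ok_chunks, None)
}

fn demo() -> (usize, bool) {
    let (tx, rx) = mpsc::sync_channel::<Result<Vec<u64>, ChunkError>>(4);
    thread::spawn(move || {
        tx.send(Ok(vec![1, 2, 3])).unwrap();
        // The error travels through the same channel as the data.
        tx.send(Err(ChunkError("corrupt page".into()))).unwrap();
    });
    let (ok_chunks, err) = drain(rx);
    (ok_chunks, err.is_some())
}

fn main() {
    let (ok_chunks, saw_error) = demo();
    println!("{ok_chunks} good chunks, error: {saw_error}");
}
```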

@colin-ho colin-ho requested a review from samster25 August 2, 2024 23:29
Member

@samster25 samster25 left a comment

🔥

@colin-ho colin-ho merged commit b616031 into main Aug 3, 2024
44 checks passed
@colin-ho colin-ho deleted the colin/streaming-parquet branch August 3, 2024 00:43
Labels
enhancement New feature or request
3 participants