Skip to content

Conversation

colin-ho
Copy link
Contributor

@colin-ho colin-ho commented May 30, 2025

Changes Made

Use spawned tasks to forward the scan task stream instead of using flatten unordered, which internally uses futuresunordered and doesn't allow the runtime to poll the streams concurrently.

Improves performance of reading many small files, such as in the script here: https://gist.github.com/metadaddy/ec9e645fa0929321b626d8be6e11162e

TLDR:

Total record count: 65.28 seconds -> 20.22 seconds
Record count for 2025-03-31: 6.58 seconds -> 2.95 seconds
Capacity: 5.87 seconds -> 2.58 seconds
Top 10 most common drive models: 48.27 seconds -> 25.40 seconds

Before:

INFO:__main__:Setting log level for daft.iceberg.iceberg_scan to ERROR to suppress warning about unspecified partition filter
INFO:__main__:Metadata located at s3://drivestats-iceberg/drivestats/metadata/00239-6b135391-6039-4810-a736-2c0de6d4da0a.metadata.json
/home/ec2-user/daft-main/.venv/lib64/python3.11/site-packages/pyiceberg/avro/decoder.py:185: UserWarning: Falling back to pure Python Avro decoder, missing Cython implementation
  warnings.warn("Falling back to pure Python Avro decoder, missing Cython implementation")
Total record count: 592366002 (65.28 seconds)
/home/ec2-user/daft-main/.venv/lib64/python3.11/site-packages/pyiceberg/avro/decoder.py:185: UserWarning: Falling back to pure Python Avro decoder, missing Cython implementation
  warnings.warn("Falling back to pure Python Avro decoder, missing Cython implementation")
Record count for 2025-03-31: 312831 (6.58 seconds)
Capacity: 4.665 EB (5.87 seconds)
Top 10 most common drive models:
╭──────────────────────┬────────╮
│ model                ┆ count  │
│ ---                  ┆ ---    │
│ Utf8                 ┆ UInt64 │
╞══════════════════════╪════════╡
│ TOSHIBA MG08ACA16TA  ┆ 40936  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ TOSHIBA MG07ACA14TA  ┆ 39387  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST12000NM0007        ┆ 38843  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST4000DM000          ┆ 37040  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ WDC WUH722222ALE6L4  ┆ 35005  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST16000NM001G        ┆ 34578  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ WDC WUH721816ALE6L4  ┆ 26581  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST12000NM0008        ┆ 21037  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ HGST HMS5C4040BLE640 ┆ 16349  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST8000NM0055         ┆ 15680  │
╰──────────────────────┴────────╯

(Showing first 10 rows)
(48.27 seconds)

After:

INFO:__main__:Setting log level for daft.iceberg.iceberg_scan to ERROR to suppress warning about unspecified partition filter
INFO:__main__:Metadata located at s3://drivestats-iceberg/drivestats/metadata/00239-6b135391-6039-4810-a736-2c0de6d4da0a.metadata.json
/home/ec2-user/daft-main/.venv/lib64/python3.11/site-packages/pyiceberg/avro/decoder.py:185: UserWarning: Falling back to pure Python Avro decoder, missing Cython implementation
  warnings.warn("Falling back to pure Python Avro decoder, missing Cython implementation")
Total record count: 592366002 (20.22 seconds)
/home/ec2-user/daft-main/.venv/lib64/python3.11/site-packages/pyiceberg/avro/decoder.py:185: UserWarning: Falling back to pure Python Avro decoder, missing Cython implementation
  warnings.warn("Falling back to pure Python Avro decoder, missing Cython implementation")
Record count for 2025-03-31: 312831 (2.95 seconds)
Capacity: 4.665 EB (2.58 seconds)
Top 10 most common drive models:
╭──────────────────────┬────────╮
│ model                ┆ count  │
│ ---                  ┆ ---    │
│ Utf8                 ┆ UInt64 │
╞══════════════════════╪════════╡
│ TOSHIBA MG08ACA16TA  ┆ 40936  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ TOSHIBA MG07ACA14TA  ┆ 39387  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST12000NM0007        ┆ 38843  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST4000DM000          ┆ 37040  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ WDC WUH722222ALE6L4  ┆ 35005  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST16000NM001G        ┆ 34578  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ WDC WUH721816ALE6L4  ┆ 26581  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST12000NM0008        ┆ 21037  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ HGST HMS5C4040BLE640 ┆ 16349  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ST8000NM0055         ┆ 15680  │
╰──────────────────────┴────────╯

(Showing first 10 rows)
(25.40 seconds)

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

@github-actions github-actions bot added the perf label May 30, 2025
Copy link

codecov bot commented May 30, 2025

Codecov Report

Attention: Patch coverage is 88.19444% with 17 lines in your changes missing coverage. Please review.

Project coverage is 76.20%. Comparing base (acd8881) to head (41ac172).
Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-local-execution/src/sources/scan_task.rs 82.79% 16 Missing ⚠️
src/common/runtime/src/lib.rs 93.33% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #4451      +/-   ##
==========================================
- Coverage   78.25%   76.20%   -2.06%     
==========================================
  Files         846      846              
  Lines      115257   118828    +3571     
==========================================
+ Hits        90196    90554     +358     
- Misses      25061    28274    +3213     
Files with missing lines Coverage Δ
src/daft-local-execution/src/channel.rs 98.33% <100.00%> (+0.18%) ⬆️
...-execution/src/intermediate_ops/intermediate_op.rs 78.39% <100.00%> (-0.62%) ⬇️
src/daft-local-execution/src/lib.rs 98.26% <100.00%> (+0.10%) ⬆️
src/daft-local-execution/src/run.rs 48.83% <100.00%> (-0.16%) ⬇️
...rc/daft-local-execution/src/sinks/blocking_sink.rs 81.11% <100.00%> (ø)
...c/daft-local-execution/src/sinks/streaming_sink.rs 78.14% <100.00%> (ø)
src/daft-local-execution/src/sources/source.rs 55.55% <100.00%> (ø)
src/daft-parquet/src/file.rs 74.62% <100.00%> (+0.03%) ⬆️
src/daft-parquet/src/lib.rs 70.00% <ø> (ø)
src/daft-parquet/src/stream_reader.rs 91.05% <100.00%> (+0.01%) ⬆️
... and 3 more

... and 77 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@colin-ho colin-ho marked this pull request as ready for review June 2, 2025 19:56
@@ -227,6 +227,28 @@ pub fn get_compute_pool_num_threads() -> usize {
get_or_init_compute_runtime_num_worker_threads()
}

// Helper function to combine a stream with a future that returns a result
pub fn combine_stream<T, E>(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this out because it's a pretty common pattern used in parquet + warc, now local execution and also in the future distributed execution (which i didn't touch for now).

In the future we can have a common/async utilities crate that has all of the async utils but for now i just put it in runtime

Copy link
Contributor

@srilman srilman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Had a couple of questions for learning sake, will let Desmond approve cause of the IO changes.

@@ -599,6 +598,7 @@ pub async fn local_parquet_stream(

let stream_of_streams =
futures::stream::iter(output_receivers.into_iter().map(ReceiverStream::new));
let parquet_task = async move { parquet_task.await? };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the point of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The combine_stream method accepts an impl<future<Result<(), E>>> but the parquet task is of type RuntimeTask<DaftResult<()>>, of which it's future type is impl<future<DaftResult<DaftResult<()>>>>.

This is me trying to fold the DaftResult<DaftResult<()>> into a DaftResult<()>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh the ? gets pushed to the outer function, not the async block. Otherwise async blocks are not like try blocks in nightly. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -120,7 +120,7 @@ impl PipelineNode for SourceNode {
let (destination_sender, destination_receiver) = create_channel(0);
let counting_sender =
CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar);
runtime_handle.spawn(
runtime_handle.spawn_local(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, does this force all spawned tasks to execute on the same thread?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nvm, just a rename, looks like it always used spawn_local under the hood

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup

Copy link
Collaborator

@desmondcheongzx desmondcheongzx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noice, thanks!

@colin-ho colin-ho merged commit c8abb43 into main Jun 2, 2025
46 of 47 checks passed
@colin-ho colin-ho deleted the colin/fix-source-buffering branch June 2, 2025 23:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants