[FEAT] Streaming Local Parquet Reads (#2592)
This PR implements streaming local file reads for parquet.

Memory profiling results on Q6 (native streaming vs. Python bulk reads): native streaming achieves almost 2x lower memory usage.

![Screenshot 2024-08-01 at 6 08 40 PM](https://github.com/user-attachments/assets/045594bf-54a4-40ee-ba6d-6524dd3bc6d2)
![Screenshot 2024-08-01 at 6 09 20 PM](https://github.com/user-attachments/assets/4dbef9df-2bc9-410c-8886-d8d2cdb10f78)

TPC-H results: overall, the native streaming runner achieves parity with the Python runner, with some exceptions such as Q1, which sees a 1.75x speedup.

[tpch_result.txt](https://github.com/user-attachments/files/16463937/tpch_result.txt)



Todos in follow-up PRs:
- Metadata only reads
- Remote parquet reads
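For orientation, the core shape of the change (visible in the signature diffs below) is that local reads now yield a stream of individual tables instead of materializing a `Vec<Table>` up front. A minimal sketch of the two shapes, using a placeholder `Table` type and `String` errors rather than Daft's actual reader code:

```rust
use futures::stream::{self, BoxStream, StreamExt};

// Stand-ins for Daft's `Table` and `DaftResult` types.
struct Table;
type DaftResult<T> = Result<T, String>;

// Bulk shape: every decoded table must be resident in memory before the
// caller sees any data.
fn bulk_read(num_row_groups: usize) -> DaftResult<Vec<Table>> {
    Ok((0..num_row_groups).map(|_| Table).collect())
}

// Streaming shape: each table is yielded as soon as it is decoded, so peak
// memory is bounded by the chunks in flight rather than by the whole file.
fn streaming_read(num_row_groups: usize) -> BoxStream<'static, DaftResult<Table>> {
    stream::iter((0..num_row_groups).map(|_| Ok(Table))).boxed()
}

fn main() {
    // Bulk: all three tables exist in memory at once before we can touch them.
    let _all: Vec<Table> = bulk_read(3).expect("bulk read failed");

    // Streaming: tables are consumed one at a time as they arrive.
    futures::executor::block_on(async {
        let mut stream = streaming_read(3);
        while let Some(table) = stream.next().await {
            let _table: Table = table.expect("streaming read failed");
        }
    });
}
```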

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
3 people authored Aug 3, 2024
1 parent 9bb4b3a commit b616031
Showing 16 changed files with 798 additions and 357 deletions.
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

40 changes: 16 additions & 24 deletions src/daft-csv/src/read.rs
@@ -144,7 +144,7 @@ pub async fn stream_csv(
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
max_chunks_in_flight: Option<usize>,
-) -> DaftResult<BoxStream<'static, DaftResult<Vec<Table>>>> {
+) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
let stream = stream_csv_single(
&uri,
convert_options,
@@ -333,7 +333,7 @@ async fn stream_csv_single(
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
max_chunks_in_flight: Option<usize>,
-) -> DaftResult<impl Stream<Item = DaftResult<Vec<Table>>> + Send> {
+) -> DaftResult<impl Stream<Item = DaftResult<Table>> + Send> {
let predicate = convert_options
.as_ref()
.and_then(|opts| opts.predicate.clone());
Expand Down Expand Up @@ -385,7 +385,8 @@ async fn stream_csv_single(
// Limit the number of chunks we have in flight at any given time.
.try_buffered(max_chunks_in_flight);

-    let filtered_tables = tables.map_ok(move |table| {
+    let filtered_tables = tables.map(move |table| {
+        let table = table?;
if let Some(predicate) = &predicate {
let filtered = table?.filter(&[predicate.clone()])?;
if let Some(include_columns) = &include_columns {
@@ -399,28 +400,19 @@
});

let mut remaining_rows = limit.map(|limit| limit as i64);
-    let tables = filtered_tables
-        .try_take_while(move |result| {
-            match (result, remaining_rows) {
-                // Limit has been met, early-terminate.
-                (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)),
-                // Limit has not yet been met, update remaining limit slack and continue.
-                (Ok(table), Some(rows_left)) => {
-                    remaining_rows = Some(rows_left - table.len() as i64);
-                    futures::future::ready(Ok(true))
-                }
-                // (1) No limit, never early-terminate.
-                // (2) Encountered error, propagate error to try_collect to allow it to short-circuit.
-                (_, None) | (Err(_), _) => futures::future::ready(Ok(true)),
-            }
-        })
-        .map(|r| match r {
-            Ok(table) => table,
-            Err(e) => Err(e.into()),
-        })
-        // Chunk the tables into chunks of size max_chunks_in_flight.
-        .try_ready_chunks(max_chunks_in_flight)
-        .map_err(|e| DaftError::ComputeError(e.to_string()));
+    let tables = filtered_tables.try_take_while(move |table| {
+        match remaining_rows {
+            // Limit has been met, early-terminate.
+            Some(rows_left) if rows_left <= 0 => futures::future::ready(Ok(false)),
+            // Limit has not yet been met, update remaining limit slack and continue.
+            Some(rows_left) => {
+                remaining_rows = Some(rows_left - table.len() as i64);
+                futures::future::ready(Ok(true))
+            }
+            // No limit, never early-terminate.
+            None => futures::future::ready(Ok(true)),
+        }
+    });
Ok(tables)
}
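Relative to the removed code, the take-while closure above no longer needs `Ok`/`Err` match arms (errors now short-circuit through `try_take_while` itself), and the `try_ready_chunks` re-batching step is gone since each item is already a single `Table`; the identical change appears in the JSON reader below. A self-contained sketch of the same limit pattern, with a stand-in `Table` type and `String` errors in place of Daft's types:

```rust
use futures::stream::{self, TryStreamExt};

// Stand-in for daft_table::Table.
struct Table {
    num_rows: usize,
}

fn main() -> Result<(), String> {
    futures::executor::block_on(async {
        // Five 100-row tables arriving as a fallible stream.
        let tables = stream::iter((0..5).map(|_| Ok::<_, String>(Table { num_rows: 100 })));

        // Track how many rows we may still emit (None = no limit).
        let limit = Some(250usize);
        let mut remaining_rows = limit.map(|l| l as i64);

        let limited = tables.try_take_while(move |table| {
            let keep = match remaining_rows {
                // Limit already met: stop pulling from the upstream reader.
                Some(rows_left) if rows_left <= 0 => false,
                // Limit not yet met: debit this table's rows and continue.
                Some(rows_left) => {
                    remaining_rows = Some(rows_left - table.num_rows as i64);
                    true
                }
                // No limit: never terminate early.
                None => true,
            };
            futures::future::ready(Ok(keep))
        });

        let collected: Vec<Table> = limited.try_collect().await?;
        // 100 + 100 + 100 >= 250, so the stream stops after the third table;
        // the exact 250-row cut is presumably applied by a downstream operator.
        assert_eq!(collected.len(), 3);
        Ok(())
    })
}
```

As in the original, the stream may overshoot the limit by up to one chunk; the win is that no chunks beyond that are ever read or decoded.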

39 changes: 15 additions & 24 deletions src/daft-json/src/read.rs
@@ -296,8 +296,7 @@ pub async fn stream_json(
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
max_chunks_in_flight: Option<usize>,
-) -> DaftResult<BoxStream<'static, DaftResult<Vec<Table>>>> {
-    // BoxStream::
+) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
let predicate = convert_options
.as_ref()
.and_then(|opts| opts.predicate.clone());
@@ -349,7 +348,8 @@ pub async fn stream_json(
// Limit the number of chunks we have in flight at any given time.
.try_buffered(max_chunks_in_flight);

-    let filtered_tables = tables.map_ok(move |table| {
+    let filtered_tables = tables.map(move |table| {
+        let table = table?;
if let Some(predicate) = &predicate {
let filtered = table?.filter(&[predicate.clone()])?;
if let Some(include_columns) = &include_columns {
@@ -363,28 +363,19 @@ pub async fn stream_json(
});

let mut remaining_rows = limit.map(|limit| limit as i64);
-    let tables = filtered_tables
-        .try_take_while(move |result| {
-            match (result, remaining_rows) {
-                // Limit has been met, early-terminate.
-                (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)),
-                // Limit has not yet been met, update remaining limit slack and continue.
-                (Ok(table), Some(rows_left)) => {
-                    remaining_rows = Some(rows_left - table.len() as i64);
-                    futures::future::ready(Ok(true))
-                }
-                // (1) No limit, never early-terminate.
-                // (2) Encountered error, propagate error to try_collect to allow it to short-circuit.
-                (_, None) | (Err(_), _) => futures::future::ready(Ok(true)),
-            }
-        })
-        .map(|r| match r {
-            Ok(table) => table,
-            Err(e) => Err(e.into()),
-        })
-        // Chunk the tables into chunks of size max_chunks_in_flight.
-        .try_ready_chunks(max_chunks_in_flight)
-        .map_err(|e| DaftError::ComputeError(e.to_string()));
+    let tables = filtered_tables.try_take_while(move |table| {
+        match remaining_rows {
+            // Limit has been met, early-terminate.
+            Some(rows_left) if rows_left <= 0 => futures::future::ready(Ok(false)),
+            // Limit has not yet been met, update remaining limit slack and continue.
+            Some(rows_left) => {
+                remaining_rows = Some(rows_left - table.len() as i64);
+                futures::future::ready(Ok(true))
+            }
+            // No limit, never early-terminate.
+            None => futures::future::ready(Ok(true)),
+        }
+    });
Ok(Box::pin(tables))
}

6 changes: 6 additions & 0 deletions src/daft-local-execution/Cargo.toml
@@ -2,12 +2,18 @@
async-stream = {workspace = true}
common-error = {path = "../common/error", default-features = false}
common-tracing = {path = "../common/tracing", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-csv = {path = "../daft-csv", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-json = {path = "../daft-json", default-features = false}
daft-micropartition = {path = "../daft-micropartition", default-features = false}
daft-parquet = {path = "../daft-parquet", default-features = false}
daft-physical-plan = {path = "../daft-physical-plan", default-features = false}
daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
dyn-clone = {workspace = true}
futures = {workspace = true}
lazy_static = {workspace = true}
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/sources/in_memory.rs
@@ -18,7 +18,7 @@ impl InMemorySource {

impl Source for InMemorySource {
#[instrument(name = "InMemorySource::get_data", level = "info", skip(self))]
-    fn get_data(&self) -> SourceStream {
+    fn get_data(&self, maintain_order: bool) -> SourceStream {
stream::iter(self.data.clone().into_iter().map(Ok)).boxed()
}
}
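The `maintain_order` flag in this last hunk presumably lets a source signal whether it must preserve partition order; the in-memory source can ignore it because its partitions are already materialized in order. A rough sketch of the trait shape this implies — the definitions below are illustrative stand-ins, not the actual daft-local-execution code:

```rust
use std::sync::Arc;

use futures::stream::{self, BoxStream, StreamExt};

// Illustrative stand-ins for daft-local-execution's actual types.
struct Table;
type SourceStream = BoxStream<'static, Result<Arc<Table>, String>>;

trait Source: Send + Sync {
    // `maintain_order` tells the source whether downstream operators require
    // the original partition order, or whether chunks may be interleaved for
    // throughput.
    fn get_data(&self, maintain_order: bool) -> SourceStream;
}

struct InMemorySource {
    data: Vec<Arc<Table>>,
}

impl Source for InMemorySource {
    fn get_data(&self, _maintain_order: bool) -> SourceStream {
        // Data is already materialized, so order is preserved either way.
        stream::iter(self.data.clone().into_iter().map(Ok)).boxed()
    }
}

fn main() {
    let source = InMemorySource {
        data: vec![Arc::new(Table), Arc::new(Table)],
    };
    // Both ordered and unordered requests yield the same two partitions here.
    let n = futures::executor::block_on(source.get_data(true).count());
    assert_eq!(n, 2);
}
```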
(The remaining 11 changed files are not shown here.)
