[FEAT] Streaming Local Parquet Reads #2592

Merged · 9 commits · Aug 3, 2024
Changes from all commits
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

40 changes: 16 additions & 24 deletions src/daft-csv/src/read.rs
@@ -144,7 +144,7 @@ pub async fn stream_csv(
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     max_chunks_in_flight: Option<usize>,
-) -> DaftResult<BoxStream<'static, DaftResult<Vec<Table>>>> {
+) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
     let stream = stream_csv_single(
         &uri,
         convert_options,
@@ -333,7 +333,7 @@ async fn stream_csv_single(
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     max_chunks_in_flight: Option<usize>,
-) -> DaftResult<impl Stream<Item = DaftResult<Vec<Table>>> + Send> {
+) -> DaftResult<impl Stream<Item = DaftResult<Table>> + Send> {
     let predicate = convert_options
         .as_ref()
         .and_then(|opts| opts.predicate.clone());
@@ -385,7 +385,8 @@ async fn stream_csv_single(
         // Limit the number of chunks we have in flight at any given time.
         .try_buffered(max_chunks_in_flight);

-    let filtered_tables = tables.map_ok(move |table| {
+    let filtered_tables = tables.map(move |table| {
+        let table = table?;
         if let Some(predicate) = &predicate {
             let filtered = table?.filter(&[predicate.clone()])?;
             if let Some(include_columns) = &include_columns {
@@ … @@
     });

     let mut remaining_rows = limit.map(|limit| limit as i64);
-    let tables = filtered_tables
-        .try_take_while(move |result| {
-            match (result, remaining_rows) {
-                // Limit has been met, early-terminate.
-                (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)),
-                // Limit has not yet been met, update remaining limit slack and continue.
-                (Ok(table), Some(rows_left)) => {
-                    remaining_rows = Some(rows_left - table.len() as i64);
-                    futures::future::ready(Ok(true))
-                }
-                // (1) No limit, never early-terminate.
-                // (2) Encountered error, propagate error to try_collect to allow it to short-circuit.
-                (_, None) | (Err(_), _) => futures::future::ready(Ok(true)),
+    let tables = filtered_tables.try_take_while(move |table| {
+        match remaining_rows {
+            // Limit has been met, early-terminate.
+            Some(rows_left) if rows_left <= 0 => futures::future::ready(Ok(false)),
+            // Limit has not yet been met, update remaining limit slack and continue.
+            Some(rows_left) => {
+                remaining_rows = Some(rows_left - table.len() as i64);
+                futures::future::ready(Ok(true))
+            }
-        })
-        .map(|r| match r {
-            Ok(table) => table,
-            Err(e) => Err(e.into()),
-        })
-        // Chunk the tables into chunks of size max_chunks_in_flight.
-        .try_ready_chunks(max_chunks_in_flight)
-        .map_err(|e| DaftError::ComputeError(e.to_string()));
+            // No limit, never early-terminate.
+            None => futures::future::ready(Ok(true)),
+        }
+    });
Contributor Author: Just cleaning up some of the streaming CSV code

Ok(tables)
}

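For readers who want to see the limit bookkeeping in isolation, here is a minimal, self-contained sketch of the `try_take_while` pattern used in the new code. It uses a hypothetical `Batch` struct standing in for Daft's `Table` and plain `futures` instead of the crate's IO machinery; it illustrates the pattern, not the actual implementation.

```rust
use futures::{stream, TryStreamExt};

// Hypothetical stand-in for daft_table::Table: all we need is a row count.
struct Batch {
    rows: usize,
}

fn main() -> Result<(), std::io::Error> {
    futures::executor::block_on(async {
        // A fallible stream of decoded batches, analogous to `filtered_tables` above.
        let batches = stream::iter(vec![
            Ok::<_, std::io::Error>(Batch { rows: 4 }),
            Ok(Batch { rows: 4 }),
            Ok(Batch { rows: 4 }),
        ]);

        // Stop pulling batches once the limit's slack is exhausted.
        let limit: Option<usize> = Some(6);
        let mut remaining_rows = limit.map(|l| l as i64);
        let limited = batches.try_take_while(move |batch| {
            match remaining_rows {
                // Limit has been met, early-terminate.
                Some(rows_left) if rows_left <= 0 => futures::future::ready(Ok(false)),
                // Limit not yet met: update the remaining slack and continue.
                Some(rows_left) => {
                    remaining_rows = Some(rows_left - batch.rows as i64);
                    futures::future::ready(Ok(true))
                }
                // No limit, never early-terminate.
                None => futures::future::ready(Ok(true)),
            }
        });

        let taken: Vec<Batch> = limited.try_collect().await?;
        // The last emitted batch may overshoot the limit (4 + 4 >= 6); the consumer
        // is expected to truncate. The third batch is never pulled.
        assert_eq!(taken.len(), 2);
        Ok(())
    })
}
```

Note that the closure only decides whether to keep pulling; any final row-count trimming happens downstream of this stream.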
39 changes: 15 additions & 24 deletions src/daft-json/src/read.rs
@@ -296,8 +296,7 @@ pub async fn stream_json(
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     max_chunks_in_flight: Option<usize>,
-) -> DaftResult<BoxStream<'static, DaftResult<Vec<Table>>>> {
-    // BoxStream::
+) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
     let predicate = convert_options
         .as_ref()
         .and_then(|opts| opts.predicate.clone());
@@ -349,7 +348,8 @@ pub async fn stream_json(
         // Limit the number of chunks we have in flight at any given time.
         .try_buffered(max_chunks_in_flight);

-    let filtered_tables = tables.map_ok(move |table| {
+    let filtered_tables = tables.map(move |table| {
+        let table = table?;
         if let Some(predicate) = &predicate {
             let filtered = table?.filter(&[predicate.clone()])?;
             if let Some(include_columns) = &include_columns {
@@ … @@
     });

     let mut remaining_rows = limit.map(|limit| limit as i64);
-    let tables = filtered_tables
-        .try_take_while(move |result| {
-            match (result, remaining_rows) {
-                // Limit has been met, early-terminate.
-                (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)),
-                // Limit has not yet been met, update remaining limit slack and continue.
-                (Ok(table), Some(rows_left)) => {
-                    remaining_rows = Some(rows_left - table.len() as i64);
-                    futures::future::ready(Ok(true))
-                }
-                // (1) No limit, never early-terminate.
-                // (2) Encountered error, propagate error to try_collect to allow it to short-circuit.
-                (_, None) | (Err(_), _) => futures::future::ready(Ok(true)),
+    let tables = filtered_tables.try_take_while(move |table| {
+        match remaining_rows {
+            // Limit has been met, early-terminate.
+            Some(rows_left) if rows_left <= 0 => futures::future::ready(Ok(false)),
+            // Limit has not yet been met, update remaining limit slack and continue.
+            Some(rows_left) => {
+                remaining_rows = Some(rows_left - table.len() as i64);
+                futures::future::ready(Ok(true))
+            }
-        })
-        .map(|r| match r {
-            Ok(table) => table,
-            Err(e) => Err(e.into()),
-        })
-        // Chunk the tables into chunks of size max_chunks_in_flight.
-        .try_ready_chunks(max_chunks_in_flight)
-        .map_err(|e| DaftError::ComputeError(e.to_string()));
+            // No limit, never early-terminate.
+            None => futures::future::ready(Ok(true)),
+        }
+    });
Contributor Author: Just cleaning up some of the streaming json code

Ok(Box::pin(tables))
}

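Both readers make the same signature change: the streaming functions now yield one `Table` per stream item instead of `Vec<Table>` chunks assembled with `try_ready_chunks`. A rough sketch of what that means for a consumer, using placeholder `Table` and `DaftResult` types rather than Daft's real ones:

```rust
use futures::{stream, stream::BoxStream, StreamExt};

// Placeholders only; Daft's real DaftResult and Table are richer types.
type DaftResult<T> = Result<T, String>;
#[derive(Debug)]
struct Table;

fn main() -> DaftResult<()> {
    futures::executor::block_on(async {
        // Old shape: each item was a chunk of tables, so consumers looped twice.
        let mut old_style: BoxStream<'static, DaftResult<Vec<Table>>> =
            stream::iter(vec![Ok(vec![Table, Table]), Ok(vec![Table])]).boxed();
        while let Some(chunk) = old_style.next().await {
            for table in chunk? {
                println!("old: {table:?}");
            }
        }

        // New shape: one table per item, surfaced as soon as it is decoded;
        // callers decide how (or whether) to re-batch.
        let mut new_style: BoxStream<'static, DaftResult<Table>> =
            stream::iter(vec![Ok(Table), Ok(Table), Ok(Table)]).boxed();
        while let Some(table) = new_style.next().await {
            let table = table?; // propagate per-table decode errors
            println!("new: {table:?}");
        }

        Ok(())
    })
}
```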
6 changes: 6 additions & 0 deletions src/daft-local-execution/Cargo.toml
@@ -2,12 +2,18 @@
 async-stream = {workspace = true}
 common-error = {path = "../common/error", default-features = false}
 common-tracing = {path = "../common/tracing", default-features = false}
+daft-core = {path = "../daft-core", default-features = false}
+daft-csv = {path = "../daft-csv", default-features = false}
 daft-dsl = {path = "../daft-dsl", default-features = false}
 daft-io = {path = "../daft-io", default-features = false}
+daft-json = {path = "../daft-json", default-features = false}
 daft-micropartition = {path = "../daft-micropartition", default-features = false}
+daft-parquet = {path = "../daft-parquet", default-features = false}
 daft-physical-plan = {path = "../daft-physical-plan", default-features = false}
 daft-plan = {path = "../daft-plan", default-features = false}
+daft-scan = {path = "../daft-scan", default-features = false}
+daft-stats = {path = "../daft-stats", default-features = false}
 daft-table = {path = "../daft-table", default-features = false}
 dyn-clone = {workspace = true}
 futures = {workspace = true}
 lazy_static = {workspace = true}
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/sources/in_memory.rs
@@ -18,7 +18,7 @@ impl InMemorySource {

 impl Source for InMemorySource {
     #[instrument(name = "InMemorySource::get_data", level = "info", skip(self))]
-    fn get_data(&self) -> SourceStream {
+    fn get_data(&self, maintain_order: bool) -> SourceStream {
         stream::iter(self.data.clone().into_iter().map(Ok)).boxed()
     }
 }
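The only other visible change in this excerpt threads a `maintain_order` flag into `Source::get_data`. The diff does not show how the executor consumes it, but a common use of such a flag (an assumption here, not taken from this PR) is to switch between order-preserving and completion-order buffering of concurrent reads:

```rust
use futures::{stream, StreamExt};

// Hypothetical sketch: `maintain_order` picks between ordered and unordered
// buffering of concurrent read tasks. Names and wiring are illustrative only.
async fn run_reads(maintain_order: bool) -> Vec<u64> {
    // Pretend each future is one file / row-group read.
    let reads = stream::iter(0u64..4).map(|i| async move { i });

    if maintain_order {
        // Yields results in submission order, even if later reads finish first.
        reads.buffered(2).collect().await
    } else {
        // Yields results in completion order; often higher throughput.
        reads.buffer_unordered(2).collect().await
    }
}

fn main() {
    let ordered = futures::executor::block_on(run_reads(true));
    assert_eq!(ordered, vec![0, 1, 2, 3]);
}
```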