From b6160311b28e2078fabf5224bb26a1a82145da6f Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Fri, 2 Aug 2024 17:43:02 -0700
Subject: [PATCH] [FEAT] Streaming Local Parquet Reads (#2592)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR implements streaming local file reads for Parquet.

Memory profiling results on Q6 (native streaming vs. Python bulk): native streaming achieves almost 2x lower memory usage.
[memory profile screenshots omitted]

TPC-H results: overall, parity with the Python runner, with some exceptions such as Q1, which achieves a 1.75x speedup.
[tpch_result.txt](https://github.com/user-attachments/files/16463937/tpch_result.txt)

Todos in follow-up PRs:
- Metadata-only reads
- Remote parquet reads

---------

Co-authored-by: Colin Ho
Co-authored-by: Colin Ho
---
 Cargo.lock                                    |   9 +-
 src/daft-csv/src/read.rs                      |  40 ++-
 src/daft-json/src/read.rs                     |  39 +--
 src/daft-local-execution/Cargo.toml           |   6 +
 .../src/sources/in_memory.rs                  |   2 +-
 .../src/sources/scan_task.rs                  | 269 +++++++++++++++++-
 .../src/sources/source.rs                     |   9 +-
 src/daft-micropartition/Cargo.toml            |   2 -
 src/daft-micropartition/src/micropartition.rs | 265 -----------------
 src/daft-parquet/Cargo.toml                   |   1 +
 src/daft-parquet/src/file.rs                  |   9 +-
 src/daft-parquet/src/lib.rs                   |  13 +-
 src/daft-parquet/src/read.rs                  | 177 +++++++++++-
 src/daft-parquet/src/stream_reader.rs         | 267 ++++++++++++++++-
 src/daft-physical-plan/src/translate.rs       |   2 +-
 tests/benchmarks/test_local_tpch.py           |  45 +--
 16 files changed, 798 insertions(+), 357 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 00ec8f6794..009a9544bf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1789,12 +1789,18 @@ dependencies = [
  "async-stream",
  "common-error",
  "common-tracing",
+ "daft-core",
+ "daft-csv",
  "daft-dsl",
  "daft-io",
+ "daft-json",
  "daft-micropartition",
+ "daft-parquet",
  "daft-physical-plan",
  "daft-plan",
  "daft-scan",
+ "daft-stats",
+ "daft-table",
  "dyn-clone",
  "futures",
  "lazy_static",
@@ -1809,7 +1815,6 @@ name = "daft-micropartition"
 version = "0.2.0-dev0"
 dependencies = [
  "arrow2",
- "async-stream",
  "bincode",
  "common-error",
  "daft-core",
@@ -1821,7 +1826,6 @@ dependencies = [
  "daft-scan",
  "daft-stats",
  "daft-table",
- "futures",
  "parquet2",
  "pyo3",
  "snafu",
@@ -1846,6 +1850,7 @@ dependencies = [
  "async-stream",
  "bytes",
  "common-error",
+ "crossbeam-channel",
  "daft-core",
  "daft-dsl",
  "daft-io",
diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs
index cc0923c7bd..31ab4f84f0 100644
--- a/src/daft-csv/src/read.rs
+++ b/src/daft-csv/src/read.rs
@@ -144,7 +144,7 @@ pub async fn stream_csv(
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     max_chunks_in_flight: Option<usize>,
-) -> DaftResult<BoxStream<'static, DaftResult<Vec<Table>>>> {
+) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
     let stream = stream_csv_single(
         &uri,
         convert_options,
@@ -333,7 +333,7 @@ async fn stream_csv_single(
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     max_chunks_in_flight: Option<usize>,
-) -> DaftResult<impl Stream<Item = DaftResult<Vec<Table>>> + Send> {
+) -> DaftResult<impl Stream<Item = DaftResult<Table>> + Send> {
     let predicate = convert_options
         .as_ref()
         .and_then(|opts| opts.predicate.clone());
@@ -385,7 +385,8 @@ async fn stream_csv_single(
         // Limit the number of chunks we have in flight at any given time.
.try_buffered(max_chunks_in_flight); - let filtered_tables = tables.map_ok(move |table| { + let filtered_tables = tables.map(move |table| { + let table = table?; if let Some(predicate) = &predicate { let filtered = table?.filter(&[predicate.clone()])?; if let Some(include_columns) = &include_columns { @@ -399,28 +400,19 @@ async fn stream_csv_single( }); let mut remaining_rows = limit.map(|limit| limit as i64); - let tables = filtered_tables - .try_take_while(move |result| { - match (result, remaining_rows) { - // Limit has been met, early-terminate. - (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), - // Limit has not yet been met, update remaining limit slack and continue. - (Ok(table), Some(rows_left)) => { - remaining_rows = Some(rows_left - table.len() as i64); - futures::future::ready(Ok(true)) - } - // (1) No limit, never early-terminate. - // (2) Encountered error, propagate error to try_collect to allow it to short-circuit. - (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), + let tables = filtered_tables.try_take_while(move |table| { + match remaining_rows { + // Limit has been met, early-terminate. + Some(rows_left) if rows_left <= 0 => futures::future::ready(Ok(false)), + // Limit has not yet been met, update remaining limit slack and continue. + Some(rows_left) => { + remaining_rows = Some(rows_left - table.len() as i64); + futures::future::ready(Ok(true)) } - }) - .map(|r| match r { - Ok(table) => table, - Err(e) => Err(e.into()), - }) - // Chunk the tables into chunks of size max_chunks_in_flight. - .try_ready_chunks(max_chunks_in_flight) - .map_err(|e| DaftError::ComputeError(e.to_string())); + // No limit, never early-terminate. + None => futures::future::ready(Ok(true)), + } + }); Ok(tables) } diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index 94f9573bab..38bb4c1973 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -296,8 +296,7 @@ pub async fn stream_json( io_client: Arc, io_stats: Option, max_chunks_in_flight: Option, -) -> DaftResult>>> { - // BoxStream:: +) -> DaftResult>> { let predicate = convert_options .as_ref() .and_then(|opts| opts.predicate.clone()); @@ -349,7 +348,8 @@ pub async fn stream_json( // Limit the number of chunks we have in flight at any given time. .try_buffered(max_chunks_in_flight); - let filtered_tables = tables.map_ok(move |table| { + let filtered_tables = tables.map(move |table| { + let table = table?; if let Some(predicate) = &predicate { let filtered = table?.filter(&[predicate.clone()])?; if let Some(include_columns) = &include_columns { @@ -363,28 +363,19 @@ pub async fn stream_json( }); let mut remaining_rows = limit.map(|limit| limit as i64); - let tables = filtered_tables - .try_take_while(move |result| { - match (result, remaining_rows) { - // Limit has been met, early-terminate. - (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), - // Limit has not yet been met, update remaining limit slack and continue. - (Ok(table), Some(rows_left)) => { - remaining_rows = Some(rows_left - table.len() as i64); - futures::future::ready(Ok(true)) - } - // (1) No limit, never early-terminate. - // (2) Encountered error, propagate error to try_collect to allow it to short-circuit. - (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), + let tables = filtered_tables.try_take_while(move |table| { + match remaining_rows { + // Limit has been met, early-terminate. 
+ Some(rows_left) if rows_left <= 0 => futures::future::ready(Ok(false)), + // Limit has not yet been met, update remaining limit slack and continue. + Some(rows_left) => { + remaining_rows = Some(rows_left - table.len() as i64); + futures::future::ready(Ok(true)) } - }) - .map(|r| match r { - Ok(table) => table, - Err(e) => Err(e.into()), - }) - // Chunk the tables into chunks of size max_chunks_in_flight. - .try_ready_chunks(max_chunks_in_flight) - .map_err(|e| DaftError::ComputeError(e.to_string())); + // No limit, never early-terminate. + None => futures::future::ready(Ok(true)), + } + }); Ok(Box::pin(tables)) } diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml index 214405baa4..4ac61ccfc0 100644 --- a/src/daft-local-execution/Cargo.toml +++ b/src/daft-local-execution/Cargo.toml @@ -2,12 +2,18 @@ async-stream = {workspace = true} common-error = {path = "../common/error", default-features = false} common-tracing = {path = "../common/tracing", default-features = false} +daft-core = {path = "../daft-core", default-features = false} +daft-csv = {path = "../daft-csv", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} daft-io = {path = "../daft-io", default-features = false} +daft-json = {path = "../daft-json", default-features = false} daft-micropartition = {path = "../daft-micropartition", default-features = false} +daft-parquet = {path = "../daft-parquet", default-features = false} daft-physical-plan = {path = "../daft-physical-plan", default-features = false} daft-plan = {path = "../daft-plan", default-features = false} daft-scan = {path = "../daft-scan", default-features = false} +daft-stats = {path = "../daft-stats", default-features = false} +daft-table = {path = "../daft-table", default-features = false} dyn-clone = {workspace = true} futures = {workspace = true} lazy_static = {workspace = true} diff --git a/src/daft-local-execution/src/sources/in_memory.rs b/src/daft-local-execution/src/sources/in_memory.rs index f64ae4ebce..259040ac11 100644 --- a/src/daft-local-execution/src/sources/in_memory.rs +++ b/src/daft-local-execution/src/sources/in_memory.rs @@ -18,7 +18,7 @@ impl InMemorySource { impl Source for InMemorySource { #[instrument(name = "InMemorySource::get_data", level = "info", skip(self))] - fn get_data(&self) -> SourceStream { + fn get_data(&self, maintain_order: bool) -> SourceStream { stream::iter(self.data.clone().into_iter().map(Ok)).boxed() } } diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index bc2d92baec..21723cf97c 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -1,7 +1,18 @@ -use daft_io::IOStatsContext; +use common_error::DaftResult; +use daft_core::schema::SchemaRef; +use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; +use daft_io::{IOStatsContext, IOStatsRef}; +use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_micropartition::MicroPartition; -use daft_scan::ScanTask; -use futures::StreamExt; +use daft_parquet::read::ParquetSchemaInferenceOptions; +use daft_scan::{ + file_format::{FileFormatConfig, ParquetSourceConfig}, + storage_config::StorageConfig, + ChunkSpec, ScanTask, +}; +use daft_stats::{PartitionSpec, TableStatistics}; +use daft_table::Table; +use futures::{stream::BoxStream, StreamExt}; use std::sync::Arc; use crate::{ @@ -32,10 +43,11 @@ impl ScanTaskSource { scan_task: Arc, sender: 
SingleSender, morsel_size: usize, + maintain_order: bool, ) { - let io_stats = IOStatsContext::new("MicroPartition::from_scan_task"); + let io_stats = IOStatsContext::new("StreamScanTask"); let stream_result = - MicroPartition::from_scan_task_streaming(scan_task, io_stats, morsel_size).await; + stream_scan_task(scan_task, Some(io_stats), maintain_order, morsel_size).await; match stream_result { Ok(mut stream) => { while let Some(partition) = stream.next().await { @@ -43,7 +55,7 @@ impl ScanTaskSource { } } Err(e) => { - let _ = sender.send(Err(e.into())).await; + let _ = sender.send(Err(e)).await; } } } @@ -51,7 +63,7 @@ impl ScanTaskSource { impl Source for ScanTaskSource { #[instrument(name = "ScanTaskSource::get_data", level = "info", skip(self))] - fn get_data(&self) -> SourceStream { + fn get_data(&self, maintain_order: bool) -> SourceStream { let morsel_size = DEFAULT_MORSEL_SIZE; let (mut sender, mut receiver) = create_channel(self.scan_tasks.len(), true); for scan_task in self.scan_tasks.clone() { @@ -59,6 +71,7 @@ impl Source for ScanTaskSource { scan_task, sender.get_next_sender(), morsel_size, + maintain_order, )); } Box::pin(async_stream::stream! { @@ -68,3 +81,245 @@ impl Source for ScanTaskSource { }) } } + +async fn stream_scan_task( + scan_task: Arc, + io_stats: Option, + maintain_order: bool, + morsel_size: usize, +) -> DaftResult> { + let pushdown_columns = scan_task + .pushdowns + .columns + .as_ref() + .map(|v| v.iter().map(|s| s.as_str()).collect::>()); + + let file_column_names = match ( + pushdown_columns, + scan_task.partition_spec().map(|ps| ps.to_fill_map()), + ) { + (None, _) => None, + (Some(columns), None) => Some(columns.to_vec()), + + // If the ScanTask has a partition_spec, we elide reads of partition columns from the file + (Some(columns), Some(partition_fillmap)) => Some( + columns + .iter() + .filter_map(|s| { + if partition_fillmap.contains_key(s) { + None + } else { + Some(*s) + } + }) + .collect::>(), + ), + }; + + if scan_task.sources.len() != 1 { + return Err(common_error::DaftError::TypeError( + "Streaming reads only supported for single source ScanTasks".to_string(), + )); + } + let source = scan_task.sources.first().unwrap(); + let url = source.get_path(); + let table_stream = match scan_task.storage_config.as_ref() { + StorageConfig::Native(native_storage_config) => { + let io_config = Arc::new( + native_storage_config + .io_config + .as_ref() + .cloned() + .unwrap_or_default(), + ); + let multi_threaded_io = native_storage_config.multithreaded_io; + let io_client = daft_io::get_io_client_async(multi_threaded_io, io_config).await?; + + match scan_task.file_format_config.as_ref() { + // ******************** + // Native Parquet Reads + // ******************** + FileFormatConfig::Parquet(ParquetSourceConfig { + coerce_int96_timestamp_unit, + field_id_mapping, + .. 
+ }) => { + let inference_options = + ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit)); + + if source.get_iceberg_delete_files().is_some() { + return Err(common_error::DaftError::TypeError( + "Streaming reads not supported for Iceberg delete files".to_string(), + )); + } + + let row_groups = + if let Some(ChunkSpec::Parquet(row_groups)) = source.get_chunk_spec() { + Some(row_groups.clone()) + } else { + None + }; + let metadata = scan_task + .sources + .first() + .and_then(|s| s.get_parquet_metadata().cloned()); + daft_parquet::read::stream_parquet( + url, + file_column_names.as_deref(), + None, + scan_task.pushdowns.limit, + row_groups, + scan_task.pushdowns.filters.clone(), + io_client.clone(), + io_stats, + &inference_options, + field_id_mapping.clone(), + metadata, + maintain_order, + ) + .await? + } + + // **************** + // Native CSV Reads + // **************** + FileFormatConfig::Csv(cfg) => { + let schema_of_file = scan_task.schema.clone(); + let col_names = if !cfg.has_headers { + Some( + schema_of_file + .fields + .values() + .map(|f| f.name.as_str()) + .collect::>(), + ) + } else { + None + }; + let convert_options = CsvConvertOptions::new_internal( + scan_task.pushdowns.limit, + file_column_names + .as_ref() + .map(|cols| cols.iter().map(|col| col.to_string()).collect()), + col_names + .as_ref() + .map(|cols| cols.iter().map(|col| col.to_string()).collect()), + Some(schema_of_file), + scan_task.pushdowns.filters.clone(), + ); + let parse_options = CsvParseOptions::new_with_defaults( + cfg.has_headers, + cfg.delimiter, + cfg.double_quote, + cfg.quote, + cfg.allow_variable_columns, + cfg.escape_char, + cfg.comment, + )?; + let read_options = + CsvReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); + daft_csv::stream_csv( + url.to_string(), + Some(convert_options), + Some(parse_options), + Some(read_options), + io_client.clone(), + io_stats.clone(), + None, + // maintain_order, TODO: Implement maintain_order for CSV + ) + .await? + } + + // **************** + // Native JSON Reads + // **************** + FileFormatConfig::Json(cfg) => { + let schema_of_file = scan_task.schema.clone(); + let convert_options = JsonConvertOptions::new_internal( + scan_task.pushdowns.limit, + file_column_names + .as_ref() + .map(|cols| cols.iter().map(|col| col.to_string()).collect()), + Some(schema_of_file), + scan_task.pushdowns.filters.clone(), + ); + // let + let parse_options = JsonParseOptions::new_internal(); + let read_options = + JsonReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); + + daft_json::read::stream_json( + url.to_string(), + Some(convert_options), + Some(parse_options), + Some(read_options), + io_client, + io_stats, + None, + // maintain_order, TODO: Implement maintain_order for JSON + ) + .await? 
+ } + #[cfg(feature = "python")] + FileFormatConfig::Database(_) => { + return Err(common_error::DaftError::TypeError( + "Native reads for Database file format not implemented".to_string(), + )); + } + #[cfg(feature = "python")] + FileFormatConfig::PythonFunction => { + return Err(common_error::DaftError::TypeError( + "Native reads for PythonFunction file format not implemented".to_string(), + )); + } + } + } + #[cfg(feature = "python")] + StorageConfig::Python(_) => { + return Err(common_error::DaftError::TypeError( + "Streaming reads not supported for Python storage config".to_string(), + )); + } + }; + + let mp_stream = chunk_tables_into_micropartition_stream( + table_stream, + scan_task.materialized_schema(), + scan_task.partition_spec().cloned(), + scan_task.statistics.clone(), + morsel_size, + ); + Ok(Box::pin(mp_stream)) +} + +fn chunk_tables_into_micropartition_stream( + mut table_stream: BoxStream<'static, DaftResult>, + schema: SchemaRef, + partition_spec: Option, + statistics: Option, + morsel_size: usize, +) -> SourceStream<'static> { + let chunked_stream = async_stream::try_stream! { + let mut buffer = vec![]; + let mut total_rows = 0; + while let Some(table) = table_stream.next().await { + let table = table?; + let casted_table = table.cast_to_schema_with_fill(schema.as_ref(), partition_spec.as_ref().map(|pspec| pspec.to_fill_map()).as_ref())?; + total_rows += casted_table.len(); + buffer.push(casted_table); + + if total_rows >= morsel_size { + let mp = Arc::new(MicroPartition::new_loaded(schema.clone(), Arc::new(buffer), statistics.clone())); + buffer = vec![]; + total_rows = 0; + yield mp; + } + } + if !buffer.is_empty() { + let mp = Arc::new(MicroPartition::new_loaded(schema, Arc::new(buffer), statistics)); + yield mp; + } + }; + Box::pin(chunked_stream) +} diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index bd9e033e3d..a209ea0876 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -10,7 +10,7 @@ use crate::channel::MultiSender; pub type SourceStream<'a> = BoxStream<'a, DaftResult>>; pub trait Source: Send + Sync { - fn get_data(&self) -> SourceStream; + fn get_data(&self, maintain_order: bool) -> SourceStream; } pub struct SourceActor { @@ -24,8 +24,8 @@ impl SourceActor { } #[instrument(level = "info", skip(self), name = "SourceActor::run")] - pub async fn run(&mut self) -> DaftResult<()> { - let mut source_stream = self.source.get_data(); + pub async fn run(&mut self, maintain_order: bool) -> DaftResult<()> { + let mut source_stream = self.source.get_data(maintain_order); while let Some(val) = source_stream.next().in_current_span().await { let _ = self.sender.get_next_sender().send(val).await; } @@ -33,10 +33,11 @@ impl SourceActor { } } pub fn run_source(source: Arc, sender: MultiSender) { + let maintain_order = sender.in_order(); let mut actor = SourceActor::new(source, sender); tokio::spawn( async move { - let _ = actor.run().in_current_span().await; + let _ = actor.run(maintain_order).in_current_span().await; } .in_current_span(), ); diff --git a/src/daft-micropartition/Cargo.toml b/src/daft-micropartition/Cargo.toml index a82110931b..43bc70b7e9 100644 --- a/src/daft-micropartition/Cargo.toml +++ b/src/daft-micropartition/Cargo.toml @@ -1,6 +1,5 @@ [dependencies] arrow2 = {workspace = true} -async-stream = {workspace = true} bincode = {workspace = true} common-error = {path = "../common/error", default-features = false} daft-core = {path = 
"../daft-core", default-features = false} @@ -12,7 +11,6 @@ daft-parquet = {path = "../daft-parquet", default-features = false} daft-scan = {path = "../daft-scan", default-features = false} daft-stats = {path = "../daft-stats", default-features = false} daft-table = {path = "../daft-table", default-features = false} -futures = {workspace = true} parquet2 = {workspace = true} pyo3 = {workspace = true, optional = true} snafu = {workspace = true} diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 2515603b82..3c84acac4e 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -20,8 +20,6 @@ use daft_scan::storage_config::{NativeStorageConfig, StorageConfig}; use daft_scan::{ChunkSpec, DataSource, Pushdowns, ScanTask}; use daft_table::Table; -use futures::stream::BoxStream; -use futures::{Stream, StreamExt}; use parquet2::metadata::FileMetaData; use snafu::ResultExt; @@ -505,177 +503,6 @@ fn materialize_scan_task( Ok((table_values, cast_to_schema)) } -async fn stream_scan_task( - scan_task: Arc, - io_stats: Option, -) -> crate::Result>>> { - let pushdown_columns = scan_task - .pushdowns - .columns - .as_ref() - .map(|v| v.iter().map(|s| s.as_str()).collect::>()); - let file_column_names = - _get_file_column_names(pushdown_columns.as_deref(), scan_task.partition_spec()); - - if scan_task.sources.len() != 1 { - return Err(common_error::DaftError::TypeError( - "Streaming reads only supported for single source ScanTasks".to_string(), - )) - .context(DaftCoreComputeSnafu); - } - let url = scan_task.sources.first().map(|s| s.get_path()).unwrap(); - let table_values = match scan_task.storage_config.as_ref() { - StorageConfig::Native(native_storage_config) => { - let io_config = Arc::new( - native_storage_config - .io_config - .as_ref() - .cloned() - .unwrap_or_default(), - ); - let multi_threaded_io = native_storage_config.multithreaded_io; - let io_client = daft_io::get_io_client_async(multi_threaded_io, io_config) - .await - .unwrap(); - - match scan_task.file_format_config.as_ref() { - // ******************** - // Native Parquet Reads - // ******************** - FileFormatConfig::Parquet(ParquetSourceConfig { - coerce_int96_timestamp_unit: _, - field_id_mapping: _, - .. - }) => { - todo!("Implement streaming reads for Parquet") - } - - // **************** - // Native CSV Reads - // **************** - FileFormatConfig::Csv(cfg) => { - let schema_of_file = scan_task.schema.clone(); - let col_names = if !cfg.has_headers { - Some( - schema_of_file - .fields - .values() - .map(|f| f.name.as_str()) - .collect::>(), - ) - } else { - None - }; - let convert_options = CsvConvertOptions::new_internal( - scan_task.pushdowns.limit, - file_column_names - .as_ref() - .map(|cols| cols.iter().map(|col| col.to_string()).collect()), - col_names - .as_ref() - .map(|cols| cols.iter().map(|col| col.to_string()).collect()), - Some(schema_of_file), - scan_task.pushdowns.filters.clone(), - ); - let parse_options = CsvParseOptions::new_with_defaults( - cfg.has_headers, - cfg.delimiter, - cfg.double_quote, - cfg.quote, - cfg.allow_variable_columns, - cfg.escape_char, - cfg.comment, - ) - .context(DaftCSVSnafu)?; - let read_options = - CsvReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); - daft_csv::stream_csv( - url.to_string(), - Some(convert_options), - Some(parse_options), - Some(read_options), - io_client.clone(), - io_stats.clone(), - None, - ) - .await - .context(DaftCoreComputeSnafu)? 
- } - - // **************** - // Native JSON Reads - // **************** - FileFormatConfig::Json(cfg) => { - let schema_of_file = scan_task.schema.clone(); - let convert_options = JsonConvertOptions::new_internal( - scan_task.pushdowns.limit, - file_column_names - .as_ref() - .map(|cols| cols.iter().map(|col| col.to_string()).collect()), - Some(schema_of_file), - scan_task.pushdowns.filters.clone(), - ); - // let - let parse_options = JsonParseOptions::new_internal(); - let read_options = - JsonReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); - - daft_json::read::stream_json( - url.to_string(), - Some(convert_options), - Some(parse_options), - Some(read_options), - io_client, - io_stats, - None, - ) - .await - .context(DaftCoreComputeSnafu)? - } - #[cfg(feature = "python")] - FileFormatConfig::Database(_) => { - return Err(common_error::DaftError::TypeError( - "Native reads for Database file format not implemented".to_string(), - )) - .context(DaftCoreComputeSnafu); - } - #[cfg(feature = "python")] - FileFormatConfig::PythonFunction => { - return Err(common_error::DaftError::TypeError( - "Native reads for PythonFunction file format not implemented".to_string(), - )) - .context(DaftCoreComputeSnafu); - } - } - } - #[cfg(feature = "python")] - StorageConfig::Python(_) => match scan_task.file_format_config.as_ref() { - FileFormatConfig::Parquet(ParquetSourceConfig { - coerce_int96_timestamp_unit: _, - .. - }) => todo!("Implement streaming reads for Parquet with Python Storage Config"), - FileFormatConfig::Csv(CsvSourceConfig { - has_headers: _, - delimiter: _, - double_quote: _, - .. - }) => todo!("Implement streaming reads for CSV with Python Storage Config"), - FileFormatConfig::Json(_) => { - todo!("Implement streaming reads for JSON with Python Storage Config") - } - FileFormatConfig::Database(daft_scan::file_format::DatabaseSourceConfig { - sql: _, - conn: _, - }) => todo!("Implement streaming reads for Database"), - FileFormatConfig::PythonFunction => { - todo!("Implement streaming reads for PythonFunction") - } - }, - }; - - Ok(table_values) -} - impl MicroPartition { /// Create a new "unloaded" MicroPartition using an associated [`ScanTask`] /// @@ -836,67 +663,6 @@ impl MicroPartition { } } - pub async fn from_scan_task_streaming( - scan_task: Arc, - io_stats: IOStatsRef, - morsel_size: usize, - ) -> crate::Result>>> { - let schema = scan_task.materialized_schema(); - match ( - &scan_task.metadata, - &scan_task.statistics, - scan_task.file_format_config.as_ref(), - scan_task.storage_config.as_ref(), - ) { - // CASE: ScanTask provides all required metadata. - // If the scan_task provides metadata (e.g. retrieved from a catalog) we can use it to create an unloaded MicroPartition - (Some(metadata), Some(statistics), _, _) if scan_task.pushdowns.filters.is_none() => { - let unloaded = Ok(Arc::new(Self::new_unloaded( - scan_task.clone(), - scan_task - .pushdowns - .limit - .map(|limit| TableMetadata { - length: metadata.length.min(limit), - }) - .unwrap_or_else(|| metadata.clone()), - statistics.clone(), - ))); - Ok(Box::pin(futures::stream::iter(std::iter::once(unloaded)))) - } - - // CASE: ScanTask does not provide metadata, but the file format supports metadata retrieval - // We can perform an eager **metadata** read to create an unloaded MicroPartition - ( - _, - _, - FileFormatConfig::Parquet(ParquetSourceConfig { - coerce_int96_timestamp_unit: _, - field_id_mapping: _, - .. 
- }), - StorageConfig::Native(_cfg), - ) => { - todo!() - } - - // CASE: Last resort fallback option - // Perform an eager **data** read - _ => { - let statistics = scan_task.statistics.clone(); - let stream = stream_scan_task(scan_task.clone(), Some(io_stats)).await?; - let stream = chunk_tables_into_micropartition_stream( - stream, - schema, - scan_task.partition_spec().cloned(), - statistics, - morsel_size, - ); - Ok(Box::pin(stream)) - } - } - } - pub fn empty(schema: Option) -> Self { let schema = schema.unwrap_or(Schema::empty().into()); Self::new_loaded(schema, Arc::new(vec![]), None) @@ -1011,37 +777,6 @@ impl MicroPartition { } } -fn chunk_tables_into_micropartition_stream( - mut table_stream: BoxStream<'static, DaftResult>>, - schema: SchemaRef, - partition_spec: Option, - statistics: Option, - morsel_size: usize, -) -> impl Stream>> + Send { - async_stream::try_stream! { - let mut buffer = vec![]; - let mut total_rows = 0; - while let Some(tables) = table_stream.next().await { - let tables = tables?; - for table in tables { - let casted_table = table.cast_to_schema_with_fill(schema.as_ref(), partition_spec.as_ref().map(|pspec| pspec.to_fill_map()).as_ref())?; - total_rows += casted_table.len(); - buffer.push(casted_table); - } - if total_rows >= morsel_size { - let mp = Arc::new(MicroPartition::new_loaded(schema.clone(), Arc::new(buffer), statistics.clone())); - buffer = vec![]; - total_rows = 0; - yield mp; - } - } - if !buffer.is_empty() { - let mp = Arc::new(MicroPartition::new_loaded(schema, Arc::new(buffer), statistics)); - yield mp; - } - } -} - fn prune_fields_from_schema( schema: Arc, columns: Option<&[&str]>, diff --git a/src/daft-parquet/Cargo.toml b/src/daft-parquet/Cargo.toml index 9d1125d203..ab2ee1efab 100644 --- a/src/daft-parquet/Cargo.toml +++ b/src/daft-parquet/Cargo.toml @@ -4,6 +4,7 @@ async-compat = {workspace = true} async-stream = {workspace = true} bytes = {workspace = true} common-error = {path = "../common/error", default-features = false} +crossbeam-channel = "0.5.1" daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} daft-io = {path = "../daft-io", default-features = false} diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index d38e69e774..fc57e87607 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -12,7 +12,7 @@ use daft_dsl::ExprRef; use daft_io::{IOClient, IOStatsRef}; use daft_stats::TruthValue; use daft_table::Table; -use futures::{future::try_join_all, StreamExt}; +use futures::{future::try_join_all, stream::BoxStream, StreamExt}; use parquet2::{ page::{CompressedPage, Page}, read::get_owned_page_stream_from_column_start, @@ -369,6 +369,13 @@ impl ParquetFileReader { read_planner.collect(io_client, io_stats) } + pub async fn read_from_ranges_into_table_stream( + self, + _ranges: Arc, + ) -> BoxStream<'static, DaftResult
> { + todo!("Implement streaming reads for remote parquet files") + } + pub async fn read_from_ranges_into_table( self, ranges: Arc, diff --git a/src/daft-parquet/src/lib.rs b/src/daft-parquet/src/lib.rs index cb3949c1c7..d1057e95f7 100644 --- a/src/daft-parquet/src/lib.rs +++ b/src/daft-parquet/src/lib.rs @@ -51,6 +51,12 @@ pub enum Error { source: arrow2::error::Error, }, + #[snafu(display("Unable to read parquet row group for file {}: {}", path, source))] + UnableToReadParquetRowGroup { + path: String, + source: arrow2::error::Error, + }, + #[snafu(display("Unable to create page stream for parquet file {}: {}", path, source))] UnableToCreateParquetPageStream { path: String, @@ -74,7 +80,12 @@ pub enum Error { path: String, source: arrow2::error::Error, }, - + #[snafu(display( + "Unable to create table from arrow chunk for file {}: {}", + path, + source + ))] + UnableToCreateTableFromChunk { path: String, source: DaftError }, #[snafu(display( "Unable to convert arrow schema to daft schema for file {}: {}", path, diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index a095971924..3332d073eb 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -17,7 +17,8 @@ use daft_io::{get_runtime, parse_url, IOClient, IOStatsRef, SourceType}; use daft_table::Table; use futures::{ future::{join_all, try_join_all}, - StreamExt, TryStreamExt, + stream::BoxStream, + Stream, StreamExt, TryStreamExt, }; use itertools::Itertools; use parquet2::metadata::FileMetaData; @@ -320,6 +321,144 @@ async fn read_parquet_single( Ok(table) } +#[allow(clippy::too_many_arguments)] +async fn stream_parquet_single( + uri: String, + columns: Option<&[&str]>, + start_offset: Option, + num_rows: Option, + row_groups: Option>, + predicate: Option, + io_client: Arc, + io_stats: Option, + schema_infer_options: ParquetSchemaInferenceOptions, + field_id_mapping: Option>>, + metadata: Option>, + maintain_order: bool, +) -> DaftResult> + Send> { + let field_id_mapping_provided = field_id_mapping.is_some(); + let columns_to_return = columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()); + let num_rows_to_return = num_rows; + let mut num_rows_to_read = num_rows; + let mut columns_to_read = columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()); + let requested_columns = columns_to_read.as_ref().map(|v| v.len()); + if let Some(ref pred) = predicate { + num_rows_to_read = None; + + if let Some(req_columns) = columns_to_read.as_mut() { + let needed_columns = get_required_columns(pred); + for c in needed_columns { + if !req_columns.contains(&c) { + req_columns.push(c); + } + } + } + } + + let (source_type, fixed_uri) = parse_url(uri.as_str())?; + + let (metadata, table_stream) = if matches!(source_type, SourceType::File) { + crate::stream_reader::local_parquet_stream( + fixed_uri.as_ref(), + columns_to_return, + columns_to_read, + start_offset, + num_rows_to_return, + num_rows_to_read, + row_groups.clone(), + predicate.clone(), + schema_infer_options, + metadata, + maintain_order, + ) + } else { + let builder = ParquetReaderBuilder::from_uri( + uri.as_str(), + io_client.clone(), + io_stats.clone(), + field_id_mapping, + ) + .await?; + let builder = builder.set_infer_schema_options(schema_infer_options); + + let builder = if let Some(columns) = columns_to_read.as_ref() { + builder.prune_columns(columns.as_slice())? 
+ } else { + builder + }; + + if row_groups.is_some() && (num_rows_to_read.is_some() || start_offset.is_some()) { + return Err(common_error::DaftError::ValueError("Both `row_groups` and `num_rows` or `start_offset` is set at the same time. We only support setting one set or the other.".to_string())); + } + let builder = builder.limit(start_offset, num_rows_to_read)?; + let metadata = builder.metadata().clone(); + + let builder = if let Some(ref row_groups) = row_groups { + builder.set_row_groups(row_groups)? + } else { + builder + }; + + let builder = if let Some(ref predicate) = predicate { + builder.set_filter(predicate.clone()) + } else { + builder + }; + + let parquet_reader = builder.build()?; + let ranges = parquet_reader.prebuffer_ranges(io_client, io_stats)?; + Ok(( + Arc::new(metadata), + parquet_reader + .read_from_ranges_into_table_stream(ranges) + .await, + )) + }?; + + let metadata_num_columns = metadata.schema().fields().len(); + let mut remaining_rows = num_rows_to_return.map(|limit| limit as i64); + let finalized_table_stream = table_stream + .map(move |table| { + let table = table?; + + let expected_num_columns = if let Some(columns) = requested_columns { + columns + } else { + metadata_num_columns + }; + + if (!field_id_mapping_provided + && requested_columns.is_none() + && table.num_columns() != expected_num_columns) + || (field_id_mapping_provided && table.num_columns() > expected_num_columns) + || (requested_columns.is_some() && table.num_columns() > expected_num_columns) + { + return Err(super::Error::ParquetNumColumnMismatch { + path: uri.to_string(), + metadata_num_columns: expected_num_columns, + read_columns: table.num_columns(), + } + .into()); + } + DaftResult::Ok(table) + }) + .try_take_while(move |table| { + match remaining_rows { + // Limit has been met, early-terminate. + Some(rows_left) if rows_left <= 0 => futures::future::ready(Ok(false)), + // Limit has not yet been met, update remaining limit slack and continue. + Some(rows_left) => { + remaining_rows = Some(rows_left - table.len() as i64); + futures::future::ready(Ok(true)) + } + // No limit, never early-terminate. 
+ None => futures::future::ready(Ok(true)), + } + }); + + Ok(finalized_table_stream) +} + #[allow(clippy::too_many_arguments)] async fn read_parquet_single_into_arrow( uri: &str, @@ -501,6 +640,9 @@ pub fn read_parquet( }) } pub type ArrowChunk = Vec>; +pub type ArrowChunkIters = Vec< + Box>> + Send + Sync>, +>; pub type ParquetPyarrowChunk = (arrow2::datatypes::SchemaRef, Vec, usize); #[allow(clippy::too_many_arguments)] pub fn read_parquet_into_pyarrow( @@ -622,6 +764,39 @@ pub fn read_parquet_bulk( Ok(collected.into_iter().map(|(_, v)| v).collect()) } +#[allow(clippy::too_many_arguments)] +pub async fn stream_parquet( + uri: &str, + columns: Option<&[&str]>, + start_offset: Option, + num_rows: Option, + row_groups: Option>, + predicate: Option, + io_client: Arc, + io_stats: Option, + schema_infer_options: &ParquetSchemaInferenceOptions, + field_id_mapping: Option>>, + metadata: Option>, + maintain_order: bool, +) -> DaftResult>> { + let stream = stream_parquet_single( + uri.to_string(), + columns, + start_offset, + num_rows, + row_groups, + predicate, + io_client, + io_stats, + *schema_infer_options, + field_id_mapping, + metadata, + maintain_order, + ) + .await?; + Ok(Box::pin(stream)) +} + #[allow(clippy::too_many_arguments)] pub fn read_parquet_into_pyarrow_bulk( uris: &[&str], diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index e0d03c9242..3530c2d3e6 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -2,16 +2,24 @@ use std::{collections::HashSet, fs::File, sync::Arc}; use arrow2::io::parquet::read; use common_error::DaftResult; -use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_core::{ + schema::{Schema, SchemaRef}, + utils::arrow::cast_array_for_daft_if_needed, + Series, +}; use daft_dsl::ExprRef; use daft_table::Table; +use futures::{stream::BoxStream, StreamExt}; use itertools::Itertools; -use rayon::prelude::{IndexedParallelIterator, IntoParallelIterator, ParallelBridge}; +use rayon::{ + iter::IntoParallelRefMutIterator, + prelude::{IndexedParallelIterator, IntoParallelIterator, ParallelBridge}, +}; use snafu::ResultExt; use crate::{ - file::build_row_ranges, - read::{ArrowChunk, ParquetSchemaInferenceOptions}, + file::{build_row_ranges, RowGroupRange}, + read::{ArrowChunk, ArrowChunkIters, ParquetSchemaInferenceOptions}, UnableToConvertSchemaToDaftSnafu, }; @@ -40,6 +48,104 @@ fn prune_fields_from_schema( } } +#[allow(clippy::too_many_arguments)] +pub(crate) fn local_parquet_read_into_column_iters( + uri: &str, + columns: Option<&[String]>, + start_offset: Option, + num_rows: Option, + row_groups: Option<&[i64]>, + predicate: Option, + schema_infer_options: ParquetSchemaInferenceOptions, + metadata: Option>, + chunk_size: usize, +) -> super::Result<( + Arc, + SchemaRef, + Vec, + impl Iterator>, +)> { + const LOCAL_PROTOCOL: &str = "file://"; + let uri = uri + .strip_prefix(LOCAL_PROTOCOL) + .map(|s| s.to_string()) + .unwrap_or(uri.to_string()); + + let mut reader = File::open(uri.clone()).with_context(|_| super::InternalIOSnafu { + path: uri.to_string(), + })?; + let size = reader + .metadata() + .with_context(|_| super::InternalIOSnafu { + path: uri.to_string(), + })? 
+ .len(); + + if size < 12 { + return Err(super::Error::FileTooSmall { + path: uri, + file_size: size as usize, + }); + } + + let metadata = match metadata { + Some(m) => m, + None => read::read_metadata(&mut reader) + .map(Arc::new) + .with_context(|_| super::UnableToParseMetadataFromLocalFileSnafu { + path: uri.to_string(), + })?, + }; + + let schema = infer_schema_with_options(&metadata, &Some(schema_infer_options.into())) + .with_context(|_| super::UnableToParseSchemaFromMetadataSnafu { + path: uri.to_string(), + })?; + let schema = prune_fields_from_schema(schema, columns)?; + let daft_schema = + Schema::try_from(&schema).with_context(|_| UnableToConvertSchemaToDaftSnafu { + path: uri.to_string(), + })?; + + let row_ranges = build_row_ranges( + num_rows, + start_offset.unwrap_or(0), + row_groups, + predicate.clone(), + &daft_schema, + &metadata, + &uri, + )?; + + let all_row_groups = metadata.row_groups.clone(); + + // Read all the required row groups into memory sequentially + let column_iters_per_rg = row_ranges.clone().into_iter().map(move |rg_range| { + let rg_metadata = all_row_groups.get(rg_range.row_group_index).unwrap(); + + // This operation is IO-bounded O(C) where C is the number of columns in the row group. + // It reads all the columns to memory from the row group associated to the requested fields, + // and returns a Vec of iterators that perform decompression and deserialization for each column. + let single_rg_column_iter = read::read_columns_many( + &mut reader, + rg_metadata, + schema.fields.clone(), + Some(chunk_size), + Some(rg_range.num_rows), + None, + ) + .with_context(|_| super::UnableToReadParquetRowGroupSnafu { path: uri.clone() })?; + Ok(single_rg_column_iter) + }); + + Ok(( + metadata, + Arc::new(daft_schema), + row_ranges, + column_iters_per_rg, + )) +} + #[allow(clippy::too_many_arguments)] pub(crate) fn local_parquet_read_into_arrow( uri: &str, @@ -239,6 +345,159 @@ pub(crate) async fn local_parquet_read_async( recv.await.context(super::OneShotRecvSnafu {})? } +#[allow(clippy::too_many_arguments)] +pub(crate) fn local_parquet_stream( + uri: &str, + original_columns: Option>, + columns: Option>, + start_offset: Option, + original_num_rows: Option, + num_rows: Option, + row_groups: Option>, + predicate: Option, + schema_infer_options: ParquetSchemaInferenceOptions, + metadata: Option>, + maintain_order: bool, +) -> DaftResult<( + Arc, + BoxStream<'static, DaftResult
>, +)> { + let chunk_size = 128 * 1024; + let (metadata, schema_ref, row_ranges, column_iters) = local_parquet_read_into_column_iters( + uri, + columns.as_deref(), + start_offset, + num_rows, + row_groups.as_deref(), + predicate.clone(), + schema_infer_options, + metadata, + chunk_size, + )?; + + // Create a channel for each row group to send the processed tables to the stream + // Each channel is expected to have a number of chunks equal to the number of chunks in the row group + let (senders, receivers): (Vec<_>, Vec<_>) = row_ranges + .iter() + .map(|rg_range| { + let expected_num_chunks = + f32::ceil(rg_range.num_rows as f32 / chunk_size as f32) as usize; + crossbeam_channel::bounded(expected_num_chunks) + }) + .unzip(); + + let uri = uri.to_string(); + rayon::spawn(move || { + // Once a row group has been read into memory and we have the column iterators, + // we can start processing them in parallel. + let par_column_iters = column_iters.zip(senders).zip(row_ranges).par_bridge(); + + // For each vec of column iters, iterate through them in parallel lock step such that each iteration + // produces a chunk of the row group that can be converted into a table. + par_column_iters.for_each(move |((rg_col_iter_result, tx), rg_range)| { + let rg_col_iter = match rg_col_iter_result { + Ok(iter) => iter, + Err(e) => { + if let Err(crossbeam_channel::TrySendError::Full(_)) = + tx.try_send(Err(e.into())) + { + panic!("Parquet stream channel should not be full") + } + return; + } + }; + let owned_schema_ref = schema_ref.clone(); + let owned_predicate = predicate.clone(); + let owned_original_columns = original_columns.clone(); + let owned_uri = uri.clone(); + + struct ParallelLockStepIter { + iters: ArrowChunkIters, + } + impl Iterator for ParallelLockStepIter { + type Item = arrow2::error::Result; + + fn next(&mut self) -> Option { + self.iters.par_iter_mut().map(|iter| iter.next()).collect() + } + } + let par_lock_step_iter = ParallelLockStepIter { iters: rg_col_iter }; + + // Keep track of the current index in the row group so we can throw away arrays that are not needed + // and slice arrays that are partially needed. 
+ let mut index_so_far = 0; + let table_iter = par_lock_step_iter.into_iter().map(move |chunk| { + let chunk = chunk.with_context(|_| { + super::UnableToCreateChunkFromStreamingFileReaderSnafu { + path: owned_uri.clone(), + } + })?; + let all_series = chunk + .into_iter() + .zip(owned_schema_ref.fields.clone()) + .filter_map(|(mut arr, (f_name, _))| { + if (index_so_far + arr.len()) < rg_range.start { + // No need to process arrays that are less than the start offset + return None; + } + if index_so_far < rg_range.start { + // Slice arrays that are partially needed + let offset = rg_range.start.saturating_sub(index_so_far); + arr = arr.sliced(offset, arr.len() - offset); + } + let series_result = + Series::try_from((f_name.as_str(), cast_array_for_daft_if_needed(arr))); + Some(series_result) + }) + .collect::>>()?; + + let len = all_series[0].len(); + if all_series.iter().any(|s| s.len() != len) { + return Err(super::Error::ParquetColumnsDontHaveEqualRows { + path: owned_uri.clone(), + } + .into()); + } + index_so_far += len; + + let mut table = Table::new_with_size(owned_schema_ref.clone(), all_series, len) + .with_context(|_| super::UnableToCreateTableFromChunkSnafu { + path: owned_uri.clone(), + })?; + + // Apply pushdowns if needed + if let Some(predicate) = &owned_predicate { + table = table.filter(&[predicate.clone()])?; + if let Some(oc) = &owned_original_columns { + table = table.get_columns(oc)?; + } + if let Some(nr) = original_num_rows { + table = table.head(nr)?; + } + } + DaftResult::Ok(table) + }); + + for table_result in table_iter { + let table_err = table_result.is_err(); + if let Err(crossbeam_channel::TrySendError::Full(_)) = tx.try_send(table_result) { + panic!("Parquet stream channel should not be full") + } + if table_err { + break; + } + } + }); + }); + + let result_stream = futures::stream::iter(receivers.into_iter().map(futures::stream::iter)); + + match maintain_order { + true => Ok((metadata, Box::pin(result_stream.flatten()))), + false => Ok((metadata, Box::pin(result_stream.flatten_unordered(None)))), + } +} + #[allow(clippy::too_many_arguments)] pub(crate) async fn local_parquet_read_into_arrow_async( uri: &str, diff --git a/src/daft-physical-plan/src/translate.rs b/src/daft-physical-plan/src/translate.rs index 5a7be50259..6d33d8453a 100644 --- a/src/daft-physical-plan/src/translate.rs +++ b/src/daft-physical-plan/src/translate.rs @@ -16,7 +16,7 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult { let scan_tasks = scan_tasks_iter.collect::>>()?; Ok(LocalPhysicalPlan::physical_scan( scan_tasks, - info.source_schema.clone(), + source.output_schema.clone(), )) } SourceInfo::PlaceHolder(_) => { diff --git a/tests/benchmarks/test_local_tpch.py b/tests/benchmarks/test_local_tpch.py index 03ae7c64d4..bad174fdb8 100644 --- a/tests/benchmarks/test_local_tpch.py +++ b/tests/benchmarks/test_local_tpch.py @@ -28,36 +28,41 @@ def gen_tpch(request): num_parts = request.param csv_files_location = data_generation.gen_csv_files(TPCH_DBGEN_DIR, num_parts, SCALE_FACTOR) + parquet_files_location = data_generation.gen_parquet(csv_files_location) sqlite_path = data_generation.gen_sqlite_db( csv_filepath=csv_files_location, num_parts=num_parts, ) - return (csv_files_location, num_parts), sqlite_path + return (csv_files_location, parquet_files_location, num_parts), sqlite_path -@pytest.fixture(scope="module") -def get_df(gen_tpch): - (csv_files_location, num_parts), _ = gen_tpch +@pytest.fixture(scope="module", params=["csv", "parquet"]) +def get_df(gen_tpch, request): + 
(csv_files_location, parquet_files_location, num_parts), _ = gen_tpch + file_type = request.param def _get_df(tbl_name: str): - # TODO (jay): Perhaps we should use Parquet here instead similar to benchmarking and get rid of this CSV parsing stuff? - local_fs = LocalFileSystem() - # Used chunked files if found - nonchunked_filepath = f"{csv_files_location}/{tbl_name}.tbl" - chunked_filepath = nonchunked_filepath + ".*" - try: - local_fs.expand_path(chunked_filepath) - fp = chunked_filepath - except FileNotFoundError: - fp = nonchunked_filepath - - df = daft.read_csv( - fp, - has_headers=False, - delimiter="|", - ) + if file_type == "csv": + local_fs = LocalFileSystem() + nonchunked_filepath = f"{csv_files_location}/{tbl_name}.tbl" + chunked_filepath = nonchunked_filepath + ".*" + try: + local_fs.expand_path(chunked_filepath) + fp = chunked_filepath + except FileNotFoundError: + fp = nonchunked_filepath + + df = daft.read_csv( + fp, + has_headers=False, + delimiter="|", + ) + elif file_type == "parquet": + fp = f"{parquet_files_location}/{tbl_name}/*" + df = daft.read_parquet(fp) + df = df.select( *[ daft.col(autoname).alias(colname)
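
A note on the `maintain_order` flag threaded through the new streaming sources: at the end of `local_parquet_stream`, the per-row-group receivers are turned into streams and then either flattened in order or interleaved via `flatten_unordered`. Below is a minimal, standalone sketch of just that pattern; it assumes only the `futures` crate, and the integer streams are hypothetical stand-ins for the per-row-group table streams, not Daft types.

```rust
// Sketch: ordered vs. unordered flattening of per-row-group chunk streams,
// mirroring how `local_parquet_stream` handles `maintain_order`.
use futures::{executor::block_on, stream, StreamExt};

fn main() {
    // Pretend each inner stream is the sequence of decoded chunks of one row group.
    let make_row_group_streams = || {
        vec![
            stream::iter(vec![1, 2, 3]), // chunks of row group 0
            stream::iter(vec![10, 20]),  // chunks of row group 1
        ]
    };

    // maintain_order == true: emit all chunks of row group 0, then row group 1, ...
    let ordered: Vec<i32> =
        block_on(stream::iter(make_row_group_streams()).flatten().collect());

    // maintain_order == false: interleave chunks as they become ready
    // (None = no limit on how many inner streams are polled concurrently).
    let unordered: Vec<i32> = block_on(
        stream::iter(make_row_group_streams())
            .flatten_unordered(None)
            .collect(),
    );

    println!("ordered:   {ordered:?}");
    println!("unordered: {unordered:?}");
}
```

In the patch itself the inner streams are built from one bounded crossbeam channel per row group, so unordered flattening lets morsels from faster row groups surface as soon as they are ready, while ordered flattening preserves row-group order for consumers that require it.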