[FEAT] Remote parquet streaming #2620

Merged 7 commits on Aug 9, 2024
171 changes: 156 additions & 15 deletions src/daft-parquet/src/file.rs
@@ -24,7 +24,9 @@
metadata::read_parquet_metadata,
read::ParquetSchemaInferenceOptions,
read_planner::{CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass},
statistics, JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu,
statistics,
stream_reader::arrow_column_iters_to_table_iter,
JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu,
UnableToConvertSchemaToDaftSnafu, UnableToCreateParquetPageStreamSnafu,
UnableToParseSchemaFromMetadataSnafu, UnableToRunExpressionOnStatsSnafu,
};
@@ -298,6 +300,12 @@
}

impl ParquetFileReader {
const CHUNK_SIZE: usize = 2048;
// Set to a very high number (256 MB) to guard against unbounded, overly large
// downloads from remote storage, which likely indicate corrupted Parquet data.
// See: https://github.com/Eventual-Inc/Daft/issues/1551
const MAX_HEADER_SIZE: usize = 256 * 1024 * 1024;
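A note on how `CHUNK_SIZE` feeds into the channel sizing further down: each row group gets a bounded crossbeam channel whose capacity is the expected number of `CHUNK_SIZE`-row chunks, so the rayon producer's `try_send` never sees a full channel even before the consumer starts draining. A minimal sketch of that sizing rule (the `expected_num_chunks` helper name is hypothetical, not part of this PR):

```rust
// Illustration of the capacity rule used in read_from_ranges_into_table_stream:
// capacity = ceil(num_rows / CHUNK_SIZE).
const CHUNK_SIZE: usize = 2048;

fn expected_num_chunks(num_rows: usize) -> usize {
    // Integer ceiling division; the PR uses f32::ceil, which gives the same
    // result for realistic row-group sizes.
    (num_rows + CHUNK_SIZE - 1) / CHUNK_SIZE
}

fn main() {
    // A 5000-row row group splits into 3 chunks of at most 2048 rows each,
    // so its bounded channel is created with capacity 3.
    assert_eq!(expected_num_chunks(5000), 3);
    assert_eq!(expected_num_chunks(100), 1);
    let (sender, _receiver) = crossbeam_channel::bounded::<()>(expected_num_chunks(5000));
    assert_eq!(sender.capacity(), Some(3));
}
```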

fn new(
uri: String,
metadata: parquet2::metadata::FileMetaData,
@@ -371,9 +379,150 @@

pub async fn read_from_ranges_into_table_stream(
self,
_ranges: Arc<RangesContainer>,
) -> BoxStream<'static, DaftResult<Table>> {
todo!("Implement streaming reads for remote parquet files")
ranges: Arc<RangesContainer>,
maintain_order: bool,
predicate: Option<ExprRef>,
original_columns: Option<Vec<String>>,
original_num_rows: Option<usize>,
) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
let daft_schema = Arc::new(daft_core::schema::Schema::try_from(
self.arrow_schema.as_ref(),
)?);

let (senders, receivers): (Vec<_>, Vec<_>) = self
.row_ranges
.iter()
.map(|rg_range| {
let expected_num_chunks =
f32::ceil(rg_range.num_rows as f32 / Self::CHUNK_SIZE as f32) as usize;
crossbeam_channel::bounded(expected_num_chunks)
})
.unzip();

let table_iter_handles =
self.row_ranges
.iter()
.zip(senders.into_iter())
.map(|(row_range, sender)| {
let uri = self.uri.clone();
let metadata = self.metadata.clone();
let arrow_schema = self.arrow_schema.clone();
let daft_schema = daft_schema.clone();
let ranges = ranges.clone();
let predicate = predicate.clone();
let original_columns = original_columns.clone();
let row_range = *row_range;

tokio::task::spawn(async move {
let arr_iter_handles = arrow_schema.fields.iter().map(|field| {
let rt_handle = tokio::runtime::Handle::current();
let ranges = ranges.clone();
let uri = uri.clone();
let field = field.clone();
let metadata = metadata.clone();

tokio::task::spawn(async move {
let rg = metadata
.row_groups
.get(row_range.row_group_index)
.expect("Row Group index should be in bounds");
let num_rows =
rg.num_rows().min(row_range.start + row_range.num_rows);
let filtered_columns = rg
.columns()
.iter()
.filter(|x| x.descriptor().path_in_schema[0] == field.name)
.collect::<Vec<_>>();
let mut decompressed_iters =
Vec::with_capacity(filtered_columns.len());
let mut ptypes = Vec::with_capacity(filtered_columns.len());
let mut num_values = Vec::with_capacity(filtered_columns.len());
for col in filtered_columns.into_iter() {
num_values.push(col.metadata().num_values as usize);
ptypes.push(col.descriptor().descriptor.primitive_type.clone());

let byte_range = {
let (start, len) = col.byte_range();
let end: u64 = start + len;
start as usize..end as usize
};
let range_reader =
Box::pin(ranges.get_range_reader(byte_range).await?);
let compressed_page_stream =
get_owned_page_stream_from_column_start(
col,
range_reader,
vec![],
Arc::new(|_, _| true),
Self::MAX_HEADER_SIZE,
)
.with_context(
|_| UnableToCreateParquetPageStreamSnafu::<String> {
path: uri.clone(),
},
)?;
let page_stream =
streaming_decompression(compressed_page_stream);
let pinned_stream = Box::pin(page_stream);
decompressed_iters
.push(StreamIterator::new(pinned_stream, rt_handle.clone()))
}
let arr_iter = column_iter_to_arrays(
decompressed_iters,
ptypes.iter().collect(),
field,
Some(Self::CHUNK_SIZE),
num_rows,
num_values,
)?;
Ok(arr_iter)
})
});

let arr_iters = try_join_all(arr_iter_handles)
.await
.context(JoinSnafu { path: uri.clone() })?
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;

let table_iter = arrow_column_iters_to_table_iter(
arr_iters,
row_range.start,
daft_schema,
uri,
predicate,
original_columns,
original_num_rows,
);
rayon::spawn(move || {
for table_result in table_iter {
let is_err = table_result.is_err();
if let Err(crossbeam_channel::TrySendError::Full(_)) =
sender.try_send(table_result)
{
panic!("Parquet stream channel should not be full")
}
if is_err {
break;
}
}
});
Ok(())
})
});

let _ = try_join_all(table_iter_handles)
.await
.context(JoinSnafu { path: self.uri })?
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;

let combined_stream =
futures::stream::iter(receivers.into_iter().map(futures::stream::iter));
match maintain_order {
true => Ok(Box::pin(combined_stream.flatten())),
false => Ok(Box::pin(combined_stream.flatten_unordered(None))),
}
}
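The last few lines above determine the ordering semantics: each row group's receiver becomes its own stream, and `flatten` drains them in row-group order while `flatten_unordered(None)` yields chunks from whichever receiver has data ready. A self-contained sketch of that pattern, with plain integers in place of Daft tables (assumes only the `futures` and `crossbeam-channel` crates):

```rust
use futures::stream::{self, BoxStream, StreamExt};

// Combine one bounded receiver per row group into a single stream of items.
fn combine(
    receivers: Vec<crossbeam_channel::Receiver<i32>>,
    maintain_order: bool,
) -> BoxStream<'static, i32> {
    // Iterating a crossbeam Receiver yields items until every sender is dropped.
    let nested = stream::iter(receivers.into_iter().map(stream::iter));
    if maintain_order {
        Box::pin(nested.flatten()) // drain receivers one after another, in order
    } else {
        Box::pin(nested.flatten_unordered(None)) // interleave whatever is ready
    }
}

fn main() {
    let (tx1, rx1) = crossbeam_channel::bounded(2);
    let (tx2, rx2) = crossbeam_channel::bounded(1);
    tx1.send(1).unwrap();
    tx1.send(2).unwrap();
    tx2.send(3).unwrap();
    drop((tx1, tx2)); // close the channels so the receiver iterators terminate
    let ordered = futures::executor::block_on(combine(vec![rx1, rx2], true).collect::<Vec<_>>());
    assert_eq!(ordered, vec![1, 2, 3]);
}
```

Because each receiver's iterator blocks until its producer sends the next chunk or drops the sender, and each channel's capacity matches the expected chunk count, the producer-side `try_send` calls above should never hit a full channel.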

pub async fn read_from_ranges_into_table(
@@ -454,12 +603,8 @@
range_reader,
vec![],
Arc::new(|_, _| true),
// Set to a very high number 256MB to guard against unbounded large
// downloads from remote storage, which likely indicates corrupted Parquet data
// See: https://github.com/Eventual-Inc/Daft/issues/1551
256 * 1024 * 1024,
Self::MAX_HEADER_SIZE,
)
.await
.with_context(|_| {
UnableToCreateParquetPageStreamSnafu::<String> {
path: owned_uri.clone(),
@@ -477,7 +622,7 @@
decompressed_iters,
ptypes.iter().collect(),
field.clone(),
Some(2048),
Some(Self::CHUNK_SIZE),
num_rows,
num_values,
);
Expand Down Expand Up @@ -641,12 +786,8 @@
range_reader,
vec![],
Arc::new(|_, _| true),
// Set to a very high number 256MB to guard against unbounded large
// downloads from remote storage, which likely indicates corrupted Parquet data
// See: https://github.com/Eventual-Inc/Daft/issues/1551
256 * 1024 * 1024,
Self::MAX_HEADER_SIZE,
)
.await
.with_context(|_| {
UnableToCreateParquetPageStreamSnafu::<String> {
path: owned_uri.clone(),
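One pattern worth calling out from `read_from_ranges_into_table_stream` above is the two-level error handling around the spawned tasks: `try_join_all(handles).await.context(JoinSnafu { .. })?` surfaces task-level failures (panics or cancellation), while the following `collect::<DaftResult<Vec<_>>>()?` surfaces the first per-column or per-row-group error. A stripped-down sketch with hypothetical names and plain `String` errors standing in for Daft's snafu contexts:

```rust
use futures::future::try_join_all;

// Hypothetical stand-in for decoding one column's pages into arrays.
async fn decode_column(name: String) -> Result<usize, String> {
    if name.is_empty() {
        return Err("empty column name".to_string());
    }
    Ok(name.len())
}

async fn decode_all(columns: Vec<String>) -> Result<Vec<usize>, String> {
    let handles = columns
        .into_iter()
        .map(|name| tokio::task::spawn(decode_column(name)));
    let decoded = try_join_all(handles)
        .await
        // Outer layer: the spawned task itself failed (panicked or was cancelled).
        .map_err(|e| format!("join error: {e}"))?
        .into_iter()
        // Inner layer: the first decoding error, if any.
        .collect::<Result<Vec<_>, _>>()?;
    Ok(decoded)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let sizes = decode_all(vec!["a".into(), "bc".into()]).await?;
    assert_eq!(sizes, vec![1, 2]);
    Ok(())
}
```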
51 changes: 48 additions & 3 deletions src/daft-parquet/src/read.rs
@@ -410,8 +410,14 @@ async fn stream_parquet_single(
Ok((
Arc::new(metadata),
parquet_reader
.read_from_ranges_into_table_stream(ranges)
.await,
.read_from_ranges_into_table_stream(
ranges,
maintain_order,
predicate.clone(),
columns_to_return,
num_rows_to_return,
)
.await?,
))
}?;

@@ -1014,11 +1020,15 @@ mod tests {
use common_error::DaftResult;

use daft_io::{IOClient, IOConfig};
use futures::StreamExt;

use super::read_parquet;
use super::stream_parquet;

const PARQUET_FILE: &str = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet";
#[test]
fn test_parquet_read_from_s3() -> DaftResult<()> {
let file = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet";
let file = PARQUET_FILE;

let mut io_config = IOConfig::default();
io_config.s3.anonymous = true;
@@ -1043,4 +1053,39 @@

Ok(())
}

#[test]
fn test_parquet_streaming_read_from_s3() -> DaftResult<()> {
let file = PARQUET_FILE;

let mut io_config = IOConfig::default();
io_config.s3.anonymous = true;

let io_client = Arc::new(IOClient::new(io_config.into())?);
let runtime_handle = daft_io::get_runtime(true)?;
runtime_handle.block_on(async move {
let tables = stream_parquet(
file,
None,
None,
None,
None,
None,
io_client,
None,
&Default::default(),
None,
None,
false,
)
.await?
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;
let total_tables_len = tables.iter().map(|t| t.len()).sum::<usize>();
assert_eq!(total_tables_len, 100);
Ok(())
})
}
}