[FEAT] Remote parquet streaming #2620

Merged (7 commits) on Aug 9, 2024
Changes from 4 commits
158 changes: 143 additions & 15 deletions src/daft-parquet/src/file.rs
@@ -18,13 +18,16 @@ use parquet2::{
read::get_owned_page_stream_from_column_start,
FallibleStreamingIterator,
};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use snafu::ResultExt;

use crate::{
metadata::read_parquet_metadata,
read::ParquetSchemaInferenceOptions,
read_planner::{CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass},
statistics, JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu,
statistics,
stream_reader::arrow_column_iters_to_table_iter,
JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu,
UnableToConvertSchemaToDaftSnafu, UnableToCreateParquetPageStreamSnafu,
UnableToParseSchemaFromMetadataSnafu, UnableToRunExpressionOnStatsSnafu,
};
@@ -298,6 +301,12 @@ pub(crate) struct ParquetFileReader {
}

impl ParquetFileReader {
const CHUNK_SIZE: usize = 2048;
// Set to a very high number 256MB to guard against unbounded large
// downloads from remote storage, which likely indicates corrupted Parquet data
// See: https://github.com/Eventual-Inc/Daft/issues/1551
const MAX_HEADER_SIZE: usize = 256 * 1024 * 1024;

fn new(
uri: String,
metadata: parquet2::metadata::FileMetaData,
@@ -371,9 +380,136 @@ impl ParquetFileReader {

pub async fn read_from_ranges_into_table_stream(
self,
_ranges: Arc<RangesContainer>,
) -> BoxStream<'static, DaftResult<Table>> {
todo!("Implement streaming reads for remote parquet files")
ranges: Arc<RangesContainer>,
maintain_order: bool,
predicate: Option<ExprRef>,
original_columns: Option<Vec<String>>,
original_num_rows: Option<usize>,
) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
let rt_handle = tokio::runtime::Handle::current();
let daft_schema = daft_core::schema::Schema::try_from(self.arrow_schema.as_ref())?;
let schema_ref = Arc::new(daft_schema);

let mut table_iters = Vec::with_capacity(self.row_ranges.len());
for row_range in self.row_ranges.iter() {
let rg = self
.metadata
.row_groups
.get(row_range.row_group_index)
.expect("Row Group index should be in bounds");
let num_rows = rg.num_rows().min(row_range.start + row_range.num_rows);
let columns = rg.columns();

let mut arr_iters = Vec::with_capacity(self.arrow_schema.fields.len());
for field in self.arrow_schema.fields.iter() {
let filtered_cols_idx = columns
Review thread:

Reviewer (Member): looks like this may be computed for every row_group. We may want to just compute it once and leverage that?

Author (@colin-ho, Aug 7, 2024): Will they always be the same for all row groups?

Reviewer (Member): I believe so, but you should double check!

Author (@colin-ho): Ok managed to refactor it so that it doesn't need idxs, just retrieves the column metadata directly for each field per rowgroup, which should be unique.

(A sketch of that refactor appears after this statement.)

.iter()
.enumerate()
.filter(|(_, x)| x.descriptor().path_in_schema[0] == field.name)
.map(|(i, _)| i)
.collect::<Vec<_>>();
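
A minimal sketch of the refactor described in that thread, assuming the same parquet2 metadata accessors used elsewhere in this diff; the later commits in this PR may do it differently:

// Sketch only (not part of this diff): pull the column chunks for this field
// straight from the row group metadata, instead of carrying an index vector.
let field_columns: Vec<_> = columns
    .iter()
    .filter(|c| c.descriptor().path_in_schema[0] == field.name)
    .collect();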

let needed_byte_ranges = filtered_cols_idx
.iter()
.map(|i| {
let c = columns.get(*i).unwrap();
let (start, len) = c.byte_range();
let end: u64 = start + len;
start as usize..end as usize
})
.collect::<Vec<_>>();

let mut range_readers = Vec::with_capacity(filtered_cols_idx.len());
for range in needed_byte_ranges.into_iter() {
Reviewer (Member): instead of collecting range_readers into a Vec, you can probably do this in 1 loop, producing decompressed_iters, ptypes, num_values, etc.

(A sketch of this one-loop shape appears after the decompression loop below.)

let range_reader = ranges.get_range_reader(range).await?;
Reviewer (Member): this does block on the first IO request being complete. so you may want to not await this but join_all once you have all the range_readers.

(A sketch of the join_all variant appears after this loop.)

range_readers.push(Box::pin(range_reader))
}
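
A hedged sketch of the join_all suggestion above, so the per-column range reads are started together rather than awaited one at a time; futures::future::try_join_all is an assumption here, and this is not the code that was merged:

// Sketch only: gather the futures first, then await them all at once.
let range_readers = futures::future::try_join_all(
    needed_byte_ranges
        .into_iter()
        .map(|range| ranges.get_range_reader(range)),
)
.await?
.into_iter()
.map(Box::pin)
.collect::<Vec<_>>();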

let mut decompressed_iters = Vec::with_capacity(filtered_cols_idx.len());
let mut ptypes = Vec::with_capacity(filtered_cols_idx.len());
let mut num_values = Vec::with_capacity(filtered_cols_idx.len());
for (col_idx, range_reader) in filtered_cols_idx.into_iter().zip(range_readers) {
let col = rg
.columns()
.get(col_idx)
.expect("Column index should be in bounds");
ptypes.push(col.descriptor().descriptor.primitive_type.clone());
num_values.push(col.metadata().num_values as usize);

let compressed_page_stream = get_owned_page_stream_from_column_start(
col,
range_reader,
vec![],
Arc::new(|_, _| true),
Self::MAX_HEADER_SIZE,
)
.with_context(|_| {
UnableToCreateParquetPageStreamSnafu::<String> {
path: self.uri.clone(),
}
})?;
let page_stream = streaming_decompression(compressed_page_stream);
let pinned_stream = Box::pin(page_stream);
decompressed_iters.push(StreamIterator::new(pinned_stream, rt_handle.clone()))
}
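
And a hedged sketch of the one-loop shape the reviewer suggested, a single pass per field that could replace the two loops above; the page-stream construction is elided and would proceed exactly as in the loop above:

// Sketch only: one pass per field, no intermediate range_readers Vec.
let mut decompressed_iters = Vec::with_capacity(filtered_cols_idx.len());
let mut ptypes = Vec::with_capacity(filtered_cols_idx.len());
let mut num_values = Vec::with_capacity(filtered_cols_idx.len());
for col_idx in filtered_cols_idx {
    let col = columns
        .get(col_idx)
        .expect("Column index should be in bounds");
    let (start, len) = col.byte_range();
    let range_reader = ranges
        .get_range_reader(start as usize..(start + len) as usize)
        .await?;
    ptypes.push(col.descriptor().descriptor.primitive_type.clone());
    num_values.push(col.metadata().num_values as usize);
    // ...build the compressed page stream from Box::pin(range_reader) and push
    // the decompressed StreamIterator, as in the loop above...
}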
let arr_iter = column_iter_to_arrays(
decompressed_iters,
ptypes.iter().collect(),
field.clone(),
Some(Self::CHUNK_SIZE),
num_rows,
num_values,
)?;
arr_iters.push(arr_iter);
}

let table_iter = arrow_column_iters_to_table_iter(
arr_iters,
row_range.start,
schema_ref.clone(),
self.uri.clone(),
predicate.clone(),
original_columns.clone(),
original_num_rows,
);
table_iters.push(table_iter);
}

let (senders, receivers): (Vec<_>, Vec<_>) = self
.row_ranges
.iter()
.map(|rg_range| {
let expected_num_chunks =
f32::ceil(rg_range.num_rows as f32 / Self::CHUNK_SIZE as f32) as usize;
crossbeam_channel::bounded(expected_num_chunks)
})
.unzip();

rayon::spawn(move || {
table_iters
.into_par_iter()
.zip(senders.into_par_iter())
.for_each(|(table_iter, tx)| {
for table_result in table_iter {
let is_err = table_result.is_err();
if let Err(crossbeam_channel::TrySendError::Full(_)) =
tx.try_send(table_result)
{
panic!("Parquet stream channel should not be full")
}
if is_err {
break;
}
}
});
});

let combined_stream =
futures::stream::iter(receivers.into_iter().map(futures::stream::iter));
match maintain_order {
true => Ok(Box::pin(combined_stream.flatten())),
false => Ok(Box::pin(combined_stream.flatten_unordered(None))),
}
}

pub async fn read_from_ranges_into_table(
@@ -454,12 +590,8 @@ impl ParquetFileReader {
range_reader,
vec![],
Arc::new(|_, _| true),
// Set to a very high number 256MB to guard against unbounded large
// downloads from remote storage, which likely indicates corrupted Parquet data
// See: https://github.com/Eventual-Inc/Daft/issues/1551
256 * 1024 * 1024,
Self::MAX_HEADER_SIZE,
)
.await
.with_context(|_| {
UnableToCreateParquetPageStreamSnafu::<String> {
path: owned_uri.clone(),
@@ -477,7 +609,7 @@ impl ParquetFileReader {
decompressed_iters,
ptypes.iter().collect(),
field.clone(),
Some(2048),
Some(Self::CHUNK_SIZE),
num_rows,
num_values,
);
@@ -641,12 +773,8 @@ impl ParquetFileReader {
range_reader,
vec![],
Arc::new(|_, _| true),
// Set to a very high number 256MB to guard against unbounded large
// downloads from remote storage, which likely indicates corrupted Parquet data
// See: https://github.com/Eventual-Inc/Daft/issues/1551
256 * 1024 * 1024,
Self::MAX_HEADER_SIZE,
)
.await
.with_context(|_| {
UnableToCreateParquetPageStreamSnafu::<String> {
path: owned_uri.clone(),
51 changes: 48 additions & 3 deletions src/daft-parquet/src/read.rs
@@ -410,8 +410,14 @@ async fn stream_parquet_single(
Ok((
Arc::new(metadata),
parquet_reader
.read_from_ranges_into_table_stream(ranges)
.await,
.read_from_ranges_into_table_stream(
ranges,
maintain_order,
predicate.clone(),
columns_to_return,
num_rows_to_return,
)
.await?,
))
}?;

@@ -1014,11 +1020,15 @@ mod tests {
use common_error::DaftResult;

use daft_io::{IOClient, IOConfig};
use futures::StreamExt;

use super::read_parquet;
use super::stream_parquet;

const PARQUET_FILE: &str = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet";
#[test]
fn test_parquet_read_from_s3() -> DaftResult<()> {
let file = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet";
let file = PARQUET_FILE;

let mut io_config = IOConfig::default();
io_config.s3.anonymous = true;
@@ -1043,4 +1053,39 @@

Ok(())
}

#[test]
fn test_parquet_streaming_read_from_s3() -> DaftResult<()> {
let file = PARQUET_FILE;

let mut io_config = IOConfig::default();
io_config.s3.anonymous = true;

let io_client = Arc::new(IOClient::new(io_config.into())?);
let runtime_handle = daft_io::get_runtime(true)?;
runtime_handle.block_on(async move {
let tables = stream_parquet(
file,
None,
None,
None,
None,
None,
io_client,
None,
&Default::default(),
None,
None,
false,
)
.await?
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;
let total_tables_len = tables.iter().map(|t| t.len()).sum::<usize>();
assert_eq!(total_tables_len, 100);
Ok(())
})
}
}