Parquet/async: Default to suffix requests on supporting readers/object stores #6157

Draft · wants to merge 7 commits into master
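Context for the diffs below: the async reader API moves from plain Range<usize> byte ranges to a GetRange that can also express offset and suffix reads, which is what lets the footer fetch become a suffix request. The type's definition is not reproduced on this page, so the following is a minimal sketch inferred from the read_range test helper near the bottom of the diff; the derives and the From impl are assumptions, not the PR's exact definition.

```rust
use std::ops::Range;

// Minimal sketch of the range type threaded through the async reader,
// inferred from the GetRange matches in the test helper below.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GetRange {
    /// An explicit byte range, `start..end`
    Bounded(Range<usize>),
    /// Everything from this offset to the end of the file
    Offset(usize),
    /// The last `n` bytes of the file, fetched without knowing its size
    Suffix(usize),
}

// Assumed convenience conversion; this is what lets existing call sites
// write `(start..end).into()` throughout the diff.
impl From<Range<usize>> for GetRange {
    fn from(value: Range<usize>) -> Self {
        Self::Bounded(value)
    }
}
```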
2 changes: 1 addition & 1 deletion parquet/Cargo.toml
@@ -82,7 +82,7 @@ serde_json = { version = "1.0", features = ["std"], default-features = false }
arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
-object_store = { version = "0.10.0", default-features = false, features = ["azure"] }
+object_store = { version = "0.10.0", default-features = false, features = ["azure", "aws"] }

# TODO: temporary to fix parquet wasm build
# upstream issue: https://github.com/gyscos/zstd-rs/issues/269
2 changes: 1 addition & 1 deletion parquet/examples/read_with_rowgroup.rs
@@ -167,7 +167,7 @@ impl InMemoryRowGroup
if self.mask.leaf_included(leaf_idx) {
let (start, len) = meta.byte_range();
let data = reader
-.get_bytes(start as usize..(start + len) as usize)
+.get_bytes((start as usize..(start + len) as usize).into())
.await?;

vs[leaf_idx] = Some(Arc::new(ColumnChunkData {
170 changes: 145 additions & 25 deletions parquet/src/arrow/async_reader/metadata.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-use crate::arrow::async_reader::AsyncFileReader;
+use crate::arrow::async_reader::{AsyncFileReader, GetRange};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
@@ -25,15 +25,14 @@ use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::future::Future;
-use std::ops::Range;

/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
pub trait MetadataFetch {
-fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>>;
}

impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
-fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
self.get_bytes(range)
}
}
@@ -52,6 +51,39 @@ impl<F: MetadataFetch> MetadataLoader<F> {
/// Create a new [`MetadataLoader`] by reading the footer information
///
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
pub async fn load_without_size(mut fetch: F, prefetch: Option<usize>) -> Result<Self> {
let suffix = fetch.fetch(GetRange::Suffix(prefetch.unwrap_or(8))).await?;
Review thread:

Member:

Hi, thanks a lot for this effort. However, using suffix fetch by default may break users on storage services that don't support this, like azblob. Have you considered not changing load, but adding a new API called load_without_size, load_with_suffix_fetch, or something similar?

Contributor (@kylebarron), Aug 21, 2024:

As described here, I believe the default implementation for azure is to make two requests: one for the length and another for the suffix data. But then this is much more performant on all other platforms.

Member:

> As described here, I believe the default implementation for azure is to make two requests: one for the length and another for the suffix data. But then this is much more performant on all other platforms.

Hi, I disagree with this because, in most cases, we already have the file size from ListObjects or other metadata services. So, this change doesn't perform better on other platforms and has a negative effect on azblob.

Would you reconsider the choice by adding a new function instead of changing the existing one? This would also allow us to include it in the next minor version, avoiding a breaking change.

Contributor:

I don't have a preference myself whether the default load uses a suffix request or not. As @H-Plus-Time noted above, we're looking for consensus on this.

Aspects that need consensus:

• which of the two options (suffix or non-suffix) gets the load method name (i.e. which is the default), and what the non-default method name should be.

Separately,

> in most cases, we already have the file size from ListObjects or other metadata services

this seems to depend heavily on your use case. In my case I rarely have this information already.

> include it in the next minor version, avoiding a breaking change

This is moot anyway, because the next release is breaking, right?

Member:

> This is moot anyway, because the next release is breaking, right?

Thanks for pointing this out; let's ignore this part.
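To make the two-request fallback discussed above concrete, here is a minimal sketch; SuffixCapable, get_suffix, head_length, and get_range are hypothetical stand-ins for illustration, not APIs from this PR or from object_store:

```rust
use bytes::Bytes;
use std::ops::Range;

// Hypothetical store interface for this sketch only.
#[allow(async_fn_in_trait)]
trait SuffixCapable {
    async fn get_suffix(&self, n: usize) -> std::io::Result<Bytes>;
    async fn head_length(&self) -> std::io::Result<usize>;
    async fn get_range(&self, range: Range<usize>) -> std::io::Result<Bytes>;
}

/// Emulate a suffix read on stores (e.g. azblob) that reject `bytes=-N`
/// ranges: probe the object length first, then issue a bounded read.
async fn suffix_with_fallback<S: SuffixCapable>(store: &S, n: usize) -> std::io::Result<Bytes> {
    match store.get_suffix(n).await {
        // One request on stores with native suffix support (e.g. S3, GCS, local files).
        Ok(bytes) => Ok(bytes),
        // Two requests otherwise: one for the length, another for the suffix data.
        Err(_) => {
            let len = store.head_length().await?;
            store.get_range(len.saturating_sub(n)..len).await
        }
    }
}
```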

let suffix_len = suffix.len();

let mut footer = [0; 8];
footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);

let length = decode_footer(&footer)?;

// Did not fetch the entire file metadata in the initial read, need to make a second request
let (metadata, remainder) = if length > suffix_len - 8 {
let metadata_offset = length + 8;
let meta = fetch.fetch(GetRange::Suffix(metadata_offset)).await?;
let slice = &meta[0..length];
(decode_metadata(slice)?, None)
} else {
let metadata_offset = length + 8;
let metadata_start = suffix_len - metadata_offset;

let slice = &suffix[metadata_start..suffix_len - 8];
(
decode_metadata(slice)?,
Some((0, suffix.slice(..metadata_start))),
)
};

Ok(Self {
fetch,
metadata,
remainder,
})
}
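For reference, the suffix path above needs only the last 8 bytes of the file to size its second request: a 4-byte little-endian metadata length followed by the PAR1 magic. A minimal sketch of that decode (the crate's decode_footer is the authoritative implementation):

```rust
// Sketch of the footer decode that load_without_size relies on.
// The file ends with: [metadata][4-byte LE metadata length]["PAR1"].
fn metadata_len(footer: &[u8; 8]) -> Result<usize, &'static str> {
    if footer[4..] != *b"PAR1" {
        return Err("invalid Parquet footer magic");
    }
    let len = u32::from_le_bytes(footer[..4].try_into().unwrap());
    Ok(len as usize)
}
```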

pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
if file_size < 8 {
return Err(ParquetError::EOF(format!(
@@ -67,7 +99,7 @@
file_size - 8
};

-let suffix = fetch.fetch(footer_start..file_size).await?;
+let suffix = fetch.fetch((footer_start..file_size).into()).await?;
let suffix_len = suffix.len();

let mut footer = [0; 8];
@@ -86,18 +118,18 @@
// Did not fetch the entire file metadata in the initial read, need to make a second request
let (metadata, remainder) = if length > suffix_len - 8 {
let metadata_start = file_size - length - 8;
-let meta = fetch.fetch(metadata_start..file_size - 8).await?;
+let meta = fetch.fetch((metadata_start..file_size - 8).into()).await?;
(decode_metadata(&meta)?, None)
} else {
-let metadata_start = file_size - length - 8 - footer_start;
+let metadata_offset = length + 8;
+let metadata_start = suffix_len - metadata_offset;

let slice = &suffix[metadata_start..suffix_len - 8];
(
decode_metadata(slice)?,
-Some((footer_start, suffix.slice(..metadata_start))),
+Some((0, suffix.slice(..metadata_start))),
)
};

Ok(Self {
fetch,
metadata,
@@ -133,13 +165,15 @@ impl<F: MetadataFetch> MetadataLoader<F> {
Some(range) => range,
};

+let page_index_len = range.end - range.start;
+// TODO: determine if _remainder_start is needed even in the non-suffix request case
let data = match &self.remainder {
-Some((remainder_start, remainder)) if *remainder_start <= range.start => {
-let offset = range.start - *remainder_start;
-remainder.slice(offset..range.end - *remainder_start + offset)
+Some((_remainder_start, remainder)) if remainder.len() >= page_index_len => {
+let offset = remainder.len() - page_index_len;
+remainder.slice(offset..)
}
// Note: this will potentially fetch data already in remainder; this keeps things simple
-_ => self.fetch.fetch(range.start..range.end).await?,
+_ => self.fetch.fetch((range.start..range.end).into()).await?,
};

// Sanity check
@@ -200,10 +234,10 @@ struct MetadataFetchFn<F>(F);

impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
where
-F: FnMut(Range<usize>) -> Fut + Send,
+F: FnMut(GetRange) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
{
-fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
async move { self.0(range).await }.boxed()
}
}
@@ -226,15 +260,18 @@
/// significantly reduce the number of `fetch` requests, and consequently latency
pub async fn fetch_parquet_metadata<F, Fut>(
fetch: F,
-file_size: usize,
+file_size: Option<usize>,
prefetch: Option<usize>,
) -> Result<ParquetMetaData>
where
-F: FnMut(Range<usize>) -> Fut + Send,
+F: FnMut(GetRange) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
{
let fetch = MetadataFetchFn(fetch);
-let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
+let loader = match file_size {
+Some(file_size) => MetadataLoader::load(fetch, file_size, prefetch).await?,
+None => MetadataLoader::load_without_size(fetch, prefetch).await?,
+};
Ok(loader.finish())
}
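A hedged usage sketch of the new Option<usize> signature, mirroring the tests below; get_test_file and read_range are the crate's test helpers, and this snippet is illustrative rather than code from the PR:

```rust
// Exercising both call shapes of fetch_parquet_metadata.
async fn load_metadata_example() -> Result<()> {
    let mut file = get_test_file("nulls.snappy.parquet");
    let len: usize = file.metadata()?.len().try_into().unwrap();

    // Known file size: the footer read stays a bounded range request.
    let with_size = fetch_parquet_metadata(
        |range| futures::future::ready(read_range(&mut file, range)),
        Some(len),
        None,
    )
    .await?;

    // Unknown file size: pass None and the loader opens with a suffix request,
    // saving the caller a HEAD/stat round trip on stores that support suffixes.
    let without_size = fetch_parquet_metadata(
        |range| futures::future::ready(read_range(&mut file, range)),
        None,
        None,
    )
    .await?;

    assert_eq!(
        with_size.file_metadata().schema(),
        without_size.file_metadata().schema()
    );
    Ok(())
}
```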

@@ -247,7 +284,13 @@ mod tests {
use std::io::{Read, Seek, SeekFrom};
use std::sync::atomic::{AtomicUsize, Ordering};

-fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
+fn read_range(file: &mut File, range: GetRange) -> Result<Bytes> {
+let file_size = file.metadata()?.len().try_into().unwrap();
+let range = match range {
+GetRange::Bounded(range) => range,
+GetRange::Offset(offset) => offset..file_size,
+GetRange::Suffix(end_offset) => file_size.saturating_sub(end_offset)..file_size,
+};
file.seek(SeekFrom::Start(range.start as _))?;
let len = range.end - range.start;
let mut buf = Vec::with_capacity(len);
@@ -268,42 +311,51 @@ mod tests {
fetch_count.fetch_add(1, Ordering::SeqCst);
futures::future::ready(read_range(&mut file, range))
};
+// Known file size, unknown metadata size
+let actual = fetch_parquet_metadata(&mut fetch, Some(len), None)
+.await
+.unwrap();
+assert_eq!(actual.file_metadata().schema(), expected);
+assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+
-let actual = fetch_parquet_metadata(&mut fetch, len, None).await.unwrap();
+fetch_count.store(0, Ordering::SeqCst);
+let actual = fetch_parquet_metadata(&mut fetch, None, None)
+.await
+.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small
fetch_count.store(0, Ordering::SeqCst);
-let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
+let actual = fetch_parquet_metadata(&mut fetch, None, Some(10))
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too large
fetch_count.store(0, Ordering::SeqCst);
-let actual = fetch_parquet_metadata(&mut fetch, len, Some(500))
+let actual = fetch_parquet_metadata(&mut fetch, None, Some(500))
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

// Metadata hint exactly correct
fetch_count.store(0, Ordering::SeqCst);
-let actual = fetch_parquet_metadata(&mut fetch, len, Some(428))
+let actual = fetch_parquet_metadata(&mut fetch, None, Some(428))
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

-let err = fetch_parquet_metadata(&mut fetch, 4, None)
+let err = fetch_parquet_metadata(&mut fetch, Some(4), None)
.await
.unwrap_err()
.to_string();
assert_eq!(err, "EOF: file size of 4 is less than footer");

-let err = fetch_parquet_metadata(&mut fetch, 20, None)
+let err = fetch_parquet_metadata(&mut fetch, Some(20), None)
.await
.unwrap_err()
.to_string();
@@ -320,6 +372,64 @@ mod tests {
futures::future::ready(read_range(&mut file, range))
};

let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load_without_size(f, None).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
loader.load_page_index(true, true).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
let metadata = loader.finish();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch just footer exactly
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load_without_size(f, Some(1729))
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
loader.load_page_index(true, true).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than footer but not enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load_without_size(f, Some(130649))
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
loader.load_page_index(true, true).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch exactly enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load_without_size(f, Some(130650))
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
loader.load_page_index(true, true).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
let metadata = loader.finish();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load_without_size(f, Some(131651))
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
loader.load_page_index(true, true).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
let metadata = loader.finish();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Known-size file
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
@@ -357,5 +467,15 @@ mod tests {
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
let metadata = loader.finish();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load(f, len, Some(131651)).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
loader.load_page_index(true, true).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
let metadata = loader.finish();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
}
}