diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml index 153beb051182..61cc78188dd1 100644 --- a/integrations/parquet/Cargo.toml +++ b/integrations/parquet/Cargo.toml @@ -32,13 +32,13 @@ async-trait = "0.1" bytes = "1" futures = "0.3" opendal = { version = "0.50.0", path = "../../core" } -parquet = { version = "53.0", default-features = false, features = [ +parquet = { version = "53.1", default-features = false, features = [ "async", "arrow", ] } [dev-dependencies] -arrow = { version = "53.0" } +arrow = { version = "53.1" } opendal = { version = "0.50.0", path = "../../core", features = [ "services-memory", "services-s3", diff --git a/integrations/parquet/src/async_reader.rs b/integrations/parquet/src/async_reader.rs index 4bffdd06009c..d4cc308be697 100644 --- a/integrations/parquet/src/async_reader.rs +++ b/integrations/parquet/src/async_reader.rs @@ -24,8 +24,8 @@ use futures::FutureExt; use opendal::Reader; use parquet::arrow::async_reader::AsyncFileReader; use parquet::errors::{ParquetError, Result as ParquetResult}; -use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; +use parquet::file::metadata::ParquetMetaDataReader; use parquet::file::FOOTER_SIZE; const PREFETCH_FOOTER_SIZE: usize = 512 * 1024; @@ -156,40 +156,12 @@ impl AsyncFileReader for AsyncReader { fn get_metadata(&mut self) -> BoxFuture<'_, ParquetResult>> { async move { - let prefetched_footer_content = self - .inner - .read(self.content_length - self.prefetch_footer_size as u64..self.content_length) - .await - .map_err(|err| ParquetError::External(Box::new(err)))?; - let prefetched_footer_length = prefetched_footer_content.len(); - - // Decode the metadata length from the last 8 bytes of the file. - let metadata_length = { - let buf = &prefetched_footer_content - .slice((prefetched_footer_length - FOOTER_SIZE)..prefetched_footer_length); - // Safety: checked above. - let buf: [u8; 8] = buf.to_vec().try_into().unwrap(); - decode_footer(&buf)? - }; - - // Try to read the metadata from the `prefetched_footer_content`. - // Otherwise, fetch exact metadata from the remote. - let buf = if prefetched_footer_length >= metadata_length + FOOTER_SIZE { - prefetched_footer_content.slice( - (prefetched_footer_length - metadata_length - FOOTER_SIZE) - ..(prefetched_footer_length - FOOTER_SIZE), - ) - } else { - self.inner - .read( - self.content_length - metadata_length as u64 - FOOTER_SIZE as u64 - ..self.content_length - FOOTER_SIZE as u64, - ) - .await - .map_err(|err| ParquetError::External(Box::new(err)))? - }; + let reader = + ParquetMetaDataReader::new().with_prefetch_hint(Some(self.prefetch_footer_size)); + let size = self.content_length as usize; + let meta = reader.load_and_finish(self, size).await?; - Ok(Arc::new(decode_metadata(&buf.to_vec())?)) + Ok(Arc::new(meta)) } .boxed() }