diff --git a/rust/parquet/src/file/footer.rs b/rust/parquet/src/file/footer.rs index 240381c3038..a338c01d499 100644 --- a/rust/parquet/src/file/footer.rs +++ b/rust/parquet/src/file/footer.rs @@ -16,7 +16,6 @@ // under the License. use std::{ - cmp::min, io::{Cursor, Read, Seek, SeekFrom}, rc::Rc, }; @@ -29,8 +28,9 @@ use crate::basic::ColumnOrder; use crate::errors::{ParquetError, Result}; use crate::file::{ - metadata::*, reader::ChunkReader, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, - PARQUET_MAGIC, + metadata::*, + reader::{ChunkMode, ChunkReader, Length}, + DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC, }; use crate::schema::types::{self, SchemaDescriptor}; @@ -44,30 +44,31 @@ use crate::schema::types::{self, SchemaDescriptor}; /// The reader first reads DEFAULT_FOOTER_SIZE bytes from the end of the file. /// If it is not enough according to the length indicated in the footer, it reads more bytes. pub fn parse_metadata(chunk_reader: &R) -> Result { - // check file is large enough to hold footer - let file_size = chunk_reader.len(); - if file_size < (FOOTER_SIZE as u64) { + // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer + let mut first_end_read = chunk_reader.get_read( + ChunkMode::FromEnd(DEFAULT_FOOTER_READ_SIZE as u64), + DEFAULT_FOOTER_READ_SIZE, + )?; + let first_end_len = first_end_read.len() as usize; + + if first_end_len < FOOTER_SIZE { return Err(general_err!( "Invalid Parquet file. 
Size is smaller than footer" )); } - // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer - let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, chunk_reader.len() as usize); - let mut default_end_reader = chunk_reader - .get_read(chunk_reader.len() - default_end_len as u64, default_end_len)?; - let mut default_len_end_buf = vec![0; default_end_len]; - default_end_reader.read_exact(&mut default_len_end_buf)?; + let mut first_len_end_buf = vec![0; first_end_len]; + first_end_read.read_exact(&mut first_len_end_buf)?; // check this is indeed a parquet file - if default_len_end_buf[default_end_len - 4..] != PARQUET_MAGIC { + if first_len_end_buf[first_end_len - 4..] != PARQUET_MAGIC { return Err(general_err!("Invalid Parquet file. Corrupt footer")); } // get the metadata length from the footer - let metadata_len = LittleEndian::read_i32( - &default_len_end_buf[default_end_len - 8..default_end_len - 4], - ) as i64; + let metadata_len = + LittleEndian::read_i32(&first_len_end_buf[first_end_len - 8..first_end_len - 4]) + as i64; if metadata_len < 0 { return Err(general_err!( "Invalid Parquet file. Metadata length is less than zero ({})", @@ -77,24 +78,31 @@ pub fn parse_metadata(chunk_reader: &R) -> Result; - if footer_metadata_len > file_size as usize { + if first_end_len < DEFAULT_FOOTER_READ_SIZE + && footer_metadata_len > first_end_len as usize + { return Err(general_err!( - "Invalid Parquet file. Metadata start is less than zero ({})", - file_size as i64 - footer_metadata_len as i64 + "Invalid Parquet file. Metadata size exceeds file size." 
)); } else if footer_metadata_len < DEFAULT_FOOTER_READ_SIZE { // the whole metadata is in the bytes we already read - default_end_cursor.seek(SeekFrom::End(-(footer_metadata_len as i64)))?; - metadata_read = Box::new(default_end_cursor); + first_end_cursor.seek(SeekFrom::End(-(footer_metadata_len as i64)))?; + metadata_read = Box::new(first_end_cursor); } else { // the end of file read by default is not long enough, read missing bytes + let complementary_end_len = FOOTER_SIZE + metadata_len as usize - first_end_len; let complementary_end_read = chunk_reader.get_read( - file_size - footer_metadata_len as u64, - FOOTER_SIZE + metadata_len as usize - default_end_len, + ChunkMode::FromEnd(footer_metadata_len as u64), + complementary_end_len, )?; - metadata_read = Box::new(complementary_end_read.chain(default_end_cursor)); + if complementary_end_read.len() < complementary_end_len as u64 { + return Err(general_err!( + "Invalid Parquet file. Metadata size exceeds file size." + )); + } + metadata_read = Box::new(complementary_end_read.chain(first_end_cursor)); } // TODO: row group filtering @@ -207,7 +215,7 @@ mod tests { assert!(reader_result.is_err()); assert_eq!( reader_result.err().unwrap(), - general_err!("Invalid Parquet file. Metadata start is less than zero (-255)") + general_err!("Invalid Parquet file. 
Metadata size exceeds file size.") ); } diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs index 50991872eaf..82c83636d98 100644 --- a/rust/parquet/src/file/reader.rs +++ b/rust/parquet/src/file/reader.rs @@ -32,6 +32,22 @@ use crate::basic::Type; use crate::column::reader::ColumnReaderImpl; +/// Parquet files must be read from end for the footer then from start for columns +pub enum ChunkMode { + FromStart(u64), + FromEnd(u64), +} + +impl ChunkMode { + /// FromStart offset can always be computed if you know the length + pub fn from_start(&self, len: u64) -> u64 { + match self { + ChunkMode::FromStart(start_offset) => *start_offset, + ChunkMode::FromEnd(end_offset) => len.saturating_sub(*end_offset), + } + } +} + /// Length should return the total number of bytes in the input source. /// It's mainly used to read the metadata, which is at the end of the source. #[allow(clippy::len_without_is_empty)] @@ -43,11 +59,12 @@ pub trait Length { /// The ChunkReader trait generates readers of chunks of a source. /// For a file system reader, each chunk might contain a clone of File bounded on a given range. /// For an object store reader, each read can be mapped to a range request. -pub trait ChunkReader: Length { - type T: Read; - /// get a serialy readeable slice of the current reader - /// This should fail if the slice exceeds the current bounds - fn get_read(&self, start: u64, length: usize) -> Result; +pub trait ChunkReader { + type T: Read + Length; + /// Get a serially readable slice of the current reader. + /// If one end of the slice exceeds the bounds of the source, the slice will be clamped to the source. + /// In that case, the length of the resulting Read will be smaller than the requested length. 
+ fn get_read(&self, start: ChunkMode, length: usize) -> Result; } // ---------------------------------------------------------------------- diff --git a/rust/parquet/src/file/serialized_reader.rs b/rust/parquet/src/file/serialized_reader.rs index 82997005e76..852cfc3ff34 100644 --- a/rust/parquet/src/file/serialized_reader.rs +++ b/rust/parquet/src/file/serialized_reader.rs @@ -55,8 +55,8 @@ impl TryClone for File { impl ChunkReader for File { type T = FileSource; - fn get_read(&self, start: u64, length: usize) -> Result { - Ok(FileSource::new(self, start, length)) + fn get_read(&self, start: ChunkMode, length: usize) -> Result { + Ok(FileSource::new(self, start.from_start(self.len()), length)) } } @@ -69,8 +69,8 @@ impl Length for SliceableCursor { impl ChunkReader for SliceableCursor { type T = SliceableCursor; - fn get_read(&self, start: u64, length: usize) -> Result { - self.slice(start, length).map_err(|e| e.into()) + fn get_read(&self, start: ChunkMode, length: usize) -> Result { + Ok(self.slice(start.from_start(self.len()), length)) } } @@ -199,7 +199,7 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' let col_length = col.compressed_size(); let file_chunk = self .chunk_reader - .get_read(col_start as u64, col_length as usize)?; + .get_read(ChunkMode::FromStart(col_start as u64), col_length as usize)?; let page_reader = SerializedPageReader::new( file_chunk, col.num_values(), diff --git a/rust/parquet/src/util/cursor.rs b/rust/parquet/src/util/cursor.rs index ae3c8ddc2ef..3e5c07b58ea 100644 --- a/rust/parquet/src/util/cursor.rs +++ b/rust/parquet/src/util/cursor.rs @@ -16,7 +16,7 @@ // under the License. use std::cmp; -use std::io::{self, Error, ErrorKind, Read}; +use std::io::{self, Read}; use std::rc::Rc; /// This is object to use if your file is already in memory. @@ -42,20 +42,21 @@ impl SliceableCursor { } } - /// Create a slice cursor using the same data as a current one. 
- pub fn slice(&self, start: u64, length: usize) -> io::Result { - let new_start = self.start + start; - if new_start >= self.inner.len() as u64 - || new_start as usize + length > self.inner.len() - { - return Err(Error::new(ErrorKind::InvalidInput, "out of bound")); + /// Create a slice cursor backed by the same data as the current one. + /// If the slice length is larger than the remaining bytes in the source, the slice is clamped. + /// Panics if start is larger than the vector size. + pub fn slice(&self, start: u64, length: usize) -> Self { + if start > self.length as u64 { + panic!("Slice start larger than cursor"); } - Ok(SliceableCursor { + let absolute_start = self.start + start; + let clamped_length = std::cmp::min(length, self.length - start as usize); + SliceableCursor { inner: Rc::clone(&self.inner), - start: new_start, - pos: new_start, - length, - }) + start: absolute_start, + pos: absolute_start, + length: clamped_length, + } } fn remaining_slice(&self) -> &[u8] { @@ -107,7 +108,19 @@ mod tests { #[test] fn read_all_slice() { - let cursor = get_u8_range().slice(10, 10).expect("error while slicing"); + let cursor = get_u8_range().slice(10, 10); check_read_all(cursor, 10, 19); } + + #[test] + fn read_all_clipped_slice() { + let cursor = get_u8_range().slice(250, 10); + check_read_all(cursor, 250, 255); + } + + #[test] + fn chaining_slices() { + let cursor = get_u8_range().slice(200, 50).slice(10, 10); + check_read_all(cursor, 210, 219); + } } diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index 223b8d5e676..2211a6995d4 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -57,6 +57,7 @@ pub trait Position { pub struct FileSource { reader: RefCell, start: u64, // start position in a file + pos: u64, // current position in a file end: u64, // end position in a file buf: Vec, // buffer where bytes read in advance are stored buf_pos: usize, // current position of the reader in the buffer @@ -65,12 +66,19 @@ pub 
struct FileSource { impl FileSource { /// Creates new file reader with start and length from a file handle + /// If length is larger than the remaining bytes in the source, it is clamped. + /// Panics if start is larger than the file size pub fn new(fd: &R, start: u64, length: usize) -> Self { let reader = RefCell::new(fd.try_clone().unwrap()); + if start > fd.len() as u64 { + panic!("Slice start larger than file"); + } + let length = std::cmp::min(length as u64, fd.len() - start); Self { reader, start, - end: start + length as u64, + pos: start, + end: start + length, buf: vec![0 as u8; DEFAULT_BUF_SIZE], buf_pos: 0, buf_cap: 0, @@ -85,7 +93,7 @@ impl FileSource { // to tell the compiler that the pos..cap slice is always valid. debug_assert!(self.buf_pos == self.buf_cap); let mut reader = self.reader.borrow_mut(); - reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading + reader.seek(SeekFrom::Start(self.pos))?; // always seek to current position before reading self.buf_cap = reader.read(&mut self.buf)?; self.buf_pos = 0; } @@ -98,16 +106,16 @@ impl FileSource { self.buf_cap = 0; // read directly into param buffer let mut reader = self.reader.borrow_mut(); - reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading + reader.seek(SeekFrom::Start(self.pos))?; // always seek to current position before reading let nread = reader.read(buf)?; - self.start += nread as u64; + self.pos += nread as u64; Ok(nread) } } impl Read for FileSource { fn read(&mut self, buf: &mut [u8]) -> Result { - let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize); + let bytes_to_read = cmp::min(buf.len(), (self.end - self.pos) as usize); let buf = &mut buf[0..bytes_to_read]; // If we don't have any buffered data and we're doing a massive read @@ -124,14 +132,14 @@ impl Read for FileSource { // consume from buffer self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap); - self.start += nread as u64; + self.pos += nread as u64; Ok(nread) } } 
impl Position for FileSource { fn pos(&self) -> u64 { - self.start + self.pos } } @@ -243,6 +251,19 @@ mod tests { assert_eq!(src.pos(), 4); } + #[test] + fn test_io_slice_over_limit() { + let mut buf = vec![0; 8]; + let file = get_test_file("alltypes_plain.parquet"); + // read the footer + let mut src = FileSource::new(&file, file.len() - 4, 10); + assert_eq!(src.len(), 4); + + let bytes_read = src.read(&mut buf[..]).unwrap(); + assert_eq!(bytes_read, 4); + assert_eq!(buf, vec![b'P', b'A', b'R', b'1', 0, 0, 0, 0]); + } + #[test] fn test_io_seek_switch() { let mut buf = vec![0; 4];