rust/parquet/src/file/footer.rs (34 additions & 26 deletions)

@@ -16,7 +16,6 @@
// under the License.

use std::{
-    cmp::min,
    io::{Cursor, Read, Seek, SeekFrom},
    rc::Rc,
};
@@ -29,8 +28,9 @@ use crate::basic::ColumnOrder;

use crate::errors::{ParquetError, Result};
use crate::file::{
-    metadata::*, reader::ChunkReader, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE,
-    PARQUET_MAGIC,
+    metadata::*,
+    reader::{ChunkMode, ChunkReader, Length},
+    DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC,
};

use crate::schema::types::{self, SchemaDescriptor};
@@ -44,30 +44,31 @@ use crate::schema::types::{self, SchemaDescriptor};
/// The reader first reads DEFAULT_FOOTER_READ_SIZE bytes from the end of the file.
/// If that is not enough according to the length indicated in the footer, it reads more bytes.
pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
-    // check file is large enough to hold footer
-    let file_size = chunk_reader.len();
-    if file_size < (FOOTER_SIZE as u64) {
+    // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
+    let mut first_end_read = chunk_reader.get_read(
> Member: It's weird that we are handling the "chunked read" logic in the footer
> reader. Should this be inside the ChunkReader implementation? That is, the
> reader itself should do lazy loading on the input stream based on how the
> application decides to seek and read the data.
>
> Contributor (author): That's a good point. I copied this logic from the C++
> implementation, but it's not great in terms of separation of concerns. If I
> understand your point:
>   • here we should request a reader with get_read(ChunkMode::FromEnd(FOOTER_SIZE))
>   • it should be up to the ChunkReader implementation to buffer extra bytes if it
>     finds that FOOTER_SIZE is too small and it is cheap for it to get more bytes
>     (for instance, an S3 ChunkReader would get 16 kB instead)
>
> Member: Yes, exactly. I feel the footer reader should not be aware of how the
> input stream is processed, and the logic can vary depending on the remote
> storage, so DEFAULT_FOOTER_READ_SIZE may not fit all cases.

+        ChunkMode::FromEnd(DEFAULT_FOOTER_READ_SIZE as u64),
+        DEFAULT_FOOTER_READ_SIZE,
+    )?;
+    let first_end_len = first_end_read.len() as usize;

+    if first_end_len < FOOTER_SIZE {
        return Err(general_err!(
            "Invalid Parquet file. Size is smaller than footer"
        ));
    }

-    // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
-    let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, chunk_reader.len() as usize);
-    let mut default_end_reader = chunk_reader
-        .get_read(chunk_reader.len() - default_end_len as u64, default_end_len)?;
-    let mut default_len_end_buf = vec![0; default_end_len];
-    default_end_reader.read_exact(&mut default_len_end_buf)?;
+    let mut first_len_end_buf = vec![0; first_end_len];
> Member: nit: perhaps a better name; first_len_end_buf is confusing.

+    first_end_read.read_exact(&mut first_len_end_buf)?;

    // check this is indeed a parquet file
-    if default_len_end_buf[default_end_len - 4..] != PARQUET_MAGIC {
+    if first_len_end_buf[first_end_len - 4..] != PARQUET_MAGIC {
        return Err(general_err!("Invalid Parquet file. Corrupt footer"));
    }

    // get the metadata length from the footer
-    let metadata_len = LittleEndian::read_i32(
-        &default_len_end_buf[default_end_len - 8..default_end_len - 4],
-    ) as i64;
+    let metadata_len =
+        LittleEndian::read_i32(&first_len_end_buf[first_end_len - 8..first_end_len - 4])
+            as i64;
    if metadata_len < 0 {
        return Err(general_err!(
            "Invalid Parquet file. Metadata length is less than zero ({})",
@@ -77,24 +78,31 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
    let footer_metadata_len = FOOTER_SIZE + metadata_len as usize;

    // build up the reader covering the entire metadata
-    let mut default_end_cursor = Cursor::new(default_len_end_buf);
+    let mut first_end_cursor = Cursor::new(first_len_end_buf);
    let metadata_read: Box<dyn Read>;
-    if footer_metadata_len > file_size as usize {
+    if first_end_len < DEFAULT_FOOTER_READ_SIZE
> Contributor: To check my understanding: this branch is checking whether the total
> file was smaller than DEFAULT_FOOTER_READ_SIZE (because it was all read in a
> single chunk) and the metadata size claims to be larger than that chunk?
>
> Contributor (author): Exactly. I should add a comment to make this explicit.

+        && footer_metadata_len > first_end_len as usize
+    {
        return Err(general_err!(
-            "Invalid Parquet file. Metadata start is less than zero ({})",
-            file_size as i64 - footer_metadata_len as i64
+            "Invalid Parquet file. Metadata size exceeds file size."
        ));
    } else if footer_metadata_len < DEFAULT_FOOTER_READ_SIZE {
> Member: Should this be <=?

        // the whole metadata is in the bytes we already read
-        default_end_cursor.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
-        metadata_read = Box::new(default_end_cursor);
+        first_end_cursor.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
+        metadata_read = Box::new(first_end_cursor);
    } else {
        // the end of file read by default is not long enough, read missing bytes
+        let complementary_end_len = FOOTER_SIZE + metadata_len as usize - first_end_len;
> Member: nit: reuse footer_metadata_len.
>
> Contributor (author): Good idea.

        let complementary_end_read = chunk_reader.get_read(
-            file_size - footer_metadata_len as u64,
-            FOOTER_SIZE + metadata_len as usize - default_end_len,
+            ChunkMode::FromEnd(footer_metadata_len as u64),
+            complementary_end_len,
        )?;
-        metadata_read = Box::new(complementary_end_read.chain(default_end_cursor));
+        if complementary_end_read.len() < complementary_end_len as u64 {
+            return Err(general_err!(
+                "Invalid Parquet file. Metadata size exceeds file size."
+            ));
+        }
+        metadata_read = Box::new(complementary_end_read.chain(first_end_cursor));
    }

    // TODO: row group filtering
@@ -207,7 +215,7 @@ mod tests {
        assert!(reader_result.is_err());
        assert_eq!(
            reader_result.err().unwrap(),
-            general_err!("Invalid Parquet file. Metadata start is less than zero (-255)")
+            general_err!("Invalid Parquet file. Metadata size exceeds file size.")
        );
    }
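To make the byte arithmetic above easier to follow, here is a standalone sketch of the footer layout that parse_metadata relies on: the last 8 bytes of a Parquet file are a 4-byte little-endian metadata length followed by the 4-byte magic "PAR1". The function below is illustrative only; its name and error handling are invented for the example and are not part of the crate.

    /// Minimal sketch: extract the metadata length from the tail of a Parquet
    /// file. `tail` is any buffer ending at the last byte of the file.
    fn footer_metadata_len(tail: &[u8]) -> Result<usize, String> {
        const FOOTER_SIZE: usize = 8;
        const PARQUET_MAGIC: [u8; 4] = *b"PAR1";

        let n = tail.len();
        if n < FOOTER_SIZE {
            return Err("file smaller than footer".into());
        }
        // the last 4 bytes must be the magic
        if tail[n - 4..] != PARQUET_MAGIC {
            return Err("corrupt footer".into());
        }
        // the 4 bytes before the magic hold the little-endian metadata length
        let len = u32::from_le_bytes([tail[n - 8], tail[n - 7], tail[n - 6], tail[n - 5]]);
        Ok(len as usize)
    }

    fn main() {
        // 3 bytes of fake metadata, then the 8-byte footer: len = 3, magic
        let tail = [0xAA, 0xBB, 0xCC, 3, 0, 0, 0, b'P', b'A', b'R', b'1'];
        assert_eq!(footer_metadata_len(&tail), Ok(3));
    }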

rust/parquet/src/file/reader.rs (22 additions & 5 deletions)

@@ -32,6 +32,22 @@ use crate::basic::Type;

use crate::column::reader::ColumnReaderImpl;

+/// Parquet files must be read from the end (for the footer) and then from the start (for columns).
+pub enum ChunkMode {
> Contributor: One suggestion I have is to call this ReadFrom rather than ChunkMode,
> to make the name more specific. Alternatively, given its similarity, I wonder if
> it would make sense to use std::io::SeekFrom
> (https://doc.rust-lang.org/std/io/enum.SeekFrom.html) rather than a custom enum.
>
> Contributor (author): std::io::SeekFrom is similar, but it also has the
> SeekFrom::Current variant, which we are not interested in. About the name, I agree
> that ChunkMode is not ideal, but I wanted it to relate explicitly to ChunkReader.
> What about the more verbose but probably also more explicit ReadChunkFrom?
>
> Contributor: I think ReadChunkFrom is reasonable. Or maybe ChunkFrom?
>
> Contributor (author): Good idea.

+    FromStart(u64),
+    FromEnd(u64),
+}
+
+impl ChunkMode {
+    /// The FromStart offset can always be computed if you know the length.
> Member: Please make this a proper doc comment for the method, e.g., what should
> the caller pass for len?
>
> Contributor (author): Good idea.

+    pub fn from_start(&self, len: u64) -> u64 {
+        match self {
+            ChunkMode::FromStart(start_offset) => *start_offset,
+            ChunkMode::FromEnd(end_offset) => len.saturating_sub(*end_offset),
+        }
+    }
+}
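As a quick check of the resolution rule above, the enum can be exercised on its own. The snippet duplicates the PR's logic so it runs standalone; note how saturating_sub resolves a FromEnd offset larger than the source to 0, so the read is shortened rather than underflowing.

    enum ChunkMode {
        FromStart(u64),
        FromEnd(u64),
    }

    impl ChunkMode {
        fn from_start(&self, len: u64) -> u64 {
            match self {
                ChunkMode::FromStart(start_offset) => *start_offset,
                ChunkMode::FromEnd(end_offset) => len.saturating_sub(*end_offset),
            }
        }
    }

    fn main() {
        let len = 1_000; // a 1000-byte source
        assert_eq!(ChunkMode::FromStart(64).from_start(len), 64);
        assert_eq!(ChunkMode::FromEnd(64).from_start(len), 936);
        // asking for more tail bytes than exist resolves to offset 0
        assert_eq!(ChunkMode::FromEnd(4_000).from_start(len), 0);
    }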

/// Length should return the total number of bytes in the input source.
/// It's mainly used to read the metadata, which is at the end of the source.
#[allow(clippy::len_without_is_empty)]
@@ -43,11 +59,12 @@ pub trait Length {
/// The ChunkReader trait generates readers of chunks of a source.
/// For a file system reader, each chunk might contain a clone of File bounded on a given range.
/// For an object store reader, each read can be mapped to a range request.
-pub trait ChunkReader: Length {
-    type T: Read;
-    /// get a serialy readeable slice of the current reader
-    /// This should fail if the slice exceeds the current bounds
-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
+pub trait ChunkReader {
+    type T: Read + Length;
+    /// Get a serially readable slice of the current reader.
+    /// If one end of the slice exceeds the bounds of the source, the slice is clamped to the source.
+    /// In that case, the length of the resulting Read will be smaller than the requested length.
+    fn get_read(&self, start: ChunkMode, length: usize) -> Result<Self::T>;
}

// ----------------------------------------------------------------------
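The first review thread above suggests pushing the "read a bit more than the footer" policy into ChunkReader implementations. A hypothetical sketch of that idea for a remote store follows; RangeStore, RemoteChunkReader, and the 16 KiB figure are invented for illustration and are not part of this PR or the crate.

    use std::io::Cursor;

    /// Invented abstraction over a remote blob store (e.g. S3); not the crate's API.
    trait RangeStore {
        fn total_len(&self) -> u64;
        fn get_range(&self, start: u64, len: u64) -> Vec<u8>;
    }

    /// Hypothetical reader that widens small tail reads so the follow-up
    /// metadata read is usually served from the same fetch.
    struct RemoteChunkReader<S: RangeStore> {
        store: S,
        min_tail_fetch: u64, // e.g. 16 * 1024, per the discussion above
    }

    impl<S: RangeStore> RemoteChunkReader<S> {
        /// Resolve a FromEnd(end_offset) request, fetching at least
        /// `min_tail_fetch` bytes when the file is large enough.
        fn get_tail(&self, end_offset: u64) -> Cursor<Vec<u8>> {
            let total = self.store.total_len();
            let fetch = end_offset.max(self.min_tail_fetch).min(total);
            Cursor::new(self.store.get_range(total - fetch, fetch))
        }
    }

    fn main() {
        struct InMemory(Vec<u8>);
        impl RangeStore for InMemory {
            fn total_len(&self) -> u64 {
                self.0.len() as u64
            }
            fn get_range(&self, start: u64, len: u64) -> Vec<u8> {
                self.0[start as usize..(start + len) as usize].to_vec()
            }
        }

        let reader = RemoteChunkReader {
            store: InMemory(vec![0u8; 100_000]),
            min_tail_fetch: 16 * 1024,
        };
        // the caller asks for the 8-byte footer, but 16 KiB end up buffered
        let tail = reader.get_tail(8);
        assert_eq!(tail.get_ref().len(), 16 * 1024);
    }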
rust/parquet/src/file/serialized_reader.rs (5 additions & 5 deletions)

@@ -55,8 +55,8 @@ impl TryClone for File {
impl ChunkReader for File {
    type T = FileSource<File>;

-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
-        Ok(FileSource::new(self, start, length))
+    fn get_read(&self, start: ChunkMode, length: usize) -> Result<Self::T> {
+        Ok(FileSource::new(self, start.from_start(self.len()), length))
    }
}

@@ -69,8 +69,8 @@ impl Length for SliceableCursor {
impl ChunkReader for SliceableCursor {
    type T = SliceableCursor;

-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
-        self.slice(start, length).map_err(|e| e.into())
+    fn get_read(&self, start: ChunkMode, length: usize) -> Result<Self::T> {
+        Ok(self.slice(start.from_start(self.len()), length))
    }
}

@@ -199,7 +199,7 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
        let col_length = col.compressed_size();
        let file_chunk = self
            .chunk_reader
-            .get_read(col_start as u64, col_length as usize)?;
+            .get_read(ChunkMode::FromStart(col_start as u64), col_length as usize)?;
        let page_reader = SerializedPageReader::new(
            file_chunk,
            col.num_values(),
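The call sites above now split cleanly across the enum: the footer path addresses the tail with ChunkMode::FromEnd, while column chunks keep absolute ChunkMode::FromStart offsets. A self-contained toy implementation shows both shapes; the trait is redefined here so the example runs standalone, and it mimics, but is not, the crate's API.

    use std::io::{Cursor, Read, Result};

    enum ChunkMode {
        FromStart(u64),
        FromEnd(u64),
    }

    trait ChunkReader {
        type T: Read;
        fn source_len(&self) -> u64;
        fn get_read(&self, start: ChunkMode, length: usize) -> Result<Self::T>;
    }

    // In-memory stand-in for a file, with the shortening behavior the trait doc describes.
    impl ChunkReader for Vec<u8> {
        type T = Cursor<Vec<u8>>;
        fn source_len(&self) -> u64 {
            self.len() as u64
        }
        fn get_read(&self, start: ChunkMode, length: usize) -> Result<Self::T> {
            let begin = match start {
                ChunkMode::FromStart(s) => s,
                ChunkMode::FromEnd(e) => self.source_len().saturating_sub(e),
            } as usize;
            let begin = begin.min(self.len());
            let end = (begin + length).min(self.len()); // shorten out-of-bounds requests
            Ok(Cursor::new(self[begin..end].to_vec()))
        }
    }

    fn main() -> Result<()> {
        let data: Vec<u8> = (0u8..100).collect();
        // footer-style read: the last 8 bytes
        let mut tail = Vec::new();
        data.get_read(ChunkMode::FromEnd(8), 8)?.read_to_end(&mut tail)?;
        assert_eq!(tail, (92u8..100).collect::<Vec<u8>>());
        // column-style read: 16 bytes at absolute offset 32
        let mut col = Vec::new();
        data.get_read(ChunkMode::FromStart(32), 16)?.read_to_end(&mut col)?;
        assert_eq!(col.first(), Some(&32u8));
        Ok(())
    }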
rust/parquet/src/util/cursor.rs (27 additions & 14 deletions)

@@ -16,7 +16,7 @@
// under the License.

use std::cmp;
-use std::io::{self, Error, ErrorKind, Read};
+use std::io::{self, Read};
use std::rc::Rc;

/// This is the object to use if your file is already in memory.
@@ -42,20 +42,21 @@ impl SliceableCursor {
        }
    }

-    /// Create a slice cursor using the same data as a current one.
-    pub fn slice(&self, start: u64, length: usize) -> io::Result<Self> {
-        let new_start = self.start + start;
-        if new_start >= self.inner.len() as u64
-            || new_start as usize + length > self.inner.len()
-        {
-            return Err(Error::new(ErrorKind::InvalidInput, "out of bound"));
+    /// Create a slice cursor backed by the same data as the current one.
+    /// If the slice length is larger than the remaining bytes in the source, the slice is clamped.
> Member: Can we replace "clamp" with something plainer, like "shorten"? As a
> non-native English speaker, I had to check a dictionary for the meaning of this
> word. :)
>
> Contributor (author): I like the word, but you are right. 😄

+    /// Panics if start is larger than the vector size.
+    pub fn slice(&self, start: u64, length: usize) -> Self {
+        if start > self.length as u64 {
+            panic!("Slice start larger than cursor");
        }
-        Ok(SliceableCursor {
+        let absolute_start = self.start + start;
+        let clamped_length = std::cmp::min(length, self.length - start as usize);
+        SliceableCursor {
            inner: Rc::clone(&self.inner),
-            start: new_start,
-            pos: new_start,
-            length,
-        })
+            start: absolute_start,
+            pos: absolute_start,
+            length: clamped_length,
+        }
    }

    fn remaining_slice(&self) -> &[u8] {
@@ -107,7 +108,19 @@ mod tests {

    #[test]
    fn read_all_slice() {
-        let cursor = get_u8_range().slice(10, 10).expect("error while slicing");
+        let cursor = get_u8_range().slice(10, 10);
        check_read_all(cursor, 10, 19);
    }
+
+    #[test]
+    fn read_all_clipped_slice() {
+        let cursor = get_u8_range().slice(250, 10);
+        check_read_all(cursor, 250, 255);
+    }
+
+    #[test]
+    fn chaining_slices() {
+        let cursor = get_u8_range().slice(200, 50).slice(10, 10);
+        check_read_all(cursor, 210, 219);
+    }
}
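The three tests above pin down the slicing rules: an in-bounds slice keeps its length, a slice past the remaining bytes is shortened, and nested slices compose their offsets. A standalone model of that bookkeeping (a toy type, not the crate's SliceableCursor) makes the arithmetic explicit.

    use std::rc::Rc;

    /// Toy model of the cursor's offset bookkeeping: shared data plus a
    /// (start, length) window. Names are illustrative only.
    struct Window {
        data: Rc<Vec<u8>>,
        start: usize,
        length: usize,
    }

    impl Window {
        fn slice(&self, start: usize, length: usize) -> Window {
            // same rule as above: panic past the end, shorten an over-long request
            assert!(start <= self.length, "slice start larger than cursor");
            Window {
                data: Rc::clone(&self.data),
                start: self.start + start,
                length: length.min(self.length - start),
            }
        }
    }

    fn main() {
        let base = Window {
            data: Rc::new((0u8..=255).collect()),
            start: 0,
            length: 256,
        };
        // mirrors read_all_clipped_slice: only 6 bytes remain after offset 250
        let clipped = base.slice(250, 10);
        assert_eq!((clipped.start, clipped.length), (250, 6));
        // mirrors chaining_slices: offsets add up, bounds come from the inner slice
        let nested = base.slice(200, 50).slice(10, 10);
        assert_eq!((nested.start, nested.length), (210, 10));
    }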
rust/parquet/src/util/io.rs (28 additions & 7 deletions)

@@ -57,6 +57,7 @@ pub trait Position {
pub struct FileSource<R: ParquetReader> {
    reader: RefCell<R>,
    start: u64,     // start position in a file
+    pos: u64,       // current position in a file
    end: u64,       // end position in a file
    buf: Vec<u8>,   // buffer where bytes read in advance are stored
    buf_pos: usize, // current position of the reader in the buffer
@@ -65,12 +66,19 @@ pub struct FileSource<R: ParquetReader> {

impl<R: ParquetReader> FileSource<R> {
    /// Creates a new file reader with start and length from a file handle.
+    /// If length is larger than the remaining bytes in the source, it is clamped.
+    /// Panics if start is larger than the file size.
    pub fn new(fd: &R, start: u64, length: usize) -> Self {
        let reader = RefCell::new(fd.try_clone().unwrap());
+        if start > fd.len() as u64 {
+            panic!("Slice start larger than file");
+        }
+        let length = std::cmp::min(length as u64, fd.len() - start);
        Self {
            reader,
            start,
-            end: start + length as u64,
+            pos: start,
+            end: start + length,
            buf: vec![0 as u8; DEFAULT_BUF_SIZE],
            buf_pos: 0,
            buf_cap: 0,
@@ -85,7 +93,7 @@ impl<R: ParquetReader> FileSource<R> {
        // to tell the compiler that the pos..cap slice is always valid.
        debug_assert!(self.buf_pos == self.buf_cap);
        let mut reader = self.reader.borrow_mut();
-        reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading
+        reader.seek(SeekFrom::Start(self.pos))?; // always seek to the current position before reading
        self.buf_cap = reader.read(&mut self.buf)?;
        self.buf_pos = 0;
    }
@@ -98,16 +106,16 @@ impl<R: ParquetReader> FileSource<R> {
        self.buf_cap = 0;
        // read directly into param buffer
        let mut reader = self.reader.borrow_mut();
-        reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading
+        reader.seek(SeekFrom::Start(self.pos))?; // always seek to the current position before reading
        let nread = reader.read(buf)?;
-        self.start += nread as u64;
+        self.pos += nread as u64;
        Ok(nread)
    }
}

impl<R: ParquetReader> Read for FileSource<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize);
+        let bytes_to_read = cmp::min(buf.len(), (self.end - self.pos) as usize);
        let buf = &mut buf[0..bytes_to_read];

        // If we don't have any buffered data and we're doing a massive read
@@ -124,14 +132,14 @@ impl<R: ParquetReader> Read for FileSource<R> {
        // consume from buffer
        self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap);

-        self.start += nread as u64;
+        self.pos += nread as u64;
        Ok(nread)
    }
}

impl<R: ParquetReader> Position for FileSource<R> {
    fn pos(&self) -> u64 {
-        self.start
+        self.pos
    }
}

@@ -243,6 +251,19 @@ mod tests {
        assert_eq!(src.pos(), 4);
    }

+    #[test]
+    fn test_io_slice_over_limit() {
+        let mut buf = vec![0; 8];
+        let file = get_test_file("alltypes_plain.parquet");
+        // read the footer
+        let mut src = FileSource::new(&file, file.len() - 4, 10);
+        assert_eq!(src.len(), 4);
+
+        let bytes_read = src.read(&mut buf[..]).unwrap();
+        assert_eq!(bytes_read, 4);
+        assert_eq!(buf, vec![b'P', b'A', b'R', b'1', 0, 0, 0, 0]);
+    }

    #[test]
    fn test_io_seek_switch() {
        let mut buf = vec![0; 4];