From 15d5fff8bb288f4155f733f424a35bcc671195b3 Mon Sep 17 00:00:00 2001 From: alamb Date: Mon, 26 Oct 2020 16:00:09 -0400 Subject: [PATCH 1/3] ARROW-10396: [Rust][Parquet] Publically exort SliceableCursor and FileSource, Add Seek and Debug --- rust/parquet/src/file/serialized_reader.rs | 10 +-- rust/parquet/src/util/cursor.rs | 95 +++++++++++++++++++++- rust/parquet/src/util/io.rs | 15 +++- 3 files changed, 111 insertions(+), 9 deletions(-) diff --git a/rust/parquet/src/file/serialized_reader.rs b/rust/parquet/src/file/serialized_reader.rs index 82997005e76..220970d3c1e 100644 --- a/rust/parquet/src/file/serialized_reader.rs +++ b/rust/parquet/src/file/serialized_reader.rs @@ -31,11 +31,11 @@ use crate::file::{footer, metadata::*, reader::*, statistics}; use crate::record::reader::RowIter; use crate::record::Row; use crate::schema::types::Type as SchemaType; -use crate::util::{ - cursor::SliceableCursor, - io::{FileSource, TryClone}, - memory::ByteBufferPtr, -}; +use crate::util::{io::TryClone, memory::ByteBufferPtr}; + +// export `SliceableCursor` and `FileSource` publically so clients can +// re-use the logic in their own ParquetFileWriter wrappers +pub use crate::util::{cursor::SliceableCursor, io::FileSource}; // ---------------------------------------------------------------------- // Implementations of traits facilitating the creation of a new reader diff --git a/rust/parquet/src/util/cursor.rs b/rust/parquet/src/util/cursor.rs index ae3c8ddc2ef..6762447255b 100644 --- a/rust/parquet/src/util/cursor.rs +++ b/rust/parquet/src/util/cursor.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::cmp; -use std::io::{self, Error, ErrorKind, Read}; +use std::io::{self, Error, ErrorKind, Read, Seek, SeekFrom}; use std::rc::Rc; +use std::{cmp, fmt}; /// This is object to use if your file is already in memory. /// The sliceable cursor is similar to std::io::Cursor, except that it makes it easy to create "cursor slices". @@ -25,10 +25,21 @@ use std::rc::Rc; /// because the lack of Generic Associated Type implies that you would require complex lifetime propagation when /// returning such a cursor. pub struct SliceableCursor { - inner: Rc>, start: u64, length: usize, pos: u64, + inner: Rc>, +} + +impl fmt::Debug for SliceableCursor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SliceableCursor") + .field("start", &self.start) + .field("length", &self.length) + .field("pos", &self.pos) + .field("inner.len", &self.inner.len()) + .finish() + } } impl SliceableCursor { @@ -79,6 +90,40 @@ impl Read for SliceableCursor { } } +impl Seek for SliceableCursor { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + let new_pos = match pos { + SeekFrom::Start(pos) => pos as i64, + SeekFrom::End(pos) => self.inner.len() as i64 - 1 + pos as i64, + SeekFrom::Current(pos) => self.pos as i64 + pos as i64, + }; + + if new_pos < 0 { + Err(Error::new( + ErrorKind::InvalidInput, + format!( + "Request out of bounds: cur position {} + seek {:?} < 0: {}", + self.pos, pos, new_pos + ), + )) + } else if new_pos >= self.inner.len() as i64 { + Err(Error::new( + ErrorKind::InvalidInput, + format!( + "Request out of bounds: cur position {} + seek {:?} >= length {}: {}", + self.pos, + pos, + self.inner.len(), + new_pos + ), + )) + } else { + self.pos = new_pos as u64; + Ok(self.start) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -110,4 +155,48 @@ mod tests { let cursor = get_u8_range().slice(10, 10).expect("error while slicing"); check_read_all(cursor, 10, 19); } + + #[test] + fn seek_cursor_start() { + let mut cursor = get_u8_range(); + + cursor.seek(SeekFrom::Start(5)).unwrap(); + check_read_all(cursor, 5, 255); + } + + #[test] + fn seek_cursor_current() { + let mut cursor = get_u8_range(); + cursor.seek(SeekFrom::Start(10)).unwrap(); + cursor.seek(SeekFrom::Current(10)).unwrap(); + check_read_all(cursor, 20, 255); + } + + #[test] + fn seek_cursor_end() { + let mut cursor = get_u8_range(); + + cursor.seek(SeekFrom::End(-10)).unwrap(); + check_read_all(cursor, 245, 255); + } + + #[test] + fn seek_cursor_error_too_long() { + let mut cursor = get_u8_range(); + let res = cursor.seek(SeekFrom::Start(1000)); + let actual_error = res.expect_err("expected error").to_string(); + let expected_error = + "Request out of bounds: cur position 0 + seek Start(1000) >= length 256: 1000"; + assert_eq!(actual_error, expected_error); + } + + #[test] + fn seek_cursor_error_too_short() { + let mut cursor = get_u8_range(); + let res = cursor.seek(SeekFrom::End(-1000)); + let actual_error = res.expect_err("expected error").to_string(); + let expected_error = + "Request out of bounds: cur position 0 + seek End(-1000) < 0: -745"; + assert_eq!(actual_error, expected_error); + } } diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index 223b8d5e676..1403de0f529 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{cell::RefCell, cmp, io::*}; +use std::{cell::RefCell, cmp, fmt, io::*}; use crate::file::{reader::Length, writer::ParquetWriter}; @@ -63,6 +63,19 @@ pub struct FileSource { buf_cap: usize, // current number of bytes read into the buffer } +impl fmt::Debug for FileSource { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FileSource") + .field("reader", &"OPAQUE") + .field("start", &self.start) + .field("end", &self.end) + .field("buf.len", &self.buf.len()) + .field("buf_pos", &self.buf_pos) + .field("buf_cap", &self.buf_cap) + .finish() + } +} + impl FileSource { /// Creates new file reader with start and length from a file handle pub fn new(fd: &R, start: u64, length: usize) -> Self { From 7b7ac97796c9eb08b8870e997e252233ee691652 Mon Sep 17 00:00:00 2001 From: alamb Date: Tue, 27 Oct 2020 06:54:27 -0400 Subject: [PATCH 2/3] Restore field order --- rust/parquet/src/util/cursor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/parquet/src/util/cursor.rs b/rust/parquet/src/util/cursor.rs index 6762447255b..b22749952a1 100644 --- a/rust/parquet/src/util/cursor.rs +++ b/rust/parquet/src/util/cursor.rs @@ -25,10 +25,10 @@ use std::{cmp, fmt}; /// because the lack of Generic Associated Type implies that you would require complex lifetime propagation when /// returning such a cursor. pub struct SliceableCursor { + inner: Rc>, start: u64, length: usize, pos: u64, - inner: Rc>, } impl fmt::Debug for SliceableCursor { From 8c9f1b56f2c3e89379c560fde4a15cfb4ae918c6 Mon Sep 17 00:00:00 2001 From: alamb Date: Tue, 27 Oct 2020 07:25:54 -0400 Subject: [PATCH 3/3] Correct SeekFrom::End implementation --- rust/parquet/src/util/cursor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/parquet/src/util/cursor.rs b/rust/parquet/src/util/cursor.rs index b22749952a1..0ada2ef05de 100644 --- a/rust/parquet/src/util/cursor.rs +++ b/rust/parquet/src/util/cursor.rs @@ -94,7 +94,7 @@ impl Seek for SliceableCursor { fn seek(&mut self, pos: SeekFrom) -> io::Result { let new_pos = match pos { SeekFrom::Start(pos) => pos as i64, - SeekFrom::End(pos) => self.inner.len() as i64 - 1 + pos as i64, + SeekFrom::End(pos) => self.inner.len() as i64 + pos as i64, SeekFrom::Current(pos) => self.pos as i64 + pos as i64, }; @@ -177,7 +177,7 @@ mod tests { let mut cursor = get_u8_range(); cursor.seek(SeekFrom::End(-10)).unwrap(); - check_read_all(cursor, 245, 255); + check_read_all(cursor, 246, 255); } #[test] @@ -196,7 +196,7 @@ mod tests { let res = cursor.seek(SeekFrom::End(-1000)); let actual_error = res.expect_err("expected error").to_string(); let expected_error = - "Request out of bounds: cur position 0 + seek End(-1000) < 0: -745"; + "Request out of bounds: cur position 0 + seek End(-1000) < 0: -744"; assert_eq!(actual_error, expected_error); } }