Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions rust/parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ use crate::file::{footer, metadata::*, reader::*, statistics};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::util::{
cursor::SliceableCursor,
io::{FileSource, TryClone},
memory::ByteBufferPtr,
};
use crate::util::{io::TryClone, memory::ByteBufferPtr};

// export `SliceableCursor` and `FileSource` publically so clients can
// re-use the logic in their own ParquetFileWriter wrappers
pub use crate::util::{cursor::SliceableCursor, io::FileSource};

// ----------------------------------------------------------------------
// Implementations of traits facilitating the creation of a new reader
Expand Down
93 changes: 91 additions & 2 deletions rust/parquet/src/util/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::cmp;
use std::io::{self, Error, ErrorKind, Read};
use std::io::{self, Error, ErrorKind, Read, Seek, SeekFrom};
use std::rc::Rc;
use std::{cmp, fmt};

/// This is object to use if your file is already in memory.
/// The sliceable cursor is similar to std::io::Cursor, except that it makes it easy to create "cursor slices".
Expand All @@ -31,6 +31,17 @@ pub struct SliceableCursor {
pos: u64,
}

impl fmt::Debug for SliceableCursor {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used this formulation rather than [#derive(Debug)] to avoid getting the buffer contents dumped which can be lots of data to sort through.

fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SliceableCursor")
.field("start", &self.start)
.field("length", &self.length)
.field("pos", &self.pos)
.field("inner.len", &self.inner.len())
.finish()
}
}

impl SliceableCursor {
pub fn new(content: Vec<u8>) -> Self {
let size = content.len();
Expand Down Expand Up @@ -79,6 +90,40 @@ impl Read for SliceableCursor {
}
}

impl Seek for SliceableCursor {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
let new_pos = match pos {
SeekFrom::Start(pos) => pos as i64,
SeekFrom::End(pos) => self.inner.len() as i64 + pos as i64,
SeekFrom::Current(pos) => self.pos as i64 + pos as i64,
};

if new_pos < 0 {
Err(Error::new(
ErrorKind::InvalidInput,
format!(
"Request out of bounds: cur position {} + seek {:?} < 0: {}",
self.pos, pos, new_pos
),
))
} else if new_pos >= self.inner.len() as i64 {
Err(Error::new(
ErrorKind::InvalidInput,
format!(
"Request out of bounds: cur position {} + seek {:?} >= length {}: {}",
self.pos,
pos,
self.inner.len(),
new_pos
),
))
} else {
self.pos = new_pos as u64;
Ok(self.start)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -110,4 +155,48 @@ mod tests {
let cursor = get_u8_range().slice(10, 10).expect("error while slicing");
check_read_all(cursor, 10, 19);
}

#[test]
fn seek_cursor_start() {
let mut cursor = get_u8_range();

cursor.seek(SeekFrom::Start(5)).unwrap();
check_read_all(cursor, 5, 255);
}

#[test]
fn seek_cursor_current() {
let mut cursor = get_u8_range();
cursor.seek(SeekFrom::Start(10)).unwrap();
cursor.seek(SeekFrom::Current(10)).unwrap();
check_read_all(cursor, 20, 255);
}

#[test]
fn seek_cursor_end() {
let mut cursor = get_u8_range();

cursor.seek(SeekFrom::End(-10)).unwrap();
check_read_all(cursor, 246, 255);
}

#[test]
fn seek_cursor_error_too_long() {
let mut cursor = get_u8_range();
let res = cursor.seek(SeekFrom::Start(1000));
let actual_error = res.expect_err("expected error").to_string();
let expected_error =
"Request out of bounds: cur position 0 + seek Start(1000) >= length 256: 1000";
assert_eq!(actual_error, expected_error);
}

#[test]
fn seek_cursor_error_too_short() {
let mut cursor = get_u8_range();
let res = cursor.seek(SeekFrom::End(-1000));
let actual_error = res.expect_err("expected error").to_string();
let expected_error =
"Request out of bounds: cur position 0 + seek End(-1000) < 0: -744";
assert_eq!(actual_error, expected_error);
}
}
15 changes: 14 additions & 1 deletion rust/parquet/src/util/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{cell::RefCell, cmp, io::*};
use std::{cell::RefCell, cmp, fmt, io::*};

use crate::file::{reader::Length, writer::ParquetWriter};

Expand Down Expand Up @@ -63,6 +63,19 @@ pub struct FileSource<R: ParquetReader> {
buf_cap: usize, // current number of bytes read into the buffer
}

impl<R: ParquetReader> fmt::Debug for FileSource<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FileSource")
.field("reader", &"OPAQUE")
.field("start", &self.start)
.field("end", &self.end)
.field("buf.len", &self.buf.len())
.field("buf_pos", &self.buf_pos)
.field("buf_cap", &self.buf_cap)
.finish()
}
}

impl<R: ParquetReader> FileSource<R> {
/// Creates new file reader with start and length from a file handle
pub fn new(fd: &R, start: u64, length: usize) -> Self {
Expand Down