16 changes: 8 additions & 8 deletions rust/parquet/src/arrow/array_reader.rs
@@ -953,7 +953,7 @@ mod tests {
    use std::rc::Rc;
    use std::sync::Arc;

-    fn make_column_chuncks<T: DataType>(
+    fn make_column_chunks<T: DataType>(
        column_desc: ColumnDescPtr,
        encoding: Encoding,
        num_levels: usize,
@@ -964,11 +964,11 @@
        values: &mut Vec<T::T>,
        page_lists: &mut Vec<Vec<Page>>,
        use_v2: bool,
-        num_chuncks: usize,
+        num_chunks: usize,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
-        for _i in 0..num_chuncks {
+        for _i in 0..num_chunks {
            let mut pages = VecDeque::new();
            let mut data = Vec::new();
            let mut page_def_levels = Vec::new();
@@ -1039,7 +1039,7 @@ mod tests {
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
-            make_column_chuncks::<Int32Type>(
+            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
@@ -1061,7 +1061,7 @@
        )
        .unwrap();

-        // Read first 50 values, which are all from the first column chunck
+        // Read first 50 values, which are all from the first column chunk
        let array = array_reader.next_batch(50).unwrap();
        let array = array
            .as_any()
@@ -1120,7 +1120,7 @@ mod tests {
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
-            make_column_chuncks::<$arrow_parquet_type>(
+            make_column_chunks::<$arrow_parquet_type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
@@ -1225,7 +1225,7 @@ mod tests {
        let mut def_levels = Vec::new();
        let mut rep_levels = Vec::new();
        let mut page_lists = Vec::new();
-        make_column_chuncks::<Int32Type>(
+        make_column_chunks::<Int32Type>(
            column_desc.clone(),
            Encoding::PLAIN,
            100,
@@ -1250,7 +1250,7 @@

        let mut accu_len: usize = 0;

-        // Read first 50 values, which are all from the first column chunck
+        // Read first 50 values, which are all from the first column chunk
        let array = array_reader.next_batch(50).unwrap();
        assert_eq!(
            Some(&def_levels[accu_len..(accu_len + array.len())]),
263 changes: 263 additions & 0 deletions rust/parquet/src/file/footer.rs
@@ -0,0 +1,263 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{
    cmp::min,
    io::{Cursor, Read, Seek, SeekFrom},
    rc::Rc,
};

use byteorder::{ByteOrder, LittleEndian};
use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
use thrift::protocol::TCompactInputProtocol;

use crate::basic::ColumnOrder;

use crate::errors::{ParquetError, Result};
use crate::file::{
    metadata::*, reader::ChunkReader, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE,
    PARQUET_MAGIC,
};

use crate::schema::types::{self, SchemaDescriptor};

/// Layout of Parquet file
/// +---------------------------+-----+---+
/// |       Rest of file        |  B  | A |
/// +---------------------------+-----+---+
/// where A: parquet footer, B: parquet metadata.
///
/// The reader first reads DEFAULT_FOOTER_READ_SIZE bytes from the end of the file.
/// If that is not enough for the length indicated in the footer, it reads the remaining bytes.
pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
    // check file is large enough to hold footer
    let file_size = chunk_reader.len();
    if file_size < (FOOTER_SIZE as u64) {
        return Err(general_err!(
            "Invalid Parquet file. Size is smaller than footer"
        ));
    }

    // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
    let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, chunk_reader.len() as usize);
    let mut default_end_reader = chunk_reader
        .get_read(chunk_reader.len() - default_end_len as u64, default_end_len)?;
    let mut default_len_end_buf = vec![0; default_end_len];
    default_end_reader.read_exact(&mut default_len_end_buf)?;

    // check this is indeed a parquet file
    if default_len_end_buf[default_end_len - 4..] != PARQUET_MAGIC {
        return Err(general_err!("Invalid Parquet file. Corrupt footer"));
    }

    // get the metadata length from the footer
    let metadata_len = LittleEndian::read_i32(
        &default_len_end_buf[default_end_len - 8..default_end_len - 4],
    ) as i64;
    if metadata_len < 0 {
        return Err(general_err!(
            "Invalid Parquet file. Metadata length is less than zero ({})",
            metadata_len
        ));
    }
    let footer_metadata_len = FOOTER_SIZE + metadata_len as usize;

    // build up the reader covering the entire metadata
    let mut default_end_cursor = Cursor::new(default_len_end_buf);
    let metadata_read: Box<dyn Read>;
    if footer_metadata_len > file_size as usize {
        return Err(general_err!(
            "Invalid Parquet file. Metadata start is less than zero ({})",
            file_size as i64 - footer_metadata_len as i64
        ));
    } else if footer_metadata_len < DEFAULT_FOOTER_READ_SIZE {
        // the whole metadata is in the bytes we already read
        default_end_cursor.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
        metadata_read = Box::new(default_end_cursor);
    } else {
        // the end of file read by default is not long enough, read missing bytes
        let complementary_end_read = chunk_reader.get_read(
            file_size - footer_metadata_len as u64,
            FOOTER_SIZE + metadata_len as usize - default_end_len,
        )?;
        metadata_read = Box::new(complementary_end_read.chain(default_end_cursor));
    }

    // TODO: row group filtering
    let mut prot = TCompactInputProtocol::new(metadata_read);
    let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
        .map_err(|e| ParquetError::General(format!("Could not parse metadata: {}", e)))?;
    let schema = types::from_thrift(&t_file_metadata.schema)?;
    let schema_descr = Rc::new(SchemaDescriptor::new(schema.clone()));
    let mut row_groups = Vec::new();
    for rg in t_file_metadata.row_groups {
        row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
    }
    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr);

    let file_metadata = FileMetaData::new(
        t_file_metadata.version,
        t_file_metadata.num_rows,
        t_file_metadata.created_by,
        t_file_metadata.key_value_metadata,
        schema,
        schema_descr,
        column_orders,
    );
    Ok(ParquetMetaData::new(file_metadata, row_groups))
}

/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
    t_column_orders: Option<Vec<TColumnOrder>>,
    schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
    match t_column_orders {
        Some(orders) => {
            // Should always be the case
            assert_eq!(
                orders.len(),
                schema_descr.num_columns(),
                "Column order length mismatch"
            );
            let mut res = Vec::new();
            for (i, column) in schema_descr.columns().iter().enumerate() {
                match orders[i] {
                    TColumnOrder::TYPEORDER(_) => {
                        let sort_order = ColumnOrder::get_sort_order(
                            column.logical_type(),
                            column.physical_type(),
                        );
                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
                    }
                }
            }
            Some(res)
        }
        None => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::basic::SortOrder;
    use crate::basic::Type;
    use crate::schema::types::Type as SchemaType;
    use crate::util::test_common::get_temp_file;
    use parquet_format::TypeDefinedOrder;

    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = get_temp_file("corrupt-1.parquet", &[]);
        let reader_result = parse_metadata(&test_file);
        assert!(reader_result.is_err());
        assert_eq!(
            reader_result.err().unwrap(),
            general_err!("Invalid Parquet file. Size is smaller than footer")
        );
    }

    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let test_file = get_temp_file("corrupt-2.parquet", &[1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = parse_metadata(&test_file);
        assert!(reader_result.is_err());
        assert_eq!(
            reader_result.err().unwrap(),
            general_err!("Invalid Parquet file. Corrupt footer")
        );
    }

    #[test]
    fn test_parse_metadata_invalid_length() {
        let test_file =
            get_temp_file("corrupt-3.parquet", &[0, 0, 0, 255, b'P', b'A', b'R', b'1']);
        let reader_result = parse_metadata(&test_file);
        assert!(reader_result.is_err());
        assert_eq!(
            reader_result.err().unwrap(),
            general_err!(
                "Invalid Parquet file. Metadata length is less than zero (-16777216)"
            )
        );
    }

    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file =
            get_temp_file("corrupt-4.parquet", &[255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let reader_result = parse_metadata(&test_file);
        assert!(reader_result.is_err());
        assert_eq!(
            reader_result.err().unwrap(),
            general_err!("Invalid Parquet file. Metadata start is less than zero (-255)")
        );
    }

    #[test]
    fn test_metadata_column_orders_parse() {
        // Define simple schema, we do not need to provide logical types.
        let mut fields = vec![
            Rc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Rc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(&mut fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Rc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            parse_column_orders(t_column_orders, &schema_descr),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // Test when no column orders are defined.
        assert_eq!(parse_column_orders(None, &schema_descr), None);
    }

    #[test]
    #[should_panic(expected = "Column order length mismatch")]
    fn test_metadata_column_orders_len_mismatch() {
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Rc::new(schema));

        let t_column_orders =
            Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        parse_column_orders(t_column_orders, &schema_descr);
    }
}
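
For orientation, a minimal sketch of how the new entry point could be driven end to end. The path `data.parquet` is a placeholder, and the sketch leans on the crate's `ChunkReader` impl for `std::fs::File`, the same one the tests above rely on:

use std::fs::File;

use parquet::file::footer::parse_metadata;

fn main() {
    // Placeholder path; any valid Parquet file works here.
    let file = File::open("data.parquet").unwrap();

    // Reads only the footer and metadata; no row group data is touched.
    let metadata = parse_metadata(&file).unwrap();
    println!("version: {}", metadata.file_metadata().version());
    println!("rows: {}", metadata.file_metadata().num_rows());
    println!("row groups: {}", metadata.num_row_groups());
}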
5 changes: 5 additions & 0 deletions rust/parquet/src/file/mod.rs
@@ -96,11 +96,16 @@
//!     println!("{}", row);
//! }
//! ```
pub mod footer;
pub mod metadata;
pub mod properties;
pub mod reader;
pub mod serialized_reader;
pub mod statistics;
pub mod writer;

const FOOTER_SIZE: usize = 8;
const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];

/// The number of bytes read at the end of the parquet file on first read
const DEFAULT_FOOTER_READ_SIZE: usize = 64 * 1024;
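
To make the constants concrete: the last FOOTER_SIZE (8) bytes of every Parquet file hold a 4-byte little-endian metadata length followed by the 4 PARQUET_MAGIC bytes, which is exactly what parse_metadata decodes before deciding whether the first DEFAULT_FOOTER_READ_SIZE bytes were enough. A small self-contained sketch with a hand-built footer tail rather than a real file:

fn main() {
    // Hand-built 8-byte footer tail: metadata length 16, then the "PAR1" magic.
    let tail: [u8; 8] = [16, 0, 0, 0, b'P', b'A', b'R', b'1'];

    // The metadata length is stored as a 4-byte little-endian integer.
    let metadata_len = i32::from_le_bytes([tail[0], tail[1], tail[2], tail[3]]);
    assert_eq!(metadata_len, 16);

    // The trailing magic must match PARQUET_MAGIC.
    assert_eq!(&tail[4..], b"PAR1");

    // So the metadata starts (metadata_len + 8) bytes before the end of the file.
    println!("footer + metadata span: {} bytes", metadata_len as usize + 8);
}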