diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 122c5b6356d..9e45af2bcab 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -51,6 +51,7 @@ lz4 = "1.23"
 zstd = "0.5"
 arrow = { path = "../arrow", version = "3.0.0-SNAPSHOT" }
 serde_json = { version = "1.0", features = ["preserve_order"] }
+lazy_static = "1.4.0"
 
 [features]
 default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
diff --git a/rust/parquet/benches/codec.rs b/rust/parquet/benches/codec.rs
new file mode 100644
index 00000000000..33d31b8499c
--- /dev/null
+++ b/rust/parquet/benches/codec.rs
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![feature(test)]
+extern crate parquet;
+#[macro_use]
+extern crate lazy_static;
+extern crate test;
+use test::Bencher;
+
+use std::{env, fs::File};
+
+use parquet::{basic::Compression, compression::*, file::reader::*};
+
+// 10k rows written in page v2 with type:
+//
+// message test {
+//   required binary binary_field,
+//   required int32 int32_field,
+//   required int64 int64_field,
+//   required boolean boolean_field,
+//   required float float_field,
+//   required double double_field,
+//   required fixed_len_byte_array(1024) flba_field,
+//   required int96 int96_field
+// }
+//
+// filled with random values.
+const TEST_FILE: &str = "10k-v2.parquet";
+
+/// Opens the test file `data/10k-v2.parquet`, resolved against the current
+/// working directory. Panics if the file cannot be opened or parsed.
+fn get_f_reader() -> SerializedFileReader<File> {
+    let path = env::current_dir().unwrap().join("data").join(TEST_FILE);
+    let file = File::open(&path).unwrap();
+    SerializedFileReader::new(file).unwrap()
+}
+
+/// Returns the page buffers of column `col_idx` in the first row group of the
+/// test file, concatenated in page order.
+fn get_pages_bytes(col_idx: usize) -> Vec<u8> {
+    let f_reader = get_f_reader();
+    let rg_reader = f_reader.get_row_group(0).unwrap();
+    let mut pg_reader = rg_reader.get_column_page_reader(col_idx).unwrap();
+    let mut data = Vec::new();
+    while let Some(page) = pg_reader.get_next_page().unwrap() {
+        data.extend_from_slice(page.buffer().data());
+    }
+    data
+}
+
+// Defines bench `$fname`, which compresses the page bytes of column `$col_idx`
+// with `$codec`. Throughput is reported per uncompressed input byte.
+macro_rules! compress {
+    ($fname:ident, $codec:expr, $col_idx:expr) => {
+        #[bench]
+        fn $fname(bench: &mut Bencher) {
+            lazy_static! {
+                static ref DATA: Vec<u8> = { get_pages_bytes($col_idx) };
+            }
+
+            let mut codec = create_codec($codec).unwrap().unwrap();
+            let mut v: Vec<u8> = Vec::new();
+            bench.bytes = DATA.len() as u64;
+            bench.iter(|| {
+                // Reset the output (keeping its capacity) so that a codec which
+                // appends to `v` does the same amount of work on every iteration.
+                v.clear();
+                codec.compress(&DATA[..], &mut v).unwrap();
+            })
+        }
+    };
+}
+
+// Defines bench `$fname`, which decompresses the page bytes of column
+// `$col_idx` after compressing them once with `$codec`. Throughput is reported
+// per decompressed output byte.
+macro_rules! decompress {
+    ($fname:ident, $codec:expr, $col_idx:expr) => {
+        #[bench]
+        fn $fname(bench: &mut Bencher) {
+            lazy_static! {
+                // (uncompressed length, compressed bytes), computed on first use.
+                static ref PAGES: (usize, Vec<u8>) = {
+                    let mut codec = create_codec($codec).unwrap().unwrap();
+                    let raw_data = get_pages_bytes($col_idx);
+                    let mut v = vec![];
+                    codec.compress(&raw_data[..], &mut v).unwrap();
+                    (raw_data.len(), v)
+                };
+            }
+
+            let mut codec = create_codec($codec).unwrap().unwrap();
+            // Count only the bytes of the benchmarked column, not the byte size
+            // of the whole row group.
+            bench.bytes = PAGES.0 as u64;
+            bench.iter(|| {
+                let mut v = Vec::new();
+                let _ = codec.decompress(&PAGES.1[..], &mut v).unwrap();
+            })
+        }
+    };
+}
+
+compress!(compress_brotli_binary, Compression::BROTLI, 0);
+compress!(compress_brotli_int32, Compression::BROTLI, 1);
+compress!(compress_brotli_int64, Compression::BROTLI, 2);
+compress!(compress_brotli_boolean, Compression::BROTLI, 3);
+compress!(compress_brotli_float, Compression::BROTLI, 4);
+compress!(compress_brotli_double, Compression::BROTLI, 5);
+compress!(compress_brotli_fixed, Compression::BROTLI, 6);
+compress!(compress_brotli_int96, Compression::BROTLI, 7);
+
+compress!(compress_gzip_binary, Compression::GZIP, 0);
+compress!(compress_gzip_int32, Compression::GZIP, 1);
+compress!(compress_gzip_int64, Compression::GZIP, 2);
+compress!(compress_gzip_boolean, Compression::GZIP, 3);
+compress!(compress_gzip_float, Compression::GZIP, 4);
+compress!(compress_gzip_double, Compression::GZIP, 5);
+compress!(compress_gzip_fixed, Compression::GZIP, 6);
+compress!(compress_gzip_int96, Compression::GZIP, 7);
+
+compress!(compress_snappy_binary, Compression::SNAPPY, 0);
+compress!(compress_snappy_int32, Compression::SNAPPY, 1);
+compress!(compress_snappy_int64, Compression::SNAPPY, 2);
+compress!(compress_snappy_boolean, Compression::SNAPPY, 3);
+compress!(compress_snappy_float, Compression::SNAPPY, 4);
+compress!(compress_snappy_double, Compression::SNAPPY, 5);
+compress!(compress_snappy_fixed, Compression::SNAPPY, 6);
+compress!(compress_snappy_int96, Compression::SNAPPY, 7);
+
+compress!(compress_lz4_binary, Compression::LZ4, 0); // LZ4; last arg = schema column index
+compress!(compress_lz4_int32, Compression::LZ4, 1);
+compress!(compress_lz4_int64, Compression::LZ4, 2);
+compress!(compress_lz4_boolean, Compression::LZ4, 3);
+compress!(compress_lz4_float, Compression::LZ4, 4);
+compress!(compress_lz4_double, Compression::LZ4, 5);
+compress!(compress_lz4_fixed, Compression::LZ4, 6);
+compress!(compress_lz4_int96, Compression::LZ4, 7);
+
+compress!(compress_zstd_binary, Compression::ZSTD, 0); // ZSTD compression
+compress!(compress_zstd_int32, Compression::ZSTD, 1);
+compress!(compress_zstd_int64, Compression::ZSTD, 2);
+compress!(compress_zstd_boolean, Compression::ZSTD, 3);
+compress!(compress_zstd_float, Compression::ZSTD, 4);
+compress!(compress_zstd_double, Compression::ZSTD, 5);
+compress!(compress_zstd_fixed, Compression::ZSTD, 6);
+compress!(compress_zstd_int96, Compression::ZSTD, 7);
+
+decompress!(decompress_brotli_binary, Compression::BROTLI, 0); // Brotli decompression
+decompress!(decompress_brotli_int32, Compression::BROTLI, 1);
+decompress!(decompress_brotli_int64, Compression::BROTLI, 2);
+decompress!(decompress_brotli_boolean, Compression::BROTLI, 3);
+decompress!(decompress_brotli_float, Compression::BROTLI, 4);
+decompress!(decompress_brotli_double, Compression::BROTLI, 5);
+decompress!(decompress_brotli_fixed, Compression::BROTLI, 6);
+decompress!(decompress_brotli_int96, Compression::BROTLI, 7);
+
+decompress!(decompress_gzip_binary, Compression::GZIP, 0); // Gzip decompression
+decompress!(decompress_gzip_int32, Compression::GZIP, 1);
+decompress!(decompress_gzip_int64, Compression::GZIP, 2);
+decompress!(decompress_gzip_boolean, Compression::GZIP, 3);
+decompress!(decompress_gzip_float, Compression::GZIP, 4);
+decompress!(decompress_gzip_double, Compression::GZIP, 5);
+decompress!(decompress_gzip_fixed, Compression::GZIP, 6);
+decompress!(decompress_gzip_int96, Compression::GZIP, 7);
+
+decompress!(decompress_snappy_binary, Compression::SNAPPY, 0); // Snappy decompression
+decompress!(decompress_snappy_int32, Compression::SNAPPY, 1);
+decompress!(decompress_snappy_int64, Compression::SNAPPY, 2);
+decompress!(decompress_snappy_boolean, Compression::SNAPPY, 3);
+decompress!(decompress_snappy_float, Compression::SNAPPY, 4);
+decompress!(decompress_snappy_double, Compression::SNAPPY, 5);
+decompress!(decompress_snappy_fixed, Compression::SNAPPY, 6);
+decompress!(decompress_snappy_int96, Compression::SNAPPY, 7);
+
+decompress!(decompress_lz4_binary, Compression::LZ4, 0);
+decompress!(decompress_lz4_int32, Compression::LZ4, 1);
+decompress!(decompress_lz4_int64, Compression::LZ4, 2);
+decompress!(decompress_lz4_boolean, Compression::LZ4, 3);
+decompress!(decompress_lz4_float, Compression::LZ4, 4);
+decompress!(decompress_lz4_double, Compression::LZ4, 5);
+decompress!(decompress_lz4_fixed, Compression::LZ4, 6);
+decompress!(decompress_lz4_int96, Compression::LZ4, 7);
+
+decompress!(decompress_zstd_binary, Compression::ZSTD, 0);
+decompress!(decompress_zstd_int32, Compression::ZSTD, 1);
+decompress!(decompress_zstd_int64, Compression::ZSTD, 2);
+decompress!(decompress_zstd_boolean, Compression::ZSTD, 3);
+decompress!(decompress_zstd_float, Compression::ZSTD, 4);
+decompress!(decompress_zstd_double, Compression::ZSTD, 5);
+decompress!(decompress_zstd_fixed, Compression::ZSTD, 6);
+decompress!(decompress_zstd_int96, Compression::ZSTD, 7);
diff --git a/rust/parquet/benches/common.rs b/rust/parquet/benches/common.rs
new file mode 100644
index 00000000000..5b6b942578a
--- /dev/null
+++ b/rust/parquet/benches/common.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate parquet;
+extern crate rand;
+
+use rand::{thread_rng, Rng};
+use std::rc::Rc;
+
+use parquet::{
+    basic::*,
+    data_type::*,
+    schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType},
+};
+
+// Defines `pub fn $fname(total) -> (bytes, values)`: `total` random `i32`
+// values drawn from `[0, $limit)`, plus their combined size in bytes.
+macro_rules! gen_random_ints {
+    ($fname:ident, $limit:expr) => {
+        pub fn $fname(total: usize) -> (usize, Vec<i32>) {
+            let mut rng = thread_rng();
+            let values: Vec<i32> =
+                (0..total).map(|_| rng.gen_range(0, $limit)).collect();
+            let bytes = values.len() * ::std::mem::size_of::<i32>();
+            (bytes, values)
+        }
+    };
+}
+
+gen_random_ints!(gen_10, 10);
+gen_random_ints!(gen_100, 100);
+gen_random_ints!(gen_1000, 1000);
+
+/// Returns `total` byte arrays, each picked at random from ten fixed 10-byte
+/// words, together with the sum of their lengths in bytes.
+pub fn gen_test_strs(total: usize) -> (usize, Vec<ByteArray>) {
+    const WORDS: [&str; 10] = [
+        "aaaaaaaaaa",
+        "bbbbbbbbbb",
+        "cccccccccc",
+        "dddddddddd",
+        "eeeeeeeeee",
+        "ffffffffff",
+        "gggggggggg",
+        "hhhhhhhhhh",
+        "iiiiiiiiii",
+        "jjjjjjjjjj",
+    ];
+
+    let mut rnd = rand::thread_rng();
+    let values: Vec<ByteArray> = (0..total)
+        .map(|_| ByteArray::from(WORDS[rnd.gen_range(0, WORDS.len())]))
+        .collect();
+    let bytes: usize = values.iter().map(|w| w.len()).sum();
+    (bytes, values)
+}
+
+/// Builds a descriptor for a primitive column named "col" with physical type
+/// `primitive_ty` and type length `type_length`; both `0` arguments are
+/// presumably the max definition and repetition levels, and the path is empty.
+pub fn col_desc(type_length: i32, primitive_ty: Type) -> ColumnDescriptor {
+    let ty = SchemaType::primitive_type_builder("col", primitive_ty)
+        .with_length(type_length)
+        .build()
+        .unwrap();
+    ColumnDescriptor::new(Rc::new(ty), 0, 0, ColumnPath::new(vec![]))
+}
diff --git a/rust/parquet/benches/decoding.rs b/rust/parquet/benches/decoding.rs
new file mode 100644
index 00000000000..33ead19dd0f
--- /dev/null
+++ b/rust/parquet/benches/decoding.rs
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![feature(test)]
+extern crate parquet;
+extern crate rand;
+extern crate test;
+use test::Bencher;
+
+#[allow(dead_code)]
+#[path = "common.rs"]
+mod common;
+use common::*;
+
+use std::rc::Rc;
+
+use parquet::{
+    basic::*,
+    data_type::*,
+    decoding::*,
+    encoding::*,
+    memory::{ByteBufferPtr, MemTracker},
+};
+
+// Defines bench `$fname`: plain-encodes `$num_values` values produced by
+// `$gen_data_fn`, then measures decoding them `$batch_size` values at a time.
+macro_rules! plain {
+    ($fname:ident, $num_values:expr, $batch_size:expr, $ty:ident, $pty:expr,
+     $gen_data_fn:expr) => {
+        #[bench]
+        fn $fname(bench: &mut Bencher) {
+            let mem_tracker = Rc::new(MemTracker::new());
+            let mut encoder =
+                PlainEncoder::<$ty>::new(Rc::new(col_desc(0, $pty)), mem_tracker, vec![]);
+
+            let (_, values) = $gen_data_fn($num_values);
+            encoder.put(&values[..]).expect("put() should be OK");
+            let buffer = encoder.flush_buffer().expect("flush_buffer() should be OK");
+
+            let decoder = PlainDecoder::<$ty>::new(0);
+            bench_decoding(bench, $num_values, $batch_size, buffer, Box::new(decoder));
+        }
+    };
+}
+
+// Defines bench `$fname`: dictionary-encodes `$num_values` values produced by
+// `$gen_data_fn`, then measures decoding them `$batch_size` values at a time.
+macro_rules! dict {
+    ($fname:ident, $num_values:expr, $batch_size:expr, $ty:ident, $pty:expr,
+     $gen_data_fn:expr) => {
+        #[bench]
+        fn $fname(bench: &mut Bencher) {
+            let mem_tracker = Rc::new(MemTracker::new());
+            let mut encoder =
+                DictEncoder::<$ty>::new(Rc::new(col_desc(0, $pty)), mem_tracker);
+
+            let (_, values) = $gen_data_fn($num_values);
+            encoder.put(&values[..]).expect("put() should be OK");
+            // The dictionary page is itself read back through a plain decoder.
+            let mut dict_decoder = PlainDecoder::<$ty>::new(0);
+            dict_decoder
+                .set_data(
+                    encoder.write_dict().expect("write_dict() should be OK"),
+                    encoder.num_entries(),
+                )
+                .expect("set_data() should be OK");
+
+            let buffer = encoder.flush_buffer().expect("flush_buffer() should be OK");
+            let mut decoder = DictDecoder::<$ty>::new();
+            decoder
+                .set_dict(Box::new(dict_decoder))
+                .expect("set_dict() should be OK");
+
+            bench_decoding(bench, $num_values, $batch_size, buffer, Box::new(decoder));
+        }
+    };
+}
+
+// Defines bench `$fname`: delta-bit-pack-encodes `$num_values` values produced
+// by `$gen_data_fn`, then measures decoding them `$batch_size` values at a time.
+macro_rules! delta_bit_pack {
+    ($fname:ident, $num_values:expr, $batch_size:expr, $ty:ident, $gen_data_fn:expr) => {
+        #[bench]
+        fn $fname(bench: &mut Bencher) {
+            let mut encoder = DeltaBitPackEncoder::<$ty>::new();
+
+            let (_, values) = $gen_data_fn($num_values);
+            encoder.put(&values[..]).expect("put() should be OK");
+            let buffer = encoder.flush_buffer().expect("flush_buffer() should be OK");
+
+            let decoder = DeltaBitPackDecoder::<$ty>::new();
+            bench_decoding(bench, $num_values, $batch_size, buffer, Box::new(decoder));
+        }
+    };
+}
+
+/// Measures decoding `num_values` values from `buffer` with `decoder`, reading
+/// `batch_size` values per `get()` call until a short batch signals the end.
+/// Throughput is reported per encoded byte.
+fn bench_decoding<T: DataType>(
+    bench: &mut Bencher,
+    num_values: usize,
+    batch_size: usize,
+    buffer: ByteBufferPtr,
+    mut decoder: Box<dyn Decoder<T>>,
+) {
+    bench.bytes = buffer.len() as u64;
+    bench.iter(|| {
+        decoder
+            .set_data(buffer.clone(), num_values)
+            .expect("set_data() should be OK");
+        let mut values = vec![T::T::default(); batch_size];
+        // A batch shorter than `batch_size` means the data is exhausted.
+        while decoder.get(&mut values[..]).expect("get() should be OK") >= batch_size {}
+    })
+}
+
+// `_1k_` benches decode 1024 values; `_1m_` benches decode 1024 * 1024 values.
+plain!(plain_i32_1k_32, 1024, 32, Int32Type, Type::INT32, gen_1000);
+plain!(plain_i32_1k_64, 1024, 64, Int32Type, Type::INT32, gen_1000);
+plain!(
+    plain_i32_1k_128,
+    1024,
+    128,
+    Int32Type,
+    Type::INT32,
+    gen_1000
+);
+plain!(
+    plain_i32_1m_32,
+    1024 * 1024,
+    32,
+    Int32Type,
+    Type::INT32,
+    gen_1000
+);
+plain!(
+    plain_i32_1m_64,
+    1024 * 1024,
+    64,
+    Int32Type,
+    Type::INT32,
+    gen_1000
+);
+plain!(
+    plain_i32_1m_128,
+    1024 * 1024,
+    128,
+    Int32Type,
+    Type::INT32,
+    gen_1000
+);
+plain!(
+    plain_str_1m_128,
+    1024 * 1024,
+    128,
+    ByteArrayType,
+    Type::BYTE_ARRAY,
+    gen_test_strs
+);
+
+dict!(dict_i32_1k_32, 1024, 32, Int32Type, Type::INT32, gen_1000);
+dict!(dict_i32_1k_64, 1024, 64, Int32Type, Type::INT32, gen_1000);
+dict!(dict_i32_1k_128, 1024, 128, Int32Type, Type::INT32, gen_1000);
+dict!(
+    dict_i32_1m_32,
+    1024 * 1024,
+    32,
+    Int32Type,
+    Type::INT32,
+    gen_1000
+);
+dict!(
+    dict_i32_1m_64,
+    1024 * 1024,
+    64,
+    Int32Type,
+    Type::INT32,
+    gen_1000
+);
+dict!(
+    dict_i32_1m_128,
+    1024 * 1024,
+    128,
+    Int32Type,
+    Type::INT32,
+    gen_1000
+);
+dict!(
+    dict_str_1m_128,
+    1024 * 1024,
+    128,
+    ByteArrayType,
+    Type::BYTE_ARRAY,
+    gen_test_strs
+);
+
+delta_bit_pack!(delta_bit_pack_i32_1k_32, 1024, 32, Int32Type, gen_1000);
+delta_bit_pack!(delta_bit_pack_i32_1k_64, 1024, 64, Int32Type, gen_1000);
+delta_bit_pack!(delta_bit_pack_i32_1k_128, 1024, 128, Int32Type, gen_1000);
+delta_bit_pack!(
+    delta_bit_pack_i32_1m_32,
+    1024 * 1024,
+    32,
+    Int32Type,
+    gen_1000
+);
+delta_bit_pack!(
+    delta_bit_pack_i32_1m_64,
+    1024 * 1024,
+    64,
+    Int32Type,
+    gen_1000
+);
+delta_bit_pack!(
+    delta_bit_pack_i32_1m_128,
+    1024 * 1024,
+    128,
+    Int32Type,
+    gen_1000
+);
diff --git a/rust/parquet/benches/encoding.rs b/rust/parquet/benches/encoding.rs
new file mode 100644
index 00000000000..1e15ce30d00
--- /dev/null
+++ b/rust/parquet/benches/encoding.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![feature(test)]
+extern crate parquet;
+extern crate rand;
+extern crate test;
+use test::Bencher;
+
+#[allow(dead_code)]
+#[path = "common.rs"]
+mod common;
+use common::*;
+
+use std::rc::Rc;
+
+use parquet::{basic::*, data_type::*, encoding::*, memory::MemTracker};
+
+// Defines bench `$fname`: plain-encodes the `$batch_size` values produced by
+// `$gen_data_fn`.
+macro_rules! plain {
+    ($fname:ident, $batch_size:expr, $ty:ident, $pty:expr, $gen_data_fn:expr) => {
+        #[bench]
+        fn $fname(bench: &mut Bencher) {
+            let mem_tracker = Rc::new(MemTracker::new());
+            let encoder =
+                PlainEncoder::<$ty>::new(Rc::new(col_desc(0, $pty)), mem_tracker, vec![]);
+            let (bytes, values) = $gen_data_fn($batch_size);
+            bench_encoding(bench, bytes, values, Box::new(encoder));
+        }
+    };
+}
+
+// Defines bench `$fname`: dictionary-encodes the `$batch_size` values produced
+// by `$gen_data_fn`.
+macro_rules! dict {
+    ($fname:ident, $batch_size:expr, $ty:ident, $pty:expr, $gen_data_fn:expr) => {
+        #[bench]
+        fn $fname(bench: &mut Bencher) {
+            let mem_tracker = Rc::new(MemTracker::new());
+            let encoder =
+                DictEncoder::<$ty>::new(Rc::new(col_desc(0, $pty)), mem_tracker);
+            let (bytes, values) = $gen_data_fn($batch_size);
+            bench_encoding(bench, bytes, values, Box::new(encoder));
+        }
+    };
+}
+
+// Defines bench `$fname`: delta-bit-pack-encodes the `$batch_size` values
+// produced by `$gen_data_fn`.
+macro_rules! delta_bit_pack {
+    ($fname:ident, $batch_size:expr, $ty:ident, $gen_data_fn:expr) => {
+        #[bench]
+        fn $fname(bench: &mut Bencher) {
+            let encoder = DeltaBitPackEncoder::<$ty>::new();
+            let (bytes, values) = $gen_data_fn($batch_size);
+            bench_encoding(bench, bytes, values, Box::new(encoder));
+        }
+    };
+}
+
+/// Measures encoding `values` with `encoder`: each iteration puts the whole
+/// slice and flushes the buffer. `bytes` is the input size that throughput is
+/// reported against.
+fn bench_encoding<T: DataType>(
+    bench: &mut Bencher,
+    bytes: usize,
+    values: Vec<T::T>,
+    mut encoder: Box<dyn Encoder<T>>,
+) {
+    bench.bytes = bytes as u64;
+    bench.iter(|| {
+        encoder.put(&values[..]).expect("put() should be OK");
+        encoder.flush_buffer().expect("flush_buffer() should be OK");
+    })
+}
+
+plain!(plain_i32_1k_10, 1024, Int32Type, Type::INT32, gen_10);
+plain!(plain_i32_1k_100, 1024, Int32Type, Type::INT32, gen_100);
+plain!(plain_i32_1k_1000, 1024, Int32Type, Type::INT32, gen_1000);
+plain!(plain_i32_1m_10, 1024 * 1024, Int32Type, Type::INT32, gen_10);
+plain!(
+    plain_i32_1m_100,
+    1024 * 1024,
+    Int32Type,
+    Type::INT32,
+    gen_100
+);
+plain!(
+    plain_i32_1m_1000,
+    1024 * 1024,
+    Int32Type,
+    Type::INT32,
+    gen_1000
+);
+plain!(
+    plain_str_1m,
+    1024 * 1024,
+    ByteArrayType,
+    Type::BYTE_ARRAY,
+    gen_test_strs
+);
+
+dict!(dict_i32_1k_10, 1024, Int32Type, Type::INT32, gen_10);
+dict!(dict_i32_1k_100, 1024, Int32Type, Type::INT32, gen_100);
+dict!(dict_i32_1k_1000, 1024, Int32Type, Type::INT32, gen_1000);
+dict!(dict_i32_1m_10, 1024 * 1024, Int32Type, Type::INT32, gen_10);
+dict!(
+    dict_i32_1m_100,
+    1024 * 1024,
+    Int32Type,
+    Type::INT32,
+    gen_100
+);
+dict!(
+    dict_i32_1m_1000,
+    1024 * 1024,
+    Int32Type,
+    Type::INT32,
+    gen_1000
+);
+dict!(
+    dict_str_1m,
+    1024 * 1024,
+    ByteArrayType,
+    Type::BYTE_ARRAY,
+    gen_test_strs
+);
+
+delta_bit_pack!(delta_bit_pack_i32_1k_10, 1024, Int32Type, gen_10);
+delta_bit_pack!(delta_bit_pack_i32_1k_100, 1024, Int32Type, gen_100);
+delta_bit_pack!(delta_bit_pack_i32_1k_1000, 1024, Int32Type, gen_1000);
+delta_bit_pack!(delta_bit_pack_i32_1m_10, 1024 * 1024, Int32Type, gen_10);
+delta_bit_pack!(delta_bit_pack_i32_1m_100, 1024 * 1024, Int32Type, gen_100);
+delta_bit_pack!(delta_bit_pack_i32_1m_1000, 1024 * 1024, Int32Type, gen_1000);
diff --git a/rust/parquet/benches/reader.rs b/rust/parquet/benches/reader.rs
new file mode 100644
index 00000000000..289d69265ff
--- /dev/null
+++ b/rust/parquet/benches/reader.rs
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![feature(test)]
+extern crate parquet;
+extern crate test;
+
+use std::{collections::HashMap, fs::File, path::Path};
+
+use parquet::{
+    column::reader::{get_typed_column_reader, ColumnReader},
+    data_type::*,
+    file::reader::{FileReader, SerializedFileReader},
+    schema::{parser::parse_message_type, types::ColumnPath},
+};
+
+use test::Bencher;
+
+/// Opens the Parquet file at `path` (relative to the working directory) and
+/// returns its size in bytes together with a reader over it. Panics if the
+/// file cannot be opened or `SerializedFileReader::new` fails.
+fn open_reader(path: &str) -> (u64, SerializedFileReader<File>) {
+    let file = File::open(Path::new(path)).unwrap();
+    let len = file.metadata().unwrap().len();
+    (len, SerializedFileReader::new(file).unwrap())
+}
+
+/// Drains `col_reader` as a column of physical type `T`, `batch_size` values at
+/// a time, and returns the values read. `levels` holds the (definition,
+/// repetition) level buffers handed to every `read_batch()` call. Panics if a
+/// batch cannot be read.
+fn read_column<T: DataType>(
+    col_reader: ColumnReader,
+    num_rows: usize,
+    batch_size: usize,
+    levels: &mut (Option<Vec<i16>>, Option<Vec<i16>>),
+) -> Vec<T::T> {
+    let mut data_collected = Vec::with_capacity(num_rows);
+    let mut val = vec![T::T::default(); batch_size];
+    let mut typed_reader = get_typed_column_reader::<T>(col_reader);
+    loop {
+        let (values_read, _levels_read) = typed_reader
+            .read_batch(
+                batch_size,
+                levels.0.as_mut().map(|x| &mut x[..]),
+                levels.1.as_mut().map(|x| &mut x[..]),
+                &mut val,
+            )
+            .expect("read_batch() should be OK");
+        // Only the first `values_read` slots of `val` hold data from this batch.
+        data_collected.extend_from_slice(&val[..values_read]);
+        if values_read < batch_size {
+            break;
+        }
+    }
+    data_collected
+}
+
+/// Collects every record of `10k-v2.parquet`, projecting out the INT96 column.
+#[bench]
+fn record_reader_10k_collect(bench: &mut Bencher) {
+    let (len, parquet_reader) = open_reader("data/10k-v2.parquet");
+
+    bench.bytes = len;
+    bench.iter(|| {
+        // TODO:
+        // We pass a projection that excludes the Int96 field; otherwise we get:
+        //   Expected non-negative milliseconds when converting Int96, found -210866803200000
+        // Field removed:
+        //   REQUIRED INT96 int96_field;
+        // See issue #201 for more information and follow-up.
+        let projection = parse_message_type(
+            "
+        message test {
+          REQUIRED BYTE_ARRAY binary_field;
+          REQUIRED INT32 int32_field;
+          REQUIRED INT64 int64_field;
+          REQUIRED BOOLEAN boolean_field;
+          REQUIRED FLOAT float_field;
+          REQUIRED DOUBLE double_field;
+          REQUIRED FIXED_LEN_BYTE_ARRAY (1024) flba_field;
+        }
+      ",
+        )
+        .unwrap();
+        let iter = parquet_reader.get_row_iter(Some(projection)).unwrap();
+        let _ = iter.collect::<Vec<_>>();
+    })
+}
+
+/// Collects every record of `stock_simulated.parquet` without a projection.
+#[bench]
+fn record_reader_stock_simulated_collect(bench: &mut Bencher) {
+    let (len, parquet_reader) = open_reader("data/stock_simulated.parquet");
+
+    bench.bytes = len;
+    bench.iter(|| {
+        let iter = parquet_reader.get_row_iter(None).unwrap();
+        let _ = iter.collect::<Vec<_>>();
+    })
+}
+
+/// Reads every column of `stock_simulated.parquet` with typed column readers.
+#[bench]
+fn record_reader_stock_simulated_column(bench: &mut Bencher) {
+    // WARNING THIS BENCH IS INTENDED FOR THIS DATA FILE ONLY
+    // COPY OR CHANGE THE DATA FILE MAY NOT WORK AS YOU WISH
+    let (len, parquet_reader) = open_reader("data/stock_simulated.parquet");
+
+    let descr = parquet_reader.metadata().file_metadata().schema_descr_ptr();
+    let num_row_groups = parquet_reader.num_row_groups();
+    let batch_size = 256;
+
+    bench.bytes = len;
+    bench.iter(|| {
+        for current_row_group in 0..num_row_groups {
+            let row_group_reader =
+                parquet_reader.get_row_group(current_row_group).unwrap();
+            let row_group_metadata = row_group_reader.metadata();
+            let num_rows = row_group_metadata.num_rows() as usize;
+
+            // Index of each column within this row group, keyed by column path.
+            let mut paths = HashMap::new();
+            for col_index in 0..row_group_reader.num_columns() {
+                let col_meta = row_group_metadata.column(col_index);
+                paths.insert(col_meta.column_path().clone(), col_index);
+            }
+
+            let mut readers = Vec::new();
+            for field in descr.root_schema().get_fields() {
+                let col_path = ColumnPath::new(vec![field.name().to_owned()]);
+                let orig_index = *paths.get(&col_path).unwrap();
+                let col_reader = row_group_reader.get_column_reader(orig_index).unwrap();
+                readers.push(col_reader);
+            }
+
+            // (definition levels, repetition levels) shared by all columns.
+            let mut levels = (Some(vec![0; batch_size]), None::<Vec<i16>>);
+
+            for col_reader in readers {
+                match col_reader {
+                    r @ ColumnReader::Int64ColumnReader(..) => {
+                        read_column::<Int64Type>(r, num_rows, batch_size, &mut levels);
+                    }
+                    r @ ColumnReader::DoubleColumnReader(..) => {
+                        read_column::<DoubleType>(r, num_rows, batch_size, &mut levels);
+                    }
+                    _ => unimplemented!(),
+                }
+            }
+        }
+    })
+}
diff --git a/rust/parquet/data/10k-v2.parquet b/rust/parquet/data/10k-v2.parquet
new file mode 100644
index 00000000000..231686a3dc7
Binary files /dev/null and b/rust/parquet/data/10k-v2.parquet differ
diff --git a/rust/parquet/data/stock_simulated.parquet b/rust/parquet/data/stock_simulated.parquet
new file mode 100644
index 00000000000..9d2c5fb2f04
Binary files /dev/null and b/rust/parquet/data/stock_simulated.parquet differ