Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ lz4 = "1.23"
zstd = "0.5"
arrow = { path = "../arrow", version = "3.0.0-SNAPSHOT" }
serde_json = { version = "1.0", features = ["preserve_order"] }
lazy_static = "1.4.0"

[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
196 changes: 196 additions & 0 deletions rust/parquet/benches/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#![feature(test)]
extern crate parquet;
#[macro_use]
extern crate lazy_static;
extern crate test;
use test::Bencher;

use std::{env, fs::File};

use parquet::{basic::Compression, compression::*, file::reader::*};

// 10k rows written in page v2 with type:
//
// message test {
// required binary binary_field,
// required int32 int32_field,
// required int64 int64_field,
// required boolean boolean_field,
// required float float_field,
// required double double_field,
// required fixed_len_byte_array(1024) flba_field,
// required int96 int96_field
// }
//
// filled with random values.
const TEST_FILE: &str = "10k-v2.parquet";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we normally put test data in https://github.com/apache/parquet-testing, so perhaps we should add this one there as well (or is there an existing file there that we can use instead)?

Copy link
Copy Markdown
Contributor Author

@alamb alamb Nov 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created apache/parquet-testing#15 -- if/when that gets merged in, I'll update this PR to pick up a later version of parquet-testing and remove the binary from this PR as well.


/// Opens `data/<TEST_FILE>` relative to the current working directory and
/// wraps it in a `SerializedFileReader`.
///
/// Panics if the working directory cannot be determined, the file cannot be
/// opened, or the reader cannot be constructed from it.
fn get_f_reader() -> SerializedFileReader<File> {
    let test_path = env::current_dir().unwrap().join("data").join(TEST_FILE);
    let handle = File::open(&test_path).unwrap();
    SerializedFileReader::new(handle).unwrap()
}

/// Returns the concatenated buffer bytes of every page of column `col_idx`
/// in row group 0 of the test file.
///
/// Panics if the row group, the column page reader, or any page cannot be read.
fn get_pages_bytes(col_idx: usize) -> Vec<u8> {
    let file_reader = get_f_reader();
    let row_group = file_reader.get_row_group(0).unwrap();
    let mut pages = row_group.get_column_page_reader(col_idx).unwrap();

    let mut bytes: Vec<u8> = Vec::new();
    // `get_next_page` yields `None` once the column is exhausted, which also
    // terminates the `from_fn` iterator.
    for page in std::iter::from_fn(|| pages.get_next_page().unwrap()) {
        bytes.extend_from_slice(page.buffer().data());
    }
    bytes
}

// Generates a `#[bench]` function that measures compressing the page bytes of
// one column of the test file with one codec.
//
// Arguments:
//   $fname   - name of the generated benchmark function
//   $codec   - `Compression` variant handed to `create_codec`
//   $col_idx - column index passed to `get_pages_bytes`
macro_rules! compress {
    ($fname:ident, $codec:expr, $col_idx:expr) => {
        #[bench]
        fn $fname(bench: &mut Bencher) {
            lazy_static! {
                // Input bytes are loaded lazily, on first access, so the file
                // read happens once per generated benchmark.
                static ref DATA: Vec<u8> = { get_pages_bytes($col_idx) };
            }

            let mut codec = create_codec($codec).unwrap().unwrap();
            let mut v = vec![];
            // Throughput is reported relative to the uncompressed input size.
            // Touching `DATA` here also forces its initialization before timing.
            bench.bytes = DATA.len() as u64;
            bench.iter(|| {
                // The output buffer must be cleared before it is reused for
                // another `compress` call; otherwise it keeps growing and each
                // iteration pays for ever-larger reallocations. `clear` keeps
                // the capacity, so the allocation itself is still reused.
                v.clear();
                codec.compress(&DATA[..], &mut v).unwrap();
            })
        }
    };
}

// Generates a `#[bench]` function that measures decompressing one column's
// page bytes after they have been compressed with the given codec.
//
// Arguments:
//   $fname   - name of the generated benchmark function
//   $codec   - `Compression` variant handed to `create_codec`
//   $col_idx - column index passed to `get_pages_bytes`
macro_rules! decompress {
    ($fname:ident, $codec:expr, $col_idx:expr) => {
        #[bench]
        fn $fname(bench: &mut Bencher) {
            lazy_static! {
                // Compressed input, produced lazily on first access by
                // compressing the column's page bytes with the same codec.
                static ref COMPRESSED_PAGES: Vec<u8> = {
                    let mut codec = create_codec($codec).unwrap().unwrap();
                    let raw_data = get_pages_bytes($col_idx);
                    let mut v = vec![];
                    codec.compress(&raw_data[..], &mut v).unwrap();
                    v
                };
            }

            let mut codec = create_codec($codec).unwrap().unwrap();
            // Report throughput against the decompressed size of this column
            // only. The size is taken from one untimed decompression, which
            // also forces `COMPRESSED_PAGES` to be built before timing starts.
            // (The row group's total byte size would cover every column and
            // overstate the bytes processed per iteration.)
            bench.bytes = {
                let mut decompressed = Vec::new();
                codec
                    .decompress(&COMPRESSED_PAGES[..], &mut decompressed)
                    .unwrap();
                decompressed.len() as u64
            };
            bench.iter(|| {
                // A fresh output buffer per iteration, so every run starts
                // from an empty destination.
                let mut v = Vec::new();
                let _ = codec.decompress(&COMPRESSED_PAGES[..], &mut v).unwrap();
            })
        }
    };
}

// Compression benchmarks: one per (codec, column) pair. The third argument is
// the column index in the test file; the benchmark names follow the field
// order of the schema documented above `TEST_FILE` (0 = binary, 1 = int32,
// 2 = int64, 3 = boolean, 4 = float, 5 = double, 6 = fixed_len_byte_array,
// 7 = int96).

// Brotli
compress!(compress_brotli_binary, Compression::BROTLI, 0);
compress!(compress_brotli_int32, Compression::BROTLI, 1);
compress!(compress_brotli_int64, Compression::BROTLI, 2);
compress!(compress_brotli_boolean, Compression::BROTLI, 3);
compress!(compress_brotli_float, Compression::BROTLI, 4);
compress!(compress_brotli_double, Compression::BROTLI, 5);
compress!(compress_brotli_fixed, Compression::BROTLI, 6);
compress!(compress_brotli_int96, Compression::BROTLI, 7);

// Gzip
compress!(compress_gzip_binary, Compression::GZIP, 0);
compress!(compress_gzip_int32, Compression::GZIP, 1);
compress!(compress_gzip_int64, Compression::GZIP, 2);
compress!(compress_gzip_boolean, Compression::GZIP, 3);
compress!(compress_gzip_float, Compression::GZIP, 4);
compress!(compress_gzip_double, Compression::GZIP, 5);
compress!(compress_gzip_fixed, Compression::GZIP, 6);
compress!(compress_gzip_int96, Compression::GZIP, 7);

// Snappy
compress!(compress_snappy_binary, Compression::SNAPPY, 0);
compress!(compress_snappy_int32, Compression::SNAPPY, 1);
compress!(compress_snappy_int64, Compression::SNAPPY, 2);
compress!(compress_snappy_boolean, Compression::SNAPPY, 3);
compress!(compress_snappy_float, Compression::SNAPPY, 4);
compress!(compress_snappy_double, Compression::SNAPPY, 5);
compress!(compress_snappy_fixed, Compression::SNAPPY, 6);
compress!(compress_snappy_int96, Compression::SNAPPY, 7);

// LZ4
compress!(compress_lz4_binary, Compression::LZ4, 0);
compress!(compress_lz4_int32, Compression::LZ4, 1);
compress!(compress_lz4_int64, Compression::LZ4, 2);
compress!(compress_lz4_boolean, Compression::LZ4, 3);
compress!(compress_lz4_float, Compression::LZ4, 4);
compress!(compress_lz4_double, Compression::LZ4, 5);
compress!(compress_lz4_fixed, Compression::LZ4, 6);
compress!(compress_lz4_int96, Compression::LZ4, 7);

// Zstandard
compress!(compress_zstd_binary, Compression::ZSTD, 0);
compress!(compress_zstd_int32, Compression::ZSTD, 1);
compress!(compress_zstd_int64, Compression::ZSTD, 2);
compress!(compress_zstd_boolean, Compression::ZSTD, 3);
compress!(compress_zstd_float, Compression::ZSTD, 4);
compress!(compress_zstd_double, Compression::ZSTD, 5);
compress!(compress_zstd_fixed, Compression::ZSTD, 6);
compress!(compress_zstd_int96, Compression::ZSTD, 7);

// Decompression benchmarks: one per (codec, column) pair. The third argument
// is the column index in the test file; the benchmark names follow the field
// order of the schema documented above `TEST_FILE` (0 = binary, 1 = int32,
// 2 = int64, 3 = boolean, 4 = float, 5 = double, 6 = fixed_len_byte_array,
// 7 = int96).

// Brotli
decompress!(decompress_brotli_binary, Compression::BROTLI, 0);
decompress!(decompress_brotli_int32, Compression::BROTLI, 1);
decompress!(decompress_brotli_int64, Compression::BROTLI, 2);
decompress!(decompress_brotli_boolean, Compression::BROTLI, 3);
decompress!(decompress_brotli_float, Compression::BROTLI, 4);
decompress!(decompress_brotli_double, Compression::BROTLI, 5);
decompress!(decompress_brotli_fixed, Compression::BROTLI, 6);
decompress!(decompress_brotli_int96, Compression::BROTLI, 7);

// Gzip
decompress!(decompress_gzip_binary, Compression::GZIP, 0);
decompress!(decompress_gzip_int32, Compression::GZIP, 1);
decompress!(decompress_gzip_int64, Compression::GZIP, 2);
decompress!(decompress_gzip_boolean, Compression::GZIP, 3);
decompress!(decompress_gzip_float, Compression::GZIP, 4);
decompress!(decompress_gzip_double, Compression::GZIP, 5);
decompress!(decompress_gzip_fixed, Compression::GZIP, 6);
decompress!(decompress_gzip_int96, Compression::GZIP, 7);

// Snappy
decompress!(decompress_snappy_binary, Compression::SNAPPY, 0);
decompress!(decompress_snappy_int32, Compression::SNAPPY, 1);
decompress!(decompress_snappy_int64, Compression::SNAPPY, 2);
decompress!(decompress_snappy_boolean, Compression::SNAPPY, 3);
decompress!(decompress_snappy_float, Compression::SNAPPY, 4);
decompress!(decompress_snappy_double, Compression::SNAPPY, 5);
decompress!(decompress_snappy_fixed, Compression::SNAPPY, 6);
decompress!(decompress_snappy_int96, Compression::SNAPPY, 7);

// LZ4
decompress!(decompress_lz4_binary, Compression::LZ4, 0);
decompress!(decompress_lz4_int32, Compression::LZ4, 1);
decompress!(decompress_lz4_int64, Compression::LZ4, 2);
decompress!(decompress_lz4_boolean, Compression::LZ4, 3);
decompress!(decompress_lz4_float, Compression::LZ4, 4);
decompress!(decompress_lz4_double, Compression::LZ4, 5);
decompress!(decompress_lz4_fixed, Compression::LZ4, 6);
decompress!(decompress_lz4_int96, Compression::LZ4, 7);

// Zstandard
decompress!(decompress_zstd_binary, Compression::ZSTD, 0);
decompress!(decompress_zstd_int32, Compression::ZSTD, 1);
decompress!(decompress_zstd_int64, Compression::ZSTD, 2);
decompress!(decompress_zstd_boolean, Compression::ZSTD, 3);
decompress!(decompress_zstd_float, Compression::ZSTD, 4);
decompress!(decompress_zstd_double, Compression::ZSTD, 5);
decompress!(decompress_zstd_fixed, Compression::ZSTD, 6);
decompress!(decompress_zstd_int96, Compression::ZSTD, 7);
77 changes: 77 additions & 0 deletions rust/parquet/benches/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

extern crate parquet;
extern crate rand;

use rand::{thread_rng, Rng};
use std::rc::Rc;

use parquet::{
basic::*,
data_type::*,
schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType},
};

// Defines a public generator function named `$fname` that produces `total`
// random `i32` values drawn via `gen_range(0, $limit)` from the thread-local
// RNG. The generated function returns the payload size in bytes
// (`total * size_of::<i32>()`) together with the values.
macro_rules! gen_random_ints {
    ($fname:ident, $limit:expr) => {
        pub fn $fname(total: usize) -> (usize, Vec<i32>) {
            let mut rng = thread_rng();
            let values: Vec<i32> =
                (0..total).map(|_| rng.gen_range(0, $limit)).collect();
            let bytes = ::std::mem::size_of::<i32>() * values.len();
            (bytes, values)
        }
    };
}

// Generators with upper limits of 10, 100 and 1000.
gen_random_ints!(gen_10, 10);
gen_random_ints!(gen_100, 100);
gen_random_ints!(gen_1000, 1000);

/// Generates `total` byte arrays, each chosen at random from ten fixed
/// 10-character words ("aaaaaaaaaa" through "jjjjjjjjjj").
///
/// Returns the summed length in bytes of all generated values together with
/// the values themselves.
pub fn gen_test_strs(total: usize) -> (usize, Vec<ByteArray>) {
    const WORDS: [&str; 10] = [
        "aaaaaaaaaa",
        "bbbbbbbbbb",
        "cccccccccc",
        "dddddddddd",
        "eeeeeeeeee",
        "ffffffffff",
        "gggggggggg",
        "hhhhhhhhhh",
        "iiiiiiiiii",
        "jjjjjjjjjj",
    ];

    let mut rnd = rand::thread_rng();
    let values: Vec<ByteArray> = (0..total)
        .map(|_| {
            let pick: usize = rnd.gen_range(0, 10);
            ByteArray::from(WORDS[pick])
        })
        .collect();

    let bytes: usize = values.iter().map(|v| v.len()).sum();
    (bytes, values)
}

/// Builds a `ColumnDescriptor` for a primitive column named "col" with the
/// given physical type and type length.
///
/// The two `0` arguments and the empty `ColumnPath` are passed straight to
/// `ColumnDescriptor::new`.
/// NOTE(review): the zeros are presumably the max definition and repetition
/// levels — confirm against `ColumnDescriptor::new`.
///
/// Panics if the primitive type builder rejects the combination.
pub fn col_desc(type_length: i32, primitive_ty: Type) -> ColumnDescriptor {
    let builder =
        SchemaType::primitive_type_builder("col", primitive_ty).with_length(type_length);
    let schema_ty = builder.build().unwrap();
    let path = ColumnPath::new(vec![]);
    ColumnDescriptor::new(Rc::new(schema_ty), 0, 0, path)
}
Loading