From 23742c0169a348e9c27d67289d0788533f7603c6 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 21 Mar 2024 15:13:27 +0200 Subject: [PATCH 1/6] first commit --- parquet/Cargo.toml | 4 ++ parquet/src/encryption/ciphers.rs | 105 ++++++++++++++++++++++++++++++ parquet/src/encryption/mod.rs | 23 +++++++ parquet/src/file/footer.rs | 13 +++- parquet/src/lib.rs | 5 ++ tmp-bench1/Cargo.toml | 8 +++ tmp-bench1/src/main.rs | 75 +++++++++++++++++++++ 7 files changed, 232 insertions(+), 1 deletion(-) create mode 100644 parquet/src/encryption/ciphers.rs create mode 100644 parquet/src/encryption/mod.rs create mode 100644 tmp-bench1/Cargo.toml create mode 100644 tmp-bench1/src/main.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index e6d612e0cc62..eb7ed869409e 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -67,6 +67,8 @@ hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } +aes-gcm = { version = "0.10.3" , default-features = false, features = ["std", "aes"]} +rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -82,6 +84,7 @@ arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "jso tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } object_store = { version = "0.9.0", default-features = false, features = ["azure"] } +aes-gcm = { version = "0.10.3" , default-features = false, features = ["std", "aes"] } [package.metadata.docs.rs] all-features = true @@ -104,6 +107,7 @@ experimental = [] async = ["futures", "tokio"] # Enable object_store integration object_store = ["dep:object_store", "async"] +#encryption = ["aes-gcm", "base64"] [[example]] name = "read_parquet" diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs new file mode 100644 index 000000000000..bfe57f75b3c1 --- /dev/null +++ b/parquet/src/encryption/ciphers.rs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption implementation specific to Parquet, as described +//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). + +use aes_gcm::{aead::{Aead, KeyInit, Payload}, Nonce, Key, Aes128Gcm, AesGcm}; +use aes_gcm::aead::consts::U12; +use aes_gcm::aes::Aes128; +use crate::encryption::GCM_NONCE_LENGTH; +use rand::prelude::*; + +pub trait BlockEncryptor { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec ; +} + +pub trait BlockDecryptor { + fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec ; +} + +pub(crate) struct GcmBlockEncryptor { + rng: ThreadRng, + cipher: AesGcm // todo support other key sizes +} + +impl GcmBlockEncryptor { + pub(crate) fn new(key_bytes: &[u8]) -> Self { + let key_size = key_bytes.len(); //todo check len + let key_vec = Vec::from(key_bytes); + let key = Key::::from_slice(&key_vec[..]); + + Self { + rng: rand::thread_rng(), + cipher: Aes128Gcm::new(&key) + } + } +} + +impl BlockEncryptor for GcmBlockEncryptor { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec { + let nonce_buf: [u8; GCM_NONCE_LENGTH] = self.rng.gen(); + let nonce = Nonce::from_slice(&nonce_buf[..]); + + let plaint_text = Payload { + msg: &plaintext[..], + aad: &aad[..], + }; + + match self.cipher.encrypt(&nonce, plaint_text) { + Ok(encrypted) => { + let len: [u8; 4] = [0; 4]; + // todo fill len + let ctext = [&nonce_buf, &*encrypted].concat(); + [&len, &*ctext].concat() + }, + Err(e) => panic!("Failed to encrypt {}", e), + } + } +} + +pub(crate) struct GcmBlockDecryptor { + cipher: AesGcm // todo support other key sizes +} + +impl GcmBlockDecryptor { + pub(crate) fn new(key_bytes: &[u8]) -> Self { + let key_size = key_bytes.len(); //todo check key len + let key_vec = Vec::from(key_bytes); + let key = Key::::from_slice(&key_vec[..]); + + Self { + cipher: Aes128Gcm::new(&key) + } + } +} + +impl BlockDecryptor for GcmBlockDecryptor { + fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec { + let nonce = Nonce::from_slice(&length_and_ciphertext[4..16]); + + let cipher_text = Payload { + msg: &length_and_ciphertext[16..], + aad: &aad[..], + }; + + match self.cipher.decrypt(&nonce, cipher_text) { + Ok(decrypted) => decrypted, + Err(e) => panic!("{}", e), + } + } +} diff --git a/parquet/src/encryption/mod.rs b/parquet/src/encryption/mod.rs new file mode 100644 index 000000000000..aecb5f72580c --- /dev/null +++ b/parquet/src/encryption/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption implementation specific to Parquet, as described +//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). + +pub mod ciphers; + +const GCM_NONCE_LENGTH: usize = 12; \ No newline at end of file diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 9695dbeae6e1..fee3c24e7bd3 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -21,6 +21,8 @@ use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; use crate::basic::ColumnOrder; +use crate::encryption::ciphers; +use crate::encryption::ciphers::{BlockDecryptor, BlockEncryptor}; use crate::errors::{ParquetError, Result}; use crate::file::{metadata::*, reader::ChunkReader, FOOTER_SIZE, PARQUET_MAGIC}; @@ -67,8 +69,17 @@ pub fn parse_metadata(chunk_reader: &R) -> Result Result { + let aad: &[u8] = "abcdefgh".as_bytes(); + let key_code: Vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; + + let mut encryptor = ciphers::GcmBlockEncryptor::new(&key_code); + let encrypted_foot = encryptor.encrypt(buf, aad); + + let decryptor = ciphers::GcmBlockDecryptor::new(&key_code); + let decrypted_foot = decryptor.decrypt(encrypted_foot.as_ref(), aad); + // TODO: row group filtering - let mut prot = TCompactSliceInputProtocol::new(buf); + let mut prot = TCompactSliceInputProtocol::new(decrypted_foot.as_ref()); let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) .map_err(|e| ParquetError::General(format!("Could not parse metadata: {e}")))?; let schema = types::from_thrift(&t_file_metadata.schema)?; diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index db5d72634389..16d22e5d8a95 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -124,6 +124,11 @@ pub mod column; experimental!(mod compression); experimental!(mod encodings); pub mod bloom_filter; + +//#[cfg(feature = "encryption")] +//pub mod encryption; +experimental!(mod encryption); + pub mod file; pub mod record; pub mod schema; diff --git a/tmp-bench1/Cargo.toml b/tmp-bench1/Cargo.toml new file mode 100644 index 000000000000..199770ab65de --- /dev/null +++ b/tmp-bench1/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "gcm-bench" +version = "0.1.0" +edition = "2021" + +[dependencies] +aes-gcm = "=0.10.3" +rand = "0.9.0-alpha.0" diff --git a/tmp-bench1/src/main.rs b/tmp-bench1/src/main.rs new file mode 100644 index 000000000000..d8ec7b97a86a --- /dev/null +++ b/tmp-bench1/src/main.rs @@ -0,0 +1,75 @@ +use aes_gcm::{aead::{Aead, KeyInit, Payload}, Aes128Gcm, Nonce, Key}; +use std::time::SystemTime; +use rand::prelude::*; + +fn main() { + const GCM_NONCE_LENGTH: usize = 12; + + const P_LEN: usize = 1 * 1024 * 1024; + + let mut input: [u8; P_LEN] = [0; P_LEN]; + for i in 1..input.len() { + input[i] = i as u8; + } + + let mut prev_total: usize = 0; + let mut new_total: usize = 0; + let ini_time = SystemTime::now(); + let mut prev_time = SystemTime::now(); + let mut counter: i32 = 0; + + let key_code: Vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; + let key = Key::::from_slice(&key_code[..]); + let cipher = Aes128Gcm::new(&key); + + let aad: &[u8] = "abcdefgh".as_bytes(); + let plaintext = Payload { + msg: &input[..], + aad: &aad[..], + }; + + + println!("Plaintext length: {}", input.len()); + + let mut rng = rand::thread_rng(); + let mut nonce: [u8; GCM_NONCE_LENGTH] = [0; GCM_NONCE_LENGTH]; + nonce.shuffle(&mut rng); + let nonce = Nonce::from_slice(&nonce[..]); + + let ciphertext: Vec; + + // Encrypt + match cipher.encrypt(&nonce, plaintext) { + Ok(ciphertxt) => { + println!("Ciphertext length: {}", ciphertxt.len()); + ciphertext = ciphertxt; + }, + Err(e) => panic!("{}", e), + } + + loop { + let cipher_text = Payload { + msg: &ciphertext[..], + aad: &aad[..], + }; + + match cipher.decrypt(&nonce, cipher_text) { + Ok(decrypted) => new_total += decrypted.len(), + Err(e) => panic!("{}", e), + } + + counter += 1; + + if counter % 1000 == 0 { + let tdelta = prev_time.elapsed().unwrap().as_millis(); + if tdelta > 3000 { + let ddelta: usize = new_total - prev_total; + let rate: f64 = (ddelta as f64) / (tdelta as f64) / 1024.0; // ~ MB/sec + println!("Rate: {} MB/sec. Counter: {}. Total, MB: {}. Time: {}", + rate, counter, new_total / 1024 / 1024, (ini_time.elapsed().unwrap().as_millis()) / 1000); + prev_time = SystemTime::now(); + prev_total = new_total; + } + } + } +} From 42c2bb962475715c470d410c24c0d562a071e41c Mon Sep 17 00:00:00 2001 From: Ben Hoberman Date: Sun, 24 Mar 2024 15:43:13 -0700 Subject: [PATCH 2/6] Add Ring Encryption Test --- Cargo.toml | 3 +- parquet/Cargo.toml | 1 + parquet/src/encryption/ciphers.rs | 243 ++++++++++++++++++++++++++++-- parquet/src/file/footer.rs | 2 +- tmp-bench1/Cargo.toml | 1 + 5 files changed, 236 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e09660941d60..53d3955115ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,8 @@ exclude = [ # scratch this way, this is a stand-alone package that compiles independently of the others. "arrow-pyarrow-integration-testing", # object_store is excluded because it follows a separate release cycle from the other arrow crates - "object_store" + "object_store", + "tmp-bench1" ] [workspace.package] diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index eb7ed869409e..bfd9119559c5 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -69,6 +69,7 @@ paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } aes-gcm = { version = "0.10.3" , default-features = false, features = ["std", "aes"]} rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } +ring = { version = "0.17", default-features = false, features = ["std"]} [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs index bfe57f75b3c1..3574de3a1bd0 100644 --- a/parquet/src/encryption/ciphers.rs +++ b/parquet/src/encryption/ciphers.rs @@ -18,26 +18,31 @@ //! Encryption implementation specific to Parquet, as described //! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). -use aes_gcm::{aead::{Aead, KeyInit, Payload}, Nonce, Key, Aes128Gcm, AesGcm}; +use crate::encryption::GCM_NONCE_LENGTH; use aes_gcm::aead::consts::U12; use aes_gcm::aes::Aes128; -use crate::encryption::GCM_NONCE_LENGTH; +use aes_gcm::{ + aead::{Aead, KeyInit, Payload}, + Aes128Gcm, AesGcm, Key, Nonce, +}; use rand::prelude::*; +use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM}; +use ring::rand::{SecureRandom, SystemRandom}; pub trait BlockEncryptor { - fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec ; + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec; } pub trait BlockDecryptor { - fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec ; + fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec; } -pub(crate) struct GcmBlockEncryptor { +pub(crate) struct AesGcmGcmBlockEncryptor { rng: ThreadRng, - cipher: AesGcm // todo support other key sizes + cipher: AesGcm, // todo support other key sizes } -impl GcmBlockEncryptor { +impl AesGcmGcmBlockEncryptor { pub(crate) fn new(key_bytes: &[u8]) -> Self { let key_size = key_bytes.len(); //todo check len let key_vec = Vec::from(key_bytes); @@ -45,12 +50,12 @@ impl GcmBlockEncryptor { Self { rng: rand::thread_rng(), - cipher: Aes128Gcm::new(&key) + cipher: Aes128Gcm::new(&key), } } } -impl BlockEncryptor for GcmBlockEncryptor { +impl BlockEncryptor for AesGcmGcmBlockEncryptor { fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec { let nonce_buf: [u8; GCM_NONCE_LENGTH] = self.rng.gen(); let nonce = Nonce::from_slice(&nonce_buf[..]); @@ -66,14 +71,14 @@ impl BlockEncryptor for GcmBlockEncryptor { // todo fill len let ctext = [&nonce_buf, &*encrypted].concat(); [&len, &*ctext].concat() - }, + } Err(e) => panic!("Failed to encrypt {}", e), } } } pub(crate) struct GcmBlockDecryptor { - cipher: AesGcm // todo support other key sizes + cipher: AesGcm, // todo support other key sizes } impl GcmBlockDecryptor { @@ -83,7 +88,7 @@ impl GcmBlockDecryptor { let key = Key::::from_slice(&key_vec[..]); Self { - cipher: Aes128Gcm::new(&key) + cipher: Aes128Gcm::new(&key), } } } @@ -103,3 +108,217 @@ impl BlockDecryptor for GcmBlockDecryptor { } } } + +const LEFT_FOUR: u128 = 0xffff_ffff_0000_0000_0000_0000_0000_0000; +const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff; +const NONCE_LEN: usize = 12; + +struct CounterNonce { + start: u128, + counter: u128, +} + +impl CounterNonce { + pub fn new(rng: &SystemRandom) -> Self { + let mut buf = [0; 16]; + rng.fill(&mut buf).unwrap(); + + // Since this is a random seed value, endianess doesn't matter at all, + // and we can use whatever is platform-native. + let start = u128::from_ne_bytes(buf) & RIGHT_TWELVE; + let counter = start.wrapping_add(1); + + Self { start, counter } + } + + /// One accessor for the nonce bytes to avoid potentially flipping endianess + #[inline] + pub fn get_bytes(&self) -> [u8; NONCE_LEN] { + self.counter.to_le_bytes()[0..NONCE_LEN].try_into().unwrap() + } +} + +impl NonceSequence for CounterNonce { + fn advance(&mut self) -> Result { + // If we've wrapped around, we've exhausted this nonce sequence + if (self.counter & RIGHT_TWELVE) == (self.start & RIGHT_TWELVE) { + Err(ring::error::Unspecified) + } else { + // Otherwise, just advance and return the new value + let buf: [u8; NONCE_LEN] = self.get_bytes(); + self.counter = self.counter.wrapping_add(1); + Ok(ring::aead::Nonce::assume_unique_for_key(buf)) + } + } +} + +pub(crate) struct RingGcmBlockEncryptor { + key: LessSafeKey, + nonce_sequence: CounterNonce, +} + +impl RingGcmBlockEncryptor { + /// Create a new `RingGcmBlockEncryptor` with a random key and random nonce. + /// The nonce will advance appropriately with each block encryption and + /// return an error if it wraps around. + pub(crate) fn new() -> Self { + let rng = SystemRandom::new(); + let mut key_bytes = [0; 16]; + rng.fill(&mut key_bytes).unwrap(); + + let key = UnboundKey::new(&AES_128_GCM, key_bytes.as_ref()).unwrap(); + let nonce = CounterNonce::new(&rng); + + Self { + key: LessSafeKey::new(key), + nonce_sequence: nonce, + } + } +} + +impl BlockEncryptor for RingGcmBlockEncryptor { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec { + let nonce = self.nonce_sequence.advance().unwrap(); + let mut result = + Vec::with_capacity(plaintext.len() + AES_128_GCM.tag_len() + AES_128_GCM.nonce_len()); + result.extend_from_slice(nonce.as_ref()); + result.extend_from_slice(plaintext); + + let tag = self + .key + .seal_in_place_separate_tag(nonce, Aad::from(aad), &mut result[NONCE_LEN..]) + .unwrap(); + result.extend_from_slice(tag.as_ref()); + + result + } +} + +pub(crate) struct RingGcmBlockDecryptor { + key: LessSafeKey, +} + +impl RingGcmBlockDecryptor { + pub(crate) fn new(key_bytes: &[u8]) -> Self { + let key = UnboundKey::new(&AES_128_GCM, key_bytes).unwrap(); + + Self { + key: LessSafeKey::new(key), + } + } + + fn new_from_less_safe_key(key: LessSafeKey) -> Self { + Self { key } + } +} + +impl BlockDecryptor for RingGcmBlockDecryptor { + fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec { + let mut result = Vec::with_capacity( + length_and_ciphertext.len() - AES_128_GCM.tag_len() - AES_128_GCM.nonce_len(), + ); + result.extend_from_slice(&length_and_ciphertext[AES_128_GCM.nonce_len()..]); + + let nonce = ring::aead::Nonce::try_assume_unique_for_key( + &length_and_ciphertext[0..AES_128_GCM.nonce_len()], + ) + .unwrap(); + + self.key + .open_in_place(nonce, Aad::from(aad), &mut result) + .unwrap(); + + result + } +} + +#[cfg(test)] +mod tests { + use std::{ + hint::black_box, + time::{Duration, Instant}, + }; + + use ring::aead::{Aad, LessSafeKey, UnboundKey, AES_128_GCM}; + + #[test] + fn bench_ring_encdec() { + // Pick a block large enough to avoid measuring start-up cost but small + // enough to fit into any reasonable L1 cache + const BLOCK_BYTES: usize = 16 * (1 << 10); // 16 KiB + let mut input: [u8; BLOCK_BYTES] = [0; BLOCK_BYTES]; + for i in 0..input.len() { + input[i] = i as u8; + } + + let key = LessSafeKey::new(UnboundKey::new(&AES_128_GCM, b"0123456789abcdef").unwrap()); + let nonce = || ring::aead::Nonce::assume_unique_for_key([0; 12]); + + // Benchmark the decryptor for some amount of time and see how much data has been processed + const TEST_MS: usize = 2000; + let mut enc_data: usize = 0; + let mut dec_data: usize = 0; + let mut enc_time: Duration = Duration::from_micros(0); + let mut dec_time = Duration::from_micros(0); + + // Warmup + for _ in 0..1000 { + let tag = black_box( + key.seal_in_place_separate_tag(nonce(), Aad::empty(), &mut input) + .unwrap(), + ); + + black_box( + key.open_in_place_separate_tag(nonce(), Aad::empty(), tag, &mut input, 0..) + .unwrap(), + ); + } + + loop { + let start = Instant::now(); + let tag = black_box( + key.seal_in_place_separate_tag( + black_box(nonce()), + black_box(Aad::empty()), + black_box(&mut input), + ) + .unwrap(), + ); + enc_time += start.elapsed(); + enc_data += input.len(); + + let start = Instant::now(); + black_box( + key.open_in_place_separate_tag( + black_box(nonce()), + black_box(Aad::empty()), + black_box(tag), + black_box(&mut input), + black_box(0..), + ) + .unwrap(), + ); + dec_time += start.elapsed(); + dec_data += input.len(); + + if enc_time.as_millis() as usize >= TEST_MS && dec_time.as_millis() as usize >= TEST_MS + { + break; + } + } + + println!( + "Encryption performance: {} MiB processed in {} ms, throughput is {} MiB/s", + (enc_data as f32 / (1 << 20) as f32), + enc_time.as_millis(), + enc_data as f32 / (1 << 20) as f32 / enc_time.as_secs_f32() + ); + + println!( + "Decryption performance: {} MiB processed in {} ms, throughput is {} MiB/s", + (dec_data as f32 / (1 << 20) as f32), + dec_time.as_millis(), + dec_data as f32 / (1 << 20) as f32 / dec_time.as_secs_f32() + ); + } +} diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index fee3c24e7bd3..622e2f657906 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -72,7 +72,7 @@ pub fn decode_metadata(buf: &[u8]) -> Result { let aad: &[u8] = "abcdefgh".as_bytes(); let key_code: Vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; - let mut encryptor = ciphers::GcmBlockEncryptor::new(&key_code); + let mut encryptor = ciphers::AesGcmGcmBlockEncryptor::new(&key_code); let encrypted_foot = encryptor.encrypt(buf, aad); let decryptor = ciphers::GcmBlockDecryptor::new(&key_code); diff --git a/tmp-bench1/Cargo.toml b/tmp-bench1/Cargo.toml index 199770ab65de..4a1d398414d7 100644 --- a/tmp-bench1/Cargo.toml +++ b/tmp-bench1/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" [dependencies] aes-gcm = "=0.10.3" rand = "0.9.0-alpha.0" +ring="0.17" From 0ceb980107d2fa56dff929a6ae29ddff16079b57 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 28 Mar 2024 15:17:53 +0200 Subject: [PATCH 3/6] switch to ring; footer decryption --- parquet/Cargo.toml | 2 - parquet/src/arrow/arrow_reader/mod.rs | 11 ++ parquet/src/encryption/ciphers.rs | 261 ++++++++------------------ parquet/src/encryption/mod.rs | 2 - parquet/src/file/footer.rs | 65 +++++-- parquet/src/file/mod.rs | 1 + tmp-bench1/Cargo.toml | 9 - tmp-bench1/src/main.rs | 75 -------- 8 files changed, 143 insertions(+), 283 deletions(-) delete mode 100644 tmp-bench1/Cargo.toml delete mode 100644 tmp-bench1/src/main.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index bfd9119559c5..8a4d4019681f 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -67,8 +67,6 @@ hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } -aes-gcm = { version = "0.10.3" , default-features = false, features = ["std", "aes"]} -rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } ring = { version = "0.17", default-features = false, features = ["std"]} [dev-dependencies] diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 83d6f6f553fc..361b83de5963 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1410,6 +1410,17 @@ mod tests { assert!(col.value(2).is_nan()); } + #[test] + fn test_uniform_encryption() { + let path = format!( + "{}/uniform_encryption.parquet.encrypted", + arrow::util::test_util::parquet_test_data(), + ); + let file = File::open(path).unwrap(); + let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap(); + // todo check contents + } + #[test] fn test_read_float32_float64_byte_stream_split() { let path = format!( diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs index 3574de3a1bd0..09c106e0c8f7 100644 --- a/parquet/src/encryption/ciphers.rs +++ b/parquet/src/encryption/ciphers.rs @@ -18,16 +18,9 @@ //! Encryption implementation specific to Parquet, as described //! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). -use crate::encryption::GCM_NONCE_LENGTH; -use aes_gcm::aead::consts::U12; -use aes_gcm::aes::Aes128; -use aes_gcm::{ - aead::{Aead, KeyInit, Payload}, - Aes128Gcm, AesGcm, Key, Nonce, -}; -use rand::prelude::*; use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM}; use ring::rand::{SecureRandom, SystemRandom}; +use crate::errors::{ParquetError, Result}; pub trait BlockEncryptor { fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec; @@ -37,81 +30,10 @@ pub trait BlockDecryptor { fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec; } -pub(crate) struct AesGcmGcmBlockEncryptor { - rng: ThreadRng, - cipher: AesGcm, // todo support other key sizes -} - -impl AesGcmGcmBlockEncryptor { - pub(crate) fn new(key_bytes: &[u8]) -> Self { - let key_size = key_bytes.len(); //todo check len - let key_vec = Vec::from(key_bytes); - let key = Key::::from_slice(&key_vec[..]); - - Self { - rng: rand::thread_rng(), - cipher: Aes128Gcm::new(&key), - } - } -} - -impl BlockEncryptor for AesGcmGcmBlockEncryptor { - fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec { - let nonce_buf: [u8; GCM_NONCE_LENGTH] = self.rng.gen(); - let nonce = Nonce::from_slice(&nonce_buf[..]); - - let plaint_text = Payload { - msg: &plaintext[..], - aad: &aad[..], - }; - - match self.cipher.encrypt(&nonce, plaint_text) { - Ok(encrypted) => { - let len: [u8; 4] = [0; 4]; - // todo fill len - let ctext = [&nonce_buf, &*encrypted].concat(); - [&len, &*ctext].concat() - } - Err(e) => panic!("Failed to encrypt {}", e), - } - } -} - -pub(crate) struct GcmBlockDecryptor { - cipher: AesGcm, // todo support other key sizes -} - -impl GcmBlockDecryptor { - pub(crate) fn new(key_bytes: &[u8]) -> Self { - let key_size = key_bytes.len(); //todo check key len - let key_vec = Vec::from(key_bytes); - let key = Key::::from_slice(&key_vec[..]); - - Self { - cipher: Aes128Gcm::new(&key), - } - } -} - -impl BlockDecryptor for GcmBlockDecryptor { - fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec { - let nonce = Nonce::from_slice(&length_and_ciphertext[4..16]); - - let cipher_text = Payload { - msg: &length_and_ciphertext[16..], - aad: &aad[..], - }; - - match self.cipher.decrypt(&nonce, cipher_text) { - Ok(decrypted) => decrypted, - Err(e) => panic!("{}", e), - } - } -} - -const LEFT_FOUR: u128 = 0xffff_ffff_0000_0000_0000_0000_0000_0000; const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff; const NONCE_LEN: usize = 12; +const TAG_LEN: usize = 16; +const SIZE_LEN: usize = 4; struct CounterNonce { start: u128, @@ -158,14 +80,15 @@ pub(crate) struct RingGcmBlockEncryptor { } impl RingGcmBlockEncryptor { - /// Create a new `RingGcmBlockEncryptor` with a random key and random nonce. + // todo TBD: some KMS systems produce data keys, need to be able to pass them to Encryptor. + // todo TBD: for other KMSs, we will create data keys inside arrow-rs, making sure to use SystemRandom + /// Create a new `RingGcmBlockEncryptor` with a given key and random nonce. /// The nonce will advance appropriately with each block encryption and /// return an error if it wraps around. - pub(crate) fn new() -> Self { + pub(crate) fn new(key_bytes: &[u8]) -> Self { let rng = SystemRandom::new(); - let mut key_bytes = [0; 16]; - rng.fill(&mut key_bytes).unwrap(); + // todo support other key sizes let key = UnboundKey::new(&AES_128_GCM, key_bytes.as_ref()).unwrap(); let nonce = CounterNonce::new(&rng); @@ -179,14 +102,16 @@ impl RingGcmBlockEncryptor { impl BlockEncryptor for RingGcmBlockEncryptor { fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec { let nonce = self.nonce_sequence.advance().unwrap(); - let mut result = - Vec::with_capacity(plaintext.len() + AES_128_GCM.tag_len() + AES_128_GCM.nonce_len()); + let ciphertext_len = plaintext.len() + NONCE_LEN + TAG_LEN; + // todo TBD: add first 4 bytes with the length, per https://github.com/apache/parquet-format/blob/master/Encryption.md#51-encrypted-module-serialization + let mut result = Vec::with_capacity(SIZE_LEN + ciphertext_len); + result.extend_from_slice((ciphertext_len as i32).to_le_bytes().as_ref()); result.extend_from_slice(nonce.as_ref()); result.extend_from_slice(plaintext); let tag = self .key - .seal_in_place_separate_tag(nonce, Aad::from(aad), &mut result[NONCE_LEN..]) + .seal_in_place_separate_tag(nonce, Aad::from(aad), &mut result[SIZE_LEN + NONCE_LEN..]) .unwrap(); result.extend_from_slice(tag.as_ref()); @@ -200,27 +125,24 @@ pub(crate) struct RingGcmBlockDecryptor { impl RingGcmBlockDecryptor { pub(crate) fn new(key_bytes: &[u8]) -> Self { + // todo support other key sizes let key = UnboundKey::new(&AES_128_GCM, key_bytes).unwrap(); Self { key: LessSafeKey::new(key), } } - - fn new_from_less_safe_key(key: LessSafeKey) -> Self { - Self { key } - } } impl BlockDecryptor for RingGcmBlockDecryptor { fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec { let mut result = Vec::with_capacity( - length_and_ciphertext.len() - AES_128_GCM.tag_len() - AES_128_GCM.nonce_len(), + length_and_ciphertext.len() - SIZE_LEN - NONCE_LEN - TAG_LEN, ); - result.extend_from_slice(&length_and_ciphertext[AES_128_GCM.nonce_len()..]); + result.extend_from_slice(&length_and_ciphertext[SIZE_LEN + NONCE_LEN..]); let nonce = ring::aead::Nonce::try_assume_unique_for_key( - &length_and_ciphertext[0..AES_128_GCM.nonce_len()], + &length_and_ciphertext[SIZE_LEN..SIZE_LEN + NONCE_LEN], ) .unwrap(); @@ -232,93 +154,74 @@ impl BlockDecryptor for RingGcmBlockDecryptor { } } -#[cfg(test)] -mod tests { - use std::{ - hint::black_box, - time::{Duration, Instant}, - }; - - use ring::aead::{Aad, LessSafeKey, UnboundKey, AES_128_GCM}; - - #[test] - fn bench_ring_encdec() { - // Pick a block large enough to avoid measuring start-up cost but small - // enough to fit into any reasonable L1 cache - const BLOCK_BYTES: usize = 16 * (1 << 10); // 16 KiB - let mut input: [u8; BLOCK_BYTES] = [0; BLOCK_BYTES]; - for i in 0..input.len() { - input[i] = i as u8; - } +pub(crate) enum ModuleType { + Footer = 0, + ColumnMetaData = 1, + DataPage = 2, + DictionaryPage = 3, + DataPageHeader = 4, + DictionaryPageHeader = 5, + ColumnIndex = 6, + OffsetIndex = 7, + BloomFilterHeader = 8, + BloomFilterBitset = 9, +} - let key = LessSafeKey::new(UnboundKey::new(&AES_128_GCM, b"0123456789abcdef").unwrap()); - let nonce = || ring::aead::Nonce::assume_unique_for_key([0; 12]); - - // Benchmark the decryptor for some amount of time and see how much data has been processed - const TEST_MS: usize = 2000; - let mut enc_data: usize = 0; - let mut dec_data: usize = 0; - let mut enc_time: Duration = Duration::from_micros(0); - let mut dec_time = Duration::from_micros(0); - - // Warmup - for _ in 0..1000 { - let tag = black_box( - key.seal_in_place_separate_tag(nonce(), Aad::empty(), &mut input) - .unwrap(), - ); - - black_box( - key.open_in_place_separate_tag(nonce(), Aad::empty(), tag, &mut input, 0..) - .unwrap(), - ); - } +pub fn create_footer_aad(file_aad: &[u8]) -> Result> { + create_module_aad(file_aad, ModuleType::Footer, -1, -1, -1) +} - loop { - let start = Instant::now(); - let tag = black_box( - key.seal_in_place_separate_tag( - black_box(nonce()), - black_box(Aad::empty()), - black_box(&mut input), - ) - .unwrap(), - ); - enc_time += start.elapsed(); - enc_data += input.len(); - - let start = Instant::now(); - black_box( - key.open_in_place_separate_tag( - black_box(nonce()), - black_box(Aad::empty()), - black_box(tag), - black_box(&mut input), - black_box(0..), - ) - .unwrap(), - ); - dec_time += start.elapsed(); - dec_data += input.len(); - - if enc_time.as_millis() as usize >= TEST_MS && dec_time.as_millis() as usize >= TEST_MS - { - break; - } - } +pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i32, + column_ordinal: i32, page_ordinal: i32) -> Result> { - println!( - "Encryption performance: {} MiB processed in {} ms, throughput is {} MiB/s", - (enc_data as f32 / (1 << 20) as f32), - enc_time.as_millis(), - enc_data as f32 / (1 << 20) as f32 / enc_time.as_secs_f32() - ); + let module_buf = [module_type as u8]; - println!( - "Decryption performance: {} MiB processed in {} ms, throughput is {} MiB/s", - (dec_data as f32 / (1 << 20) as f32), - dec_time.as_millis(), - dec_data as f32 / (1 << 20) as f32 / dec_time.as_secs_f32() - ); + if module_buf[0] == (ModuleType::Footer as u8) { + let mut aad = Vec::with_capacity(file_aad.len() + 1); + aad.extend_from_slice(file_aad); + aad.extend_from_slice(module_buf.as_ref()); + return Ok(aad) + } + + if row_group_ordinal < 0 { + return Err(general_err!("Wrong row group ordinal: {}", row_group_ordinal)); + } + if row_group_ordinal > u16::MAX as i32 { + return Err(general_err!("Encrypted parquet files can't have more than {} row groups: {}", + u16::MAX, row_group_ordinal)); + } + + if column_ordinal < 0 { + return Err(general_err!("Wrong column ordinal: {}", column_ordinal)); + } + if column_ordinal > u16::MAX as i32 { + return Err(general_err!("Encrypted parquet files can't have more than {} columns: {}", + u16::MAX, column_ordinal)); } + + if module_buf[0] != (ModuleType::DataPageHeader as u8) && + module_buf[0] != (ModuleType::DataPage as u8) { + let mut aad = Vec::with_capacity(file_aad.len() + 5); + aad.extend_from_slice(file_aad); + aad.extend_from_slice(module_buf.as_ref()); + aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref()); + aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref()); + return Ok(aad) + } + + if page_ordinal < 0 { + return Err(general_err!("Wrong column ordinal: {}", page_ordinal)); + } + if page_ordinal > u16::MAX as i32 { + return Err(general_err!("Encrypted parquet files can't have more than {} pages in a chunk: {}", + u16::MAX, page_ordinal)); + } + + let mut aad = Vec::with_capacity(file_aad.len() + 7); + aad.extend_from_slice(file_aad); + aad.extend_from_slice(module_buf.as_ref()); + aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref()); + aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref()); + aad.extend_from_slice((page_ordinal as u16).to_le_bytes().as_ref()); + Ok(aad) } diff --git a/parquet/src/encryption/mod.rs b/parquet/src/encryption/mod.rs index aecb5f72580c..e0e7f5d81919 100644 --- a/parquet/src/encryption/mod.rs +++ b/parquet/src/encryption/mod.rs @@ -19,5 +19,3 @@ //! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). pub mod ciphers; - -const GCM_NONCE_LENGTH: usize = 12; \ No newline at end of file diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 622e2f657906..82de5df9fdc1 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -17,15 +17,17 @@ use std::{io::Read, sync::Arc}; -use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData, + FileCryptoMetaData as TFileCryptoMetaData, EncryptionAlgorithm}; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; use crate::basic::ColumnOrder; use crate::encryption::ciphers; -use crate::encryption::ciphers::{BlockDecryptor, BlockEncryptor}; +use crate::encryption::ciphers::{BlockDecryptor}; use crate::errors::{ParquetError, Result}; -use crate::file::{metadata::*, reader::ChunkReader, FOOTER_SIZE, PARQUET_MAGIC}; +use crate::file::{metadata::*, reader::ChunkReader, + FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; use crate::schema::types::{self, SchemaDescriptor}; @@ -51,7 +53,20 @@ pub fn parse_metadata(chunk_reader: &R) -> Result file_size as usize { @@ -64,22 +79,18 @@ pub fn parse_metadata(chunk_reader: &R) -> Result Result { - let aad: &[u8] = "abcdefgh".as_bytes(); - let key_code: Vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; - - let mut encryptor = ciphers::AesGcmGcmBlockEncryptor::new(&key_code); - let encrypted_foot = encryptor.encrypt(buf, aad); - - let decryptor = ciphers::GcmBlockDecryptor::new(&key_code); - let decrypted_foot = decryptor.decrypt(encrypted_foot.as_ref(), aad); - // TODO: row group filtering - let mut prot = TCompactSliceInputProtocol::new(decrypted_foot.as_ref()); + let mut prot = TCompactSliceInputProtocol::new(buf.as_ref()); let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) .map_err(|e| ParquetError::General(format!("Could not parse metadata: {e}")))?; let schema = types::from_thrift(&t_file_metadata.schema)?; @@ -98,13 +109,35 @@ pub fn decode_metadata(buf: &[u8]) -> Result { schema_descr, column_orders, ); + Ok(ParquetMetaData::new(file_metadata, row_groups)) } +fn decode_encrypted_metadata(buf: &[u8]) -> Result { + // parse FileCryptoMetaData + let mut prot = TCompactSliceInputProtocol::new(buf.as_ref()); + let t_file_crypto_metadata: TFileCryptoMetaData = TFileCryptoMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| ParquetError::General(format!("Could not parse crypto metadata: {e}")))?; + let algo = t_file_crypto_metadata.encryption_algorithm; + let aes_gcm_algo = if let EncryptionAlgorithm::AESGCMV1(a) = algo { a } + else { unreachable!() }; // todo add support for GCMCTRV1 + + // remaining buffer contains encrypted FileMetaData + // todo + let key_code: &[u8] = "0123456789012345".as_bytes(); + // todo: keep the object + let decryptor = ciphers::RingGcmBlockDecryptor::new(&key_code); + // todo get aad_prefix + let fmd_aad = ciphers::create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref()); + let decrypted_fmd_buf = decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad.unwrap().as_ref()); + + decode_metadata(decrypted_fmd_buf.as_slice()) +} + /// Decodes the footer returning the metadata length in bytes pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result { // check this is indeed a parquet file - if slice[4..] != PARQUET_MAGIC { + if slice[4..] != PARQUET_MAGIC && slice[4..] != PARQUET_MAGIC_ENCR_FOOTER { return Err(general_err!("Invalid Parquet file. Corrupt footer")); } diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index 6589d2efaf8b..309e5b5a9cce 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -107,3 +107,4 @@ pub mod writer; /// The length of the parquet footer in bytes pub const FOOTER_SIZE: usize = 8; const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; +const PARQUET_MAGIC_ENCR_FOOTER: [u8; 4] = [b'P', b'A', b'R', b'E']; diff --git a/tmp-bench1/Cargo.toml b/tmp-bench1/Cargo.toml deleted file mode 100644 index 4a1d398414d7..000000000000 --- a/tmp-bench1/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "gcm-bench" -version = "0.1.0" -edition = "2021" - -[dependencies] -aes-gcm = "=0.10.3" -rand = "0.9.0-alpha.0" -ring="0.17" diff --git a/tmp-bench1/src/main.rs b/tmp-bench1/src/main.rs deleted file mode 100644 index d8ec7b97a86a..000000000000 --- a/tmp-bench1/src/main.rs +++ /dev/null @@ -1,75 +0,0 @@ -use aes_gcm::{aead::{Aead, KeyInit, Payload}, Aes128Gcm, Nonce, Key}; -use std::time::SystemTime; -use rand::prelude::*; - -fn main() { - const GCM_NONCE_LENGTH: usize = 12; - - const P_LEN: usize = 1 * 1024 * 1024; - - let mut input: [u8; P_LEN] = [0; P_LEN]; - for i in 1..input.len() { - input[i] = i as u8; - } - - let mut prev_total: usize = 0; - let mut new_total: usize = 0; - let ini_time = SystemTime::now(); - let mut prev_time = SystemTime::now(); - let mut counter: i32 = 0; - - let key_code: Vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; - let key = Key::::from_slice(&key_code[..]); - let cipher = Aes128Gcm::new(&key); - - let aad: &[u8] = "abcdefgh".as_bytes(); - let plaintext = Payload { - msg: &input[..], - aad: &aad[..], - }; - - - println!("Plaintext length: {}", input.len()); - - let mut rng = rand::thread_rng(); - let mut nonce: [u8; GCM_NONCE_LENGTH] = [0; GCM_NONCE_LENGTH]; - nonce.shuffle(&mut rng); - let nonce = Nonce::from_slice(&nonce[..]); - - let ciphertext: Vec; - - // Encrypt - match cipher.encrypt(&nonce, plaintext) { - Ok(ciphertxt) => { - println!("Ciphertext length: {}", ciphertxt.len()); - ciphertext = ciphertxt; - }, - Err(e) => panic!("{}", e), - } - - loop { - let cipher_text = Payload { - msg: &ciphertext[..], - aad: &aad[..], - }; - - match cipher.decrypt(&nonce, cipher_text) { - Ok(decrypted) => new_total += decrypted.len(), - Err(e) => panic!("{}", e), - } - - counter += 1; - - if counter % 1000 == 0 { - let tdelta = prev_time.elapsed().unwrap().as_millis(); - if tdelta > 3000 { - let ddelta: usize = new_total - prev_total; - let rate: f64 = (ddelta as f64) / (tdelta as f64) / 1024.0; // ~ MB/sec - println!("Rate: {} MB/sec. Counter: {}. Total, MB: {}. Time: {}", - rate, counter, new_total / 1024 / 1024, (ini_time.elapsed().unwrap().as_millis()) / 1000); - prev_time = SystemTime::now(); - prev_total = new_total; - } - } - } -} From a602538141bd40421158085a775b9c812a4b1a9a Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 28 Mar 2024 15:30:01 +0200 Subject: [PATCH 4/6] clean up --- Cargo.toml | 3 +-- parquet/Cargo.toml | 1 - parquet/src/file/footer.rs | 5 ++--- parquet/src/lib.rs | 1 - 4 files changed, 3 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 53d3955115ca..e09660941d60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,8 +58,7 @@ exclude = [ # scratch this way, this is a stand-alone package that compiles independently of the others. "arrow-pyarrow-integration-testing", # object_store is excluded because it follows a separate release cycle from the other arrow crates - "object_store", - "tmp-bench1" + "object_store" ] [workspace.package] diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 8a4d4019681f..1d3dff450e18 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -83,7 +83,6 @@ arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "jso tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } object_store = { version = "0.9.0", default-features = false, features = ["azure"] } -aes-gcm = { version = "0.10.3" , default-features = false, features = ["std", "aes"] } [package.metadata.docs.rs] all-features = true diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 82de5df9fdc1..0c926cfcbb6a 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -90,7 +90,7 @@ pub fn parse_metadata(chunk_reader: &R) -> Result Result { // TODO: row group filtering - let mut prot = TCompactSliceInputProtocol::new(buf.as_ref()); + let mut prot = TCompactSliceInputProtocol::new(buf); let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) .map_err(|e| ParquetError::General(format!("Could not parse metadata: {e}")))?; let schema = types::from_thrift(&t_file_metadata.schema)?; @@ -109,7 +109,6 @@ pub fn decode_metadata(buf: &[u8]) -> Result { schema_descr, column_orders, ); - Ok(ParquetMetaData::new(file_metadata, row_groups)) } @@ -137,7 +136,7 @@ fn decode_encrypted_metadata(buf: &[u8]) -> Result { /// Decodes the footer returning the metadata length in bytes pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result { // check this is indeed a parquet file - if slice[4..] != PARQUET_MAGIC && slice[4..] != PARQUET_MAGIC_ENCR_FOOTER { + if slice[4..] != PARQUET_MAGIC { return Err(general_err!("Invalid Parquet file. Corrupt footer")); } diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index 16d22e5d8a95..318e7fb507ef 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -126,7 +126,6 @@ experimental!(mod encodings); pub mod bloom_filter; //#[cfg(feature = "encryption")] -//pub mod encryption; experimental!(mod encryption); pub mod file; From 30009b14e98bd3cd82f82fe12856a12c44a2e6b9 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 2 May 2024 15:34:38 +0300 Subject: [PATCH 5/6] update --- parquet/src/arrow/arrow_reader/mod.rs | 9 ++++- parquet/src/encryption/ciphers.rs | 55 +++++++++++++++++++++++++++ parquet/src/file/footer.rs | 31 ++++++++++----- 3 files changed, 85 insertions(+), 10 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 361b83de5963..bcfa0894ce05 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -43,6 +43,7 @@ use crate::file::footer; use crate::file::page_index::index_reader; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; +use crate::encryption::ciphers; /// A generic builder for constructing sync or async arrow parquet readers. This is not intended /// to be used directly, instead you should use the specialization for the type of reader @@ -250,7 +251,13 @@ impl ArrowReaderMetadata { /// /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for how this can be used pub fn load(reader: &T, options: ArrowReaderOptions) -> Result { - let mut metadata = footer::parse_metadata(reader)?; + // todo + let key_code: &[u8] = "0123456789012345".as_bytes(); + // todo + let decryption_properties = ciphers::FileDecryptionProperties::builder() + .with_footer_key(key_code.to_vec()) + .build(); + let mut metadata = footer::parse_metadata_with_decryption(reader, decryption_properties)?; if options.page_index { let column_index = metadata .row_groups() diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs index 09c106e0c8f7..da35328c3e33 100644 --- a/parquet/src/encryption/ciphers.rs +++ b/parquet/src/encryption/ciphers.rs @@ -225,3 +225,58 @@ pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ord aad.extend_from_slice((page_ordinal as u16).to_le_bytes().as_ref()); Ok(aad) } + +pub struct FileDecryptionProperties { + footer_key: Option> +} + +impl FileDecryptionProperties { + pub fn builder() -> DecryptionPropertiesBuilder { + DecryptionPropertiesBuilder::with_defaults() + } +} + +pub struct DecryptionPropertiesBuilder { + footer_key: Option> +} + +impl DecryptionPropertiesBuilder { + pub fn with_defaults() -> Self { + Self { + footer_key: None + } + } + + pub fn build(self) -> FileDecryptionProperties { + FileDecryptionProperties { + footer_key: self.footer_key + } + } + + // todo decr: doc comment + pub fn with_footer_key(mut self, value: Vec) -> Self { + self.footer_key = Some(value); + self + } +} + +pub struct FileDecryptor { + decryption_properties: FileDecryptionProperties, + // todo decr: change to BlockDecryptor + footer_decryptor: RingGcmBlockDecryptor +} + +impl FileDecryptor { + pub(crate) fn new(decryption_properties: FileDecryptionProperties) -> Self { + Self { + // todo decr: if no key available yet (not set in properties, will be retrieved from metadata) + footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()), + decryption_properties: decryption_properties + } + } + + // todo decr: change to BlockDecryptor + pub(crate) fn get_footer_decryptor(self) -> RingGcmBlockDecryptor { + self.footer_decryptor + } +} diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 0c926cfcbb6a..463d739683dd 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -23,7 +23,7 @@ use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; use crate::basic::ColumnOrder; use crate::encryption::ciphers; -use crate::encryption::ciphers::{BlockDecryptor}; +use crate::encryption::ciphers::{BlockDecryptor, FileDecryptionProperties, FileDecryptor}; use crate::errors::{ParquetError, Result}; use crate::file::{metadata::*, reader::ChunkReader, @@ -40,6 +40,10 @@ use crate::schema::types::{self, SchemaDescriptor}; /// The reader first reads DEFAULT_FOOTER_SIZE bytes from the end of the file. /// If it is not enough according to the length indicated in the footer, it reads more bytes. pub fn parse_metadata(chunk_reader: &R) -> Result { + parse_metadata_with_decryption(chunk_reader, FileDecryptionProperties::builder().build()) +} + +pub fn parse_metadata_with_decryption(chunk_reader: &R, decr_props: FileDecryptionProperties) -> Result { // check file is large enough to hold footer let file_size = chunk_reader.len(); if file_size < (FOOTER_SIZE as u64) { @@ -59,6 +63,7 @@ pub fn parse_metadata(chunk_reader: &R) -> Result(chunk_reader: &R) -> Result Result { } let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr); + if t_file_metadata.encryption_algorithm.is_some() { + // todo get key_metadata etc. Set file decryptor in return value + // todo check signature + } + let file_metadata = FileMetaData::new( t_file_metadata.version, t_file_metadata.num_rows, @@ -112,27 +122,30 @@ pub fn decode_metadata(buf: &[u8]) -> Result { Ok(ParquetMetaData::new(file_metadata, row_groups)) } -fn decode_encrypted_metadata(buf: &[u8]) -> Result { +fn decode_encrypted_metadata(buf: &[u8], file_decr_props: FileDecryptionProperties) -> Result { // parse FileCryptoMetaData let mut prot = TCompactSliceInputProtocol::new(buf.as_ref()); let t_file_crypto_metadata: TFileCryptoMetaData = TFileCryptoMetaData::read_from_in_protocol(&mut prot) .map_err(|e| ParquetError::General(format!("Could not parse crypto metadata: {e}")))?; let algo = t_file_crypto_metadata.encryption_algorithm; let aes_gcm_algo = if let EncryptionAlgorithm::AESGCMV1(a) = algo { a } - else { unreachable!() }; // todo add support for GCMCTRV1 + else { unreachable!() }; // todo decr: add support for GCMCTRV1 + + let file_decryptor = FileDecryptor::new(file_decr_props); + + // todo decr: get key_metadata // remaining buffer contains encrypted FileMetaData - // todo - let key_code: &[u8] = "0123456789012345".as_bytes(); - // todo: keep the object - let decryptor = ciphers::RingGcmBlockDecryptor::new(&key_code); - // todo get aad_prefix + let decryptor = file_decryptor.get_footer_decryptor(); + // todo decr: get aad_prefix + // todo decr: set both aad_prefix and aad_file_unique in file_decryptor let fmd_aad = ciphers::create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref()); let decrypted_fmd_buf = decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad.unwrap().as_ref()); decode_metadata(decrypted_fmd_buf.as_slice()) } +// todo decr: add encryption support /// Decodes the footer returning the metadata length in bytes pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result { // check this is indeed a parquet file From 32b0be3b6564481c8fa5a8c9de7a1ce29db1ee86 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 16 May 2024 11:18:54 +0300 Subject: [PATCH 6/6] move decr props to unitest --- parquet/src/arrow/arrow_reader/mod.rs | 35 +++++++++++++++++++++------ parquet/src/encryption/ciphers.rs | 2 +- parquet/src/file/footer.rs | 17 ++++++++----- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index bcfa0894ce05..52dcf1841dda 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -44,6 +44,7 @@ use crate::file::page_index::index_reader; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; use crate::encryption::ciphers; +use crate::encryption::ciphers::FileDecryptionProperties; /// A generic builder for constructing sync or async arrow parquet readers. This is not intended /// to be used directly, instead you should use the specialization for the type of reader @@ -251,13 +252,12 @@ impl ArrowReaderMetadata { /// /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for how this can be used pub fn load(reader: &T, options: ArrowReaderOptions) -> Result { - // todo - let key_code: &[u8] = "0123456789012345".as_bytes(); - // todo - let decryption_properties = ciphers::FileDecryptionProperties::builder() - .with_footer_key(key_code.to_vec()) - .build(); - let mut metadata = footer::parse_metadata_with_decryption(reader, decryption_properties)?; + Self::load_with_decryption(reader, options, FileDecryptionProperties::builder().build()) + } + + pub fn load_with_decryption(reader: &T, options: ArrowReaderOptions, + file_decryption_properties: FileDecryptionProperties) -> Result { + let mut metadata = footer::parse_metadata_with_decryption(reader, file_decryption_properties)?; if options.page_index { let column_index = metadata .row_groups() @@ -365,6 +365,11 @@ impl ParquetRecordBatchReaderBuilder { Ok(Self::new_with_metadata(reader, metadata)) } + pub fn try_new_with_decryption(reader: T, options: ArrowReaderOptions, file_decryption_properties: FileDecryptionProperties) -> Result { + let metadata = ArrowReaderMetadata::load_with_decryption(&reader, options, file_decryption_properties)?; + Ok(Self::new_with_metadata(reader, metadata)) + } + /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`] /// /// This allows loading metadata once and using it to create multiple builders with @@ -613,6 +618,13 @@ impl ParquetRecordBatchReader { .build() } + pub fn try_new_with_decryption(reader: T, batch_size: usize, + file_decryption_properties: FileDecryptionProperties) -> Result { + ParquetRecordBatchReaderBuilder::try_new_with_decryption(reader, Default::default(), file_decryption_properties)? + .with_batch_size(batch_size) + .build() + } + /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`] /// /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a @@ -777,6 +789,7 @@ mod tests { BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type, }; + use crate::encryption::ciphers; use crate::errors::Result; use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; use crate::file::writer::SerializedFileWriter; @@ -1424,7 +1437,13 @@ mod tests { arrow::util::test_util::parquet_test_data(), ); let file = File::open(path).unwrap(); - let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap(); + // todo + let key_code: &[u8] = "0123456789012345".as_bytes(); + // todo + let decryption_properties = ciphers::FileDecryptionProperties::builder() + .with_footer_key(key_code.to_vec()) + .build(); + let record_reader = ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties).unwrap(); // todo check contents } diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs index da35328c3e33..db32146c6d5f 100644 --- a/parquet/src/encryption/ciphers.rs +++ b/parquet/src/encryption/ciphers.rs @@ -271,7 +271,7 @@ impl FileDecryptor { Self { // todo decr: if no key available yet (not set in properties, will be retrieved from metadata) footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()), - decryption_properties: decryption_properties + decryption_properties } } diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 463d739683dd..c4ac894505d4 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -86,14 +86,20 @@ pub fn parse_metadata_with_decryption(chunk_reader: &R, decr_pro let start = file_size - footer_metadata_len as u64; if encrypted_footer { - decode_encrypted_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref(), decr_props) + let file_decryptor = FileDecryptor::new(decr_props); + decode_encrypted_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref(), file_decryptor) } else { decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref()) } } -/// Decodes [`ParquetMetaData`] from the provided bytes pub fn decode_metadata(buf: &[u8]) -> Result { + decode_metadata_with_decryption(buf) +} + +/// Decodes [`ParquetMetaData`] from the provided bytes +// todo add file decryptor +pub fn decode_metadata_with_decryption(buf: &[u8]) -> Result { // TODO: row group filtering let mut prot = TCompactSliceInputProtocol::new(buf); let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) @@ -122,7 +128,7 @@ pub fn decode_metadata(buf: &[u8]) -> Result { Ok(ParquetMetaData::new(file_metadata, row_groups)) } -fn decode_encrypted_metadata(buf: &[u8], file_decr_props: FileDecryptionProperties) -> Result { +fn decode_encrypted_metadata(buf: &[u8], file_decryptor: FileDecryptor) -> Result { // parse FileCryptoMetaData let mut prot = TCompactSliceInputProtocol::new(buf.as_ref()); let t_file_crypto_metadata: TFileCryptoMetaData = TFileCryptoMetaData::read_from_in_protocol(&mut prot) @@ -131,8 +137,6 @@ fn decode_encrypted_metadata(buf: &[u8], file_decr_props: FileDecryptionProperti let aes_gcm_algo = if let EncryptionAlgorithm::AESGCMV1(a) = algo { a } else { unreachable!() }; // todo decr: add support for GCMCTRV1 - let file_decryptor = FileDecryptor::new(file_decr_props); - // todo decr: get key_metadata // remaining buffer contains encrypted FileMetaData @@ -142,7 +146,8 @@ fn decode_encrypted_metadata(buf: &[u8], file_decr_props: FileDecryptionProperti let fmd_aad = ciphers::create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref()); let decrypted_fmd_buf = decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad.unwrap().as_ref()); - decode_metadata(decrypted_fmd_buf.as_slice()) + // todo add file decryptor + decode_metadata_with_decryption(decrypted_fmd_buf.as_slice()) } // todo decr: add encryption support