diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index e6d612e0cc62..1d3dff450e18 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } +ring = { version = "0.17", default-features = false, features = ["std"]} [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -104,6 +105,7 @@ experimental = [] async = ["futures", "tokio"] # Enable object_store integration object_store = ["dep:object_store", "async"] +#encryption = ["aes-gcm", "base64"] [[example]] name = "read_parquet" diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 83d6f6f553fc..52dcf1841dda 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -43,6 +43,8 @@ use crate::file::footer; use crate::file::page_index::index_reader; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; +use crate::encryption::ciphers; +use crate::encryption::ciphers::FileDecryptionProperties; /// A generic builder for constructing sync or async arrow parquet readers. 
This is not intended /// to be used directly, instead you should use the specialization for the type of reader @@ -250,7 +252,12 @@ impl ArrowReaderMetadata { /// /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for how this can be used pub fn load(reader: &T, options: ArrowReaderOptions) -> Result { - let mut metadata = footer::parse_metadata(reader)?; + Self::load_with_decryption(reader, options, FileDecryptionProperties::builder().build()) // default properties: no footer key is set + } + + pub fn load_with_decryption(reader: &T, options: ArrowReaderOptions, + file_decryption_properties: FileDecryptionProperties) -> Result { + let mut metadata = footer::parse_metadata_with_decryption(reader, file_decryption_properties)?; if options.page_index { let column_index = metadata .row_groups() @@ -358,6 +365,11 @@ impl ParquetRecordBatchReaderBuilder { Ok(Self::new_with_metadata(reader, metadata)) } + pub fn try_new_with_decryption(reader: T, options: ArrowReaderOptions, file_decryption_properties: FileDecryptionProperties) -> Result { + let metadata = ArrowReaderMetadata::load_with_decryption(&reader, options, file_decryption_properties)?; + Ok(Self::new_with_metadata(reader, metadata)) + } + /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`] /// /// This allows loading metadata once and using it to create multiple builders with @@ -606,6 +618,13 @@ impl ParquetRecordBatchReader { .build() } + pub fn try_new_with_decryption(reader: T, batch_size: usize, + file_decryption_properties: FileDecryptionProperties) -> Result { + ParquetRecordBatchReaderBuilder::try_new_with_decryption(reader, Default::default(), file_decryption_properties)?
+ .with_batch_size(batch_size) + .build() + } + /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`] /// /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a @@ -770,6 +789,7 @@ mod tests { BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type, }; + use crate::encryption::ciphers; use crate::errors::Result; use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; use crate::file::writer::SerializedFileWriter; @@ -1410,6 +1430,23 @@ mod tests { assert!(col.value(2).is_nan()); } + #[test] + fn test_uniform_encryption() { + let path = format!( + "{}/uniform_encryption.parquet.encrypted", + arrow::util::test_util::parquet_test_data(), + ); + let file = File::open(path).unwrap(); + // todo + let key_code: &[u8] = "0123456789012345".as_bytes(); // 16 bytes, the AES-128 key size + // todo + let decryption_properties = ciphers::FileDecryptionProperties::builder() + .with_footer_key(key_code.to_vec()) + .build(); + let record_reader = ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties).unwrap(); // 128 is the batch size + // todo check contents + } + #[test] fn test_read_float32_float64_byte_stream_split() { let path = format!( diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs new file mode 100644 index 000000000000..db32146c6d5f --- /dev/null +++ b/parquet/src/encryption/ciphers.rs @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption implementation specific to Parquet, as described +//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). + +use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM}; +use ring::rand::{SecureRandom, SystemRandom}; +use crate::errors::{ParquetError, Result}; + +pub trait BlockEncryptor { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec; +} + +pub trait BlockDecryptor { + fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec; +} + +const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff; +const NONCE_LEN: usize = 12; +const TAG_LEN: usize = 16; +const SIZE_LEN: usize = 4; + +struct CounterNonce { + start: u128, + counter: u128, +} + +impl CounterNonce { + pub fn new(rng: &SystemRandom) -> Self { + let mut buf = [0; 16]; + rng.fill(&mut buf).unwrap(); + + // Since this is a random seed value, endianness doesn't matter at all, + // and we can use whatever is platform-native.
+ let start = u128::from_ne_bytes(buf) & RIGHT_TWELVE; + let counter = start.wrapping_add(1); + + Self { start, counter } + } + + /// One accessor for the nonce bytes to avoid potentially flipping endianness + #[inline] + pub fn get_bytes(&self) -> [u8; NONCE_LEN] { + self.counter.to_le_bytes()[0..NONCE_LEN].try_into().unwrap() + } +} + +impl NonceSequence for CounterNonce { + fn advance(&mut self) -> Result { + // If we've wrapped around, we've exhausted this nonce sequence + if (self.counter & RIGHT_TWELVE) == (self.start & RIGHT_TWELVE) { + Err(ring::error::Unspecified) + } else { + // Otherwise, just advance and return the new value + let buf: [u8; NONCE_LEN] = self.get_bytes(); + self.counter = self.counter.wrapping_add(1); + Ok(ring::aead::Nonce::assume_unique_for_key(buf)) + } + } +} + +pub(crate) struct RingGcmBlockEncryptor { + key: LessSafeKey, + nonce_sequence: CounterNonce, +} + +impl RingGcmBlockEncryptor { + // todo TBD: some KMS systems produce data keys, need to be able to pass them to Encryptor. + // todo TBD: for other KMSs, we will create data keys inside arrow-rs, making sure to use SystemRandom + /// Create a new `RingGcmBlockEncryptor` with a given key and random nonce. + /// The nonce will advance appropriately with each block encryption and + /// return an error if it wraps around.
+ pub(crate) fn new(key_bytes: &[u8]) -> Self { + let rng = SystemRandom::new(); + + // todo support other key sizes + let key = UnboundKey::new(&AES_128_GCM, key_bytes.as_ref()).unwrap(); + let nonce = CounterNonce::new(&rng); + + Self { + key: LessSafeKey::new(key), + nonce_sequence: nonce, + } + } +} + +impl BlockEncryptor for RingGcmBlockEncryptor { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec { + let nonce = self.nonce_sequence.advance().unwrap(); + let ciphertext_len = plaintext.len() + NONCE_LEN + TAG_LEN; + // Output layout: 4-byte little-endian length, nonce, ciphertext, tag; see https://github.com/apache/parquet-format/blob/master/Encryption.md#51-encrypted-module-serialization + let mut result = Vec::with_capacity(SIZE_LEN + ciphertext_len); + result.extend_from_slice((ciphertext_len as i32).to_le_bytes().as_ref()); + result.extend_from_slice(nonce.as_ref()); + result.extend_from_slice(plaintext); + + let tag = self + .key + .seal_in_place_separate_tag(nonce, Aad::from(aad), &mut result[SIZE_LEN + NONCE_LEN..]) + .unwrap(); + result.extend_from_slice(tag.as_ref()); + + result + } +} + +pub(crate) struct RingGcmBlockDecryptor { + key: LessSafeKey, +} + +impl RingGcmBlockDecryptor { + pub(crate) fn new(key_bytes: &[u8]) -> Self { + // todo support other key sizes + let key = UnboundKey::new(&AES_128_GCM, key_bytes).unwrap(); + + Self { + key: LessSafeKey::new(key), + } + } +} + +impl BlockDecryptor for RingGcmBlockDecryptor { + fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec { + let mut result = Vec::with_capacity( + length_and_ciphertext.len().saturating_sub(SIZE_LEN + NONCE_LEN), + ); + result.extend_from_slice(&length_and_ciphertext[SIZE_LEN + NONCE_LEN..]); + + let nonce = ring::aead::Nonce::try_assume_unique_for_key( + &length_and_ciphertext[SIZE_LEN..SIZE_LEN + NONCE_LEN], + ) + .unwrap(); + + let plaintext_len = self.key.open_in_place(nonce, Aad::from(aad), &mut result).unwrap().len(); + // The buffer still holds the authentication tag after the plaintext; drop it. + result.truncate(plaintext_len); + + result + } +} + +pub(crate) enum ModuleType { + Footer = 0, +
ColumnMetaData = 1, + DataPage = 2, + DictionaryPage = 3, + DataPageHeader = 4, + DictionaryPageHeader = 5, + ColumnIndex = 6, + OffsetIndex = 7, + BloomFilterHeader = 8, + BloomFilterBitset = 9, +} + +pub fn create_footer_aad(file_aad: &[u8]) -> Result> { + create_module_aad(file_aad, ModuleType::Footer, -1, -1, -1) +} + +pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i32, + column_ordinal: i32, page_ordinal: i32) -> Result> { + + let module_buf = [module_type as u8]; + + if module_buf[0] == (ModuleType::Footer as u8) { + let mut aad = Vec::with_capacity(file_aad.len() + 1); + aad.extend_from_slice(file_aad); + aad.extend_from_slice(module_buf.as_ref()); + return Ok(aad) + } + + if row_group_ordinal < 0 { + return Err(general_err!("Wrong row group ordinal: {}", row_group_ordinal)); + } + if row_group_ordinal > u16::MAX as i32 { + return Err(general_err!("Encrypted parquet files can't have more than {} row groups: {}", + u16::MAX, row_group_ordinal)); + } + + if column_ordinal < 0 { + return Err(general_err!("Wrong column ordinal: {}", column_ordinal)); + } + if column_ordinal > u16::MAX as i32 { + return Err(general_err!("Encrypted parquet files can't have more than {} columns: {}", + u16::MAX, column_ordinal)); + } + + if module_buf[0] != (ModuleType::DataPageHeader as u8) && + module_buf[0] != (ModuleType::DataPage as u8) { + let mut aad = Vec::with_capacity(file_aad.len() + 5); + aad.extend_from_slice(file_aad); + aad.extend_from_slice(module_buf.as_ref()); + aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref()); + aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref()); + return Ok(aad) + } + + if page_ordinal < 0 { + return Err(general_err!("Wrong page ordinal: {}", page_ordinal)); + } + if page_ordinal > u16::MAX as i32 { + return Err(general_err!("Encrypted parquet files can't have more than {} pages in a chunk: {}", + u16::MAX, page_ordinal)); + } + + let mut aad =
Vec::with_capacity(file_aad.len() + 7); + aad.extend_from_slice(file_aad); + aad.extend_from_slice(module_buf.as_ref()); + aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref()); + aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref()); + aad.extend_from_slice((page_ordinal as u16).to_le_bytes().as_ref()); + Ok(aad) +} + +pub struct FileDecryptionProperties { + footer_key: Option> +} + +impl FileDecryptionProperties { + pub fn builder() -> DecryptionPropertiesBuilder { + DecryptionPropertiesBuilder::with_defaults() + } +} + +pub struct DecryptionPropertiesBuilder { + footer_key: Option> +} + +impl DecryptionPropertiesBuilder { + pub fn with_defaults() -> Self { + Self { + footer_key: None + } + } + + pub fn build(self) -> FileDecryptionProperties { + FileDecryptionProperties { + footer_key: self.footer_key + } + } + + /// Sets the key used to decrypt the file footer + pub fn with_footer_key(mut self, value: Vec) -> Self { + self.footer_key = Some(value); + self + } +} + +pub struct FileDecryptor { + decryption_properties: FileDecryptionProperties, + // todo decr: change to BlockDecryptor + footer_decryptor: RingGcmBlockDecryptor +} + +impl FileDecryptor { + pub(crate) fn new(decryption_properties: FileDecryptionProperties) -> Self { + Self { + // todo decr: if no key available yet (not set in properties, will be retrieved from metadata) + footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.as_deref().expect("footer key must be set in FileDecryptionProperties")), + decryption_properties + } + } + + // todo decr: change to BlockDecryptor + pub(crate) fn get_footer_decryptor(self) -> RingGcmBlockDecryptor { + self.footer_decryptor + } +} diff --git a/parquet/src/encryption/mod.rs b/parquet/src/encryption/mod.rs new file mode 100644 index 000000000000..e0e7f5d81919 --- /dev/null +++ b/parquet/src/encryption/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption implementation specific to Parquet, as described +//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). + +pub mod ciphers; diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 9695dbeae6e1..c4ac894505d4 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -17,13 +17,17 @@ use std::{io::Read, sync::Arc}; -use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData, + FileCryptoMetaData as TFileCryptoMetaData, EncryptionAlgorithm}; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; use crate::basic::ColumnOrder; +use crate::encryption::ciphers; +use crate::encryption::ciphers::{BlockDecryptor, FileDecryptionProperties, FileDecryptor}; use crate::errors::{ParquetError, Result}; -use crate::file::{metadata::*, reader::ChunkReader, FOOTER_SIZE, PARQUET_MAGIC}; +use crate::file::{metadata::*, reader::ChunkReader, + FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; use crate::schema::types::{self, SchemaDescriptor}; @@ -36,6 +40,10 @@ use crate::schema::types::{self, SchemaDescriptor}; /// The reader first reads DEFAULT_FOOTER_SIZE bytes from the end of the file. 
/// If it is not enough according to the length indicated in the footer, it reads more bytes. pub fn parse_metadata(chunk_reader: &R) -> Result { + parse_metadata_with_decryption(chunk_reader, FileDecryptionProperties::builder().build()) +} + +pub fn parse_metadata_with_decryption(chunk_reader: &R, decr_props: FileDecryptionProperties) -> Result { // check file is large enough to hold footer let file_size = chunk_reader.len(); if file_size < (FOOTER_SIZE as u64) { @@ -49,7 +57,21 @@ pub fn parse_metadata(chunk_reader: &R) -> Result file_size as usize { @@ -62,11 +84,22 @@ pub fn parse_metadata(chunk_reader: &R) -> Result Result { + decode_metadata_with_decryption(buf) +} + +/// Decodes [`ParquetMetaData`] from the provided bytes +// todo add file decryptor +pub fn decode_metadata_with_decryption(buf: &[u8]) -> Result { // TODO: row group filtering let mut prot = TCompactSliceInputProtocol::new(buf); let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) @@ -79,6 +112,11 @@ pub fn decode_metadata(buf: &[u8]) -> Result { } let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr); + if t_file_metadata.encryption_algorithm.is_some() { + // todo get key_metadata etc. 
Set file decryptor in return value + // todo check signature + } + let file_metadata = FileMetaData::new( t_file_metadata.version, t_file_metadata.num_rows, @@ -90,6 +128,29 @@ pub fn decode_metadata(buf: &[u8]) -> Result { Ok(ParquetMetaData::new(file_metadata, row_groups)) } +fn decode_encrypted_metadata(buf: &[u8], file_decryptor: FileDecryptor) -> Result { + // parse FileCryptoMetaData + let mut prot = TCompactSliceInputProtocol::new(buf.as_ref()); + let t_file_crypto_metadata: TFileCryptoMetaData = TFileCryptoMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| ParquetError::General(format!("Could not parse crypto metadata: {e}")))?; + let algo = t_file_crypto_metadata.encryption_algorithm; + let aes_gcm_algo = if let EncryptionAlgorithm::AESGCMV1(a) = algo { a } + else { unreachable!() }; // todo decr: add support for GCMCTRV1 + + // todo decr: get key_metadata + + // remaining buffer contains encrypted FileMetaData + let decryptor = file_decryptor.get_footer_decryptor(); + // todo decr: get aad_prefix + // todo decr: set both aad_prefix and aad_file_unique in file_decryptor + let fmd_aad = ciphers::create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref()); + let decrypted_fmd_buf = decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad.unwrap().as_ref()); + + // todo add file decryptor + decode_metadata_with_decryption(decrypted_fmd_buf.as_slice()) +} + +// todo decr: add encryption support /// Decodes the footer returning the metadata length in bytes pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result { // check this is indeed a parquet file diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index 6589d2efaf8b..309e5b5a9cce 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -107,3 +107,4 @@ pub mod writer; /// The length of the parquet footer in bytes pub const FOOTER_SIZE: usize = 8; const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; +const PARQUET_MAGIC_ENCR_FOOTER: [u8; 4] = [b'P', b'A', b'R', b'E']; 
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index db5d72634389..318e7fb507ef 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -124,6 +124,10 @@ pub mod column; experimental!(mod compression); experimental!(mod encodings); pub mod bloom_filter; + +//#[cfg(feature = "encryption")] +experimental!(mod encryption); + pub mod file; pub mod record; pub mod schema;