Skip to content

Commit

Permalink
feat: Streaming write for PrivateFile (#163)
Browse files Browse the repository at this point in the history
* Add initial structure

* Implement sharing

* refactor: Remove `content_key` from `PrivateRef`

It can always be derived from `revision_key`.
Storing it will only make it possible for `revision_key` and
`content_key` to get out-of-sync.

* refactor: Extract out `PrivateDirectoryContent`

* Key-wrap using AES-KWP

* refactor: use AES-KWP for encrypting previous links

* refactor: Previous to be a set of encrypted Cids

instead of an encrypted set of Cids

* refactor: Add "# of revisions back" to backpointers

* refactor: Extract private file content into struct

* refactor: Move AES-KWP logic into `RevisionKey`

* refactor: Move AES-GCM logic into `ContentKey`

* chore: Add TODO comments for missing docs

* fix: doctests due to refactor (whoops)

* refactor: remove `KeyType` struct

* refactor: Implement `load` & `store` for PNH

* refactor: Split header & content, add disambiguation

So:
- PrivateNodeHeader gets its own block
- PrivateFile and PrivateDirectory refer back to the header via a CID
- PrivateRef gets its own "disambiguation pointer" content_cid
- PrivateForest now resolves PrivateRefs
- PrivateRefs always refer to pre-existing content, never to "open slots"

* refactor: Introduce `RevisionRef` & fix examples

* refactor: Adjust doctests ✅

* refactor: remove `.derive_private_ref()`

* refactor: Simplify `SharePointer` creation

* refactor: Remove `Version` from private dir content

* refactor: move `persisted_as` into dir content

* refactor: Move `persisted_as` into private file content

* refactor: Docs & more

* clippy: configure to ignore `Encrypted` wrapper

* refactor: Rename to `TemporalKey` & `SnapshotKey`

instead of `RevisionKey` and `ContentKey`, respectively.

* refactor: Use `&mut Rc<PrivateForest>` and similar (#161)

Also, make use of `Rc::make_mut`, accordingly.

* refactor: Some fixes for wasm

* feat: Refactor & add stuff to the wasm interface

* feat: Allow wasm extracting values out of `PrivateRef`

* fix: Small fix in doctest

* feat: try implementing streaming write

Having an issue with it in tests though. It stack-overflows.

* fix: Fix streaming write implementation

* docs: Write 'em

* fix: Serialization and deserialization of share payloads

* chore: Remove logging 🔇

* refactor: Just use a different tagging mechanism

* refactor: Use `test_setup!` more consistently

* fix: incorrectly resolved merge conflicts

* fix: wasm tests. Also: Pin newest wasm-bindgen version

(it had some good bugfixes regarding FinalizationRegistries recently)

* refactor: Add a test fixture for testing streaming write

* fix: ✅ Fix tests

---------

Co-authored-by: Stephen Akinyemi <[email protected]>
  • Loading branch information
matheus23 and appcypher authored Feb 22, 2023
1 parent d7870bc commit 1bfe89b
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 12 deletions.
2 changes: 1 addition & 1 deletion wnfs-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dev-dependencies]
async-std = { version = "1.11", features = ["attributes"] }
criterion = { version = "0.4", features = ["async_std"] }
proptest = "1.0"
proptest = "1.1"
wnfs = { path = "../wnfs", features = ["test_strategies"] }

[[bench]]
Expand Down
4 changes: 2 additions & 2 deletions wnfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ libipld = { version = "0.16", features = ["dag-cbor", "derive", "serde-codec"] }
log = "0.4"
multihash = "0.18"
once_cell = "1.16"
proptest = { version = "1.0", optional = true }
proptest = { version = "1.1", optional = true }
rand_core = "0.6"
rsa = "0.7"
semver = { version = "1.0", features = ["serde"] }
Expand All @@ -47,7 +47,7 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
[dev-dependencies]
async-std = { version = "1.11", features = ["attributes"] }
env_logger = "0.10"
proptest = "1.0"
proptest = "1.1"
rand = "0.8"
rsa = "0.7"
test-log = "0.2"
Expand Down
7 changes: 7 additions & 0 deletions wnfs/proptest-regressions/private/node.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc ff287a98919993cde9c4c1debe5a40170697949b611b0d37c24c9b6bce66ffc3 # shrinks to input = _SnapshotKeyCanEncryptAndDecryptDataInPlaceArgs { data: [], key_bytes: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], nonce: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] }
24 changes: 24 additions & 0 deletions wnfs/src/common/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{error, FsError, HashOutput};
use anyhow::Result;
use futures::{AsyncRead, AsyncReadExt};
#[cfg(any(test, feature = "test_strategies"))]
use proptest::{
strategy::{Strategy, ValueTree},
Expand Down Expand Up @@ -66,6 +67,29 @@ pub(crate) fn split_last(path_segments: &[String]) -> Result<(&[String], &String
}
}

/// Reads from `stream` into `buffer` until the buffer is full or a read
/// returns zero bytes.
///
/// Returns `(bytes_read, done)`, where `bytes_read` is the number of bytes
/// placed at the start of `buffer` and `done` is `true` when the loop ended
/// because a read returned zero bytes (treated as end of stream).
///
/// When the stream ends exactly as the buffer fills up, `done` is still
/// `false`; the end of the stream is only reported by the following call,
/// which then returns `(0, true)`.
///
/// # Errors
///
/// Propagates any error returned by the underlying `read` call.
pub(crate) async fn read_fully(
    stream: &mut (impl AsyncRead + Unpin),
    buffer: &mut [u8],
) -> Result<(usize, bool)> {
    let mut filled = 0;

    // The loop evaluates to the "end of stream reached" flag.
    let reached_end = loop {
        match stream.read(&mut buffer[filled..]).await? {
            // A zero-length read is interpreted as end of stream.
            0 => break true,
            count => {
                filled += count;
                if filled == buffer.len() {
                    // Buffer is full; the stream may or may not have more data.
                    break false;
                }
            }
        }
    };

    Ok((filled, reached_end))
}

/// Generates a random byte array of the given length.
///
/// # Examples
Expand Down
151 changes: 149 additions & 2 deletions wnfs/src/private/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use anyhow::Result;
use async_once_cell::OnceCell;
use async_stream::try_stream;
use chrono::{DateTime, Utc};
use futures::{future, Stream, StreamExt};
use futures::{future, AsyncRead, Stream, StreamExt};
use libipld::{cbor::DagCborCodec, prelude::Encode, Cid, IpldCodec};
use rand_core::RngCore;
use semver::Version;
Expand Down Expand Up @@ -199,6 +199,71 @@ impl PrivateFile {
})
}

/// Creates a file whose content is read from an asynchronous byte stream.
///
/// The stream is drained block by block and every block is encrypted and
/// stored before the next one is read, so the whole content never has to be
/// held in memory at once.
///
/// Depending on the BlockStore implementation this will
/// use essentially O(1) memory (roughly `2 * MAX_BLOCK_CONTENT_SIZE` bytes).
///
/// # Errors
///
/// Returns an error if reading, encrypting or storing the content fails.
///
/// # Examples
///
/// ```
/// use std::rc::Rc;
/// use async_std::fs::File;
/// use chrono::Utc;
/// use rand::thread_rng;
/// use wnfs::{private::PrivateForest, MemoryBlockStore, Namefilter, PrivateFile};
///
/// #[async_std::main]
/// async fn main() {
///     let disk_file = File::open("./test/fixtures/Clara Schumann, Scherzo no. 2, Op. 14.mp3")
///         .await
///         .unwrap();
///
///     let store = &mut MemoryBlockStore::default();
///     let rng = &mut thread_rng();
///     let forest = &mut Rc::new(PrivateForest::new());
///
///     let file = PrivateFile::with_content_streaming(
///         Namefilter::default(),
///         Utc::now(),
///         disk_file,
///         forest,
///         store,
///         rng,
///     )
///     .await
///     .unwrap();
///
///     println!("file = {:?}", file);
/// }
/// ```
pub async fn with_content_streaming(
    parent_bare_name: Namefilter,
    time: DateTime<Utc>,
    content: impl AsyncRead + Unpin,
    forest: &mut Rc<PrivateForest>,
    store: &mut impl BlockStore,
    rng: &mut impl RngCore,
) -> Result<Self> {
    // The header must exist first: its bare name is used when labelling the content blocks.
    let header = PrivateNodeHeader::new(parent_bare_name, rng);
    let streamed =
        Self::prepare_content_streaming(&header.bare_name, content, forest, store, rng).await?;

    // A freshly created file has not been persisted yet and has no previous revisions.
    let content = PrivateFileContent {
        persisted_as: OnceCell::new(),
        metadata: Metadata::new(time),
        previous: BTreeSet::new(),
        content: streamed,
    };

    Ok(Self { header, content })
}

/// Streams the content of a file as chunk of blocks.
///
/// # Examples
Expand Down Expand Up @@ -364,6 +429,60 @@ impl PrivateFile {
})
}

/// Drains the given content stream and stores it in the private forest as
/// a sequence of encrypted blocks.
///
/// Every stored block consists of a fresh nonce, followed by the encrypted
/// chunk (at most `MAX_BLOCK_CONTENT_SIZE` bytes of content), followed by
/// the authentication tag. All blocks are encrypted with one newly
/// generated snapshot key and registered in the forest under a label
/// derived from that key, the block index and `bare_name`.
///
/// At least one block is always written, even for an empty stream. The end
/// of the stream is only detected by a zero-length read, so a stream whose
/// length is an exact multiple of the block content size is followed by one
/// additional empty block.
///
/// Returns an external `FileContent` that contains the information needed
/// to later retrieve the data (key, block count and block content size).
///
/// # Errors
///
/// Returns an error if reading from the stream, encrypting a block, or
/// writing to the block store or the forest fails.
pub(super) async fn prepare_content_streaming(
    bare_name: &Namefilter,
    mut content: impl AsyncRead + Unpin,
    forest: &mut Rc<PrivateForest>,
    store: &mut impl BlockStore,
    rng: &mut impl RngCore,
) -> Result<FileContent> {
    let key = SnapshotKey(AesKey::new(get_random_bytes(rng)));

    let mut blocks_written = 0;
    let mut reached_end = false;

    while !reached_end {
        // Allocate the block at full size and place the nonce at its front.
        let mut block = vec![0u8; MAX_BLOCK_SIZE];
        let nonce = SnapshotKey::generate_nonce(rng);
        block[..NONCE_SIZE].copy_from_slice(&nonce);

        // Fill the region right after the nonce with up to
        // MAX_BLOCK_CONTENT_SIZE bytes of content.
        let payload = &mut block[NONCE_SIZE..NONCE_SIZE + MAX_BLOCK_CONTENT_SIZE];
        let (payload_len, done) = utils::read_fully(&mut content, payload).await?;
        reached_end = done;

        // Drop the unused remainder so only nonce + content is left.
        block.truncate(NONCE_SIZE + payload_len);

        // Encrypt the content in place and append the authentication tag.
        let tag = key.encrypt_in_place(&nonce, &mut block[NONCE_SIZE..])?;
        block.extend_from_slice(&tag);

        let content_cid = store.put_block(block, IpldCodec::Raw).await?;

        let label = Self::create_block_label(&key, blocks_written, bare_name);
        forest
            .put_encrypted(label, Some(content_cid), store)
            .await?;

        blocks_written += 1;
    }

    Ok(FileContent::External {
        key,
        block_count: blocks_written,
        block_content_size: MAX_BLOCK_CONTENT_SIZE,
    })
}

/// Gets the upper bound of a file content size.
pub(crate) fn get_content_size_upper_bound(&self) -> usize {
match &self.content.content {
Expand Down Expand Up @@ -630,7 +749,9 @@ impl Id for PrivateFile {
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::test_setup;
use crate::{utils::test_setup, MemoryBlockStore};
use async_std::fs::File;
use proptest::test_runner::{RngAlgorithm, TestRng};
use rand::Rng;

#[async_std::test]
Expand Down Expand Up @@ -668,6 +789,32 @@ mod tests {
content[2 * MAX_BLOCK_CONTENT_SIZE..4 * MAX_BLOCK_CONTENT_SIZE]
);
}

/// Streams a fixture file from disk into a `PrivateFile` and checks that the
/// resulting content is stored externally with at least one block.
#[async_std::test]
async fn can_construct_file_from_stream() {
    // In-memory fixtures, with a deterministic RNG.
    let rng = &mut TestRng::deterministic_rng(RngAlgorithm::ChaCha);
    let store = &mut MemoryBlockStore::new();
    let forest = &mut Rc::new(PrivateForest::new());

    // The fixture path is relative to the directory the test is run from.
    let disk_file = File::open("./test/fixtures/Clara Schumann, Scherzo no. 2, Op. 14.mp3")
        .await
        .unwrap();

    let file = PrivateFile::with_content_streaming(
        Namefilter::default(),
        Utc::now(),
        disk_file,
        forest,
        store,
        rng,
    )
    .await
    .unwrap();

    assert!(
        matches!(file.content.content, FileContent::External { block_count, .. } if block_count > 0)
    );
}
}

#[cfg(test)]
Expand Down
77 changes: 70 additions & 7 deletions wnfs/src/private/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use super::{
use crate::{
dagcbor, utils, AesError, BlockStore, FsError, HashOutput, Id, NodeType, HASH_BYTE_SIZE,
};
use aes_gcm::{aead::Aead, Aes256Gcm, KeyInit, Nonce};
use aes_gcm::{
aead::{consts::U12, Aead},
AeadInPlace, Aes256Gcm, KeyInit, Nonce, Tag,
};
use aes_kw::KekAes256;
use anyhow::{bail, Result};
use async_recursion::async_recursion;
Expand Down Expand Up @@ -821,14 +824,32 @@ impl SnapshotKey {
/// assert_eq!(plaintext, &decrypted[..]);
/// ```
pub fn encrypt(&self, data: &[u8], rng: &mut impl RngCore) -> Result<Vec<u8>> {
let nonce_bytes = utils::get_random_bytes::<NONCE_SIZE>(rng);
let nonce = Nonce::from_slice(&nonce_bytes);
let nonce = Self::generate_nonce(rng);

let cipher_text = Aes256Gcm::new_from_slice(self.0.as_bytes())?
.encrypt(nonce, data)
.encrypt(&nonce, data)
.map_err(|e| AesError::UnableToEncrypt(format!("{e}")))?;

Ok([nonce_bytes.to_vec(), cipher_text].concat())
Ok([nonce.to_vec(), cipher_text].concat())
}

/// Generates a random 12-byte nonce for encryption.
///
/// All nonce bytes are drawn from the given `rng`.
pub(crate) fn generate_nonce(rng: &mut impl RngCore) -> Nonce<U12> {
    // Start from a default-initialised nonce and overwrite every byte with RNG output.
    let mut nonce = Nonce::<U12>::default();
    rng.fill_bytes(nonce.as_mut_slice());
    nonce
}

/// Encrypts the cleartext held in `buffer` in place using this key and the
/// given nonce.
///
/// On success `buffer` holds the ciphertext and the detached authentication
/// tag is returned. No associated data is authenticated.
///
/// Neither the nonce nor the tag is written into `buffer`. Callers usually
/// prepend the nonce to the ciphertext and append the tag, since both are
/// needed for decryption.
///
/// # Errors
///
/// Returns an error if the cipher cannot be constructed from the key bytes,
/// or `AesError::UnableToEncrypt` if encryption fails.
pub(crate) fn encrypt_in_place(&self, nonce: &Nonce<U12>, buffer: &mut [u8]) -> Result<Tag> {
    let cipher = Aes256Gcm::new_from_slice(self.0.as_bytes())?;

    Ok(cipher
        .encrypt_in_place_detached(nonce, &[], buffer)
        .map_err(|e| AesError::UnableToEncrypt(format!("{e}")))?)
}

/// Decrypts the given ciphertext using the key.
Expand Down Expand Up @@ -856,6 +877,23 @@ impl SnapshotKey {
.decrypt(Nonce::from_slice(nonce_bytes), data)
.map_err(|e| AesError::UnableToDecrypt(format!("{e}")))?)
}

/// Decrypts the ciphertext held in `buffer` in place using this key, the
/// given nonce and the detached authentication tag.
///
/// On success `buffer` holds the cleartext. No associated data is
/// authenticated.
///
/// Usually the nonce is stored as the ciphertext's prefix and the tag as
/// its suffix; callers are expected to split them off before calling this.
///
/// # Errors
///
/// Returns an error if the cipher cannot be constructed from the key bytes,
/// or `AesError::UnableToDecrypt` if decryption fails.
#[allow(dead_code)] // Kept for completeness' sake, as the counterpart of `encrypt_in_place`.
pub(crate) fn decrypt_in_place(
    &self,
    nonce: &Nonce<U12>,
    tag: &Tag,
    buffer: &mut [u8],
) -> Result<()> {
    let cipher = Aes256Gcm::new_from_slice(self.0.as_bytes())?;

    match cipher.decrypt_in_place_detached(nonce, &[], buffer, tag) {
        Ok(()) => Ok(()),
        Err(e) => Err(AesError::UnableToDecrypt(format!("{e}")).into()),
    }
}
}

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -912,7 +950,7 @@ mod proptests {
use super::*;
use proptest::{
prelude::any,
prop_assert_eq,
prop_assert_eq, prop_assert_ne,
test_runner::{RngAlgorithm, TestRng},
};
use test_strategy::proptest;
Expand All @@ -929,6 +967,31 @@ mod proptests {
let encrypted = key.encrypt(&data, rng).unwrap();
let decrypted = key.decrypt(&encrypted).unwrap();

prop_assert_eq!(decrypted, data);
if data.len() > 0 {
let cipher_part = &encrypted[NONCE_SIZE..NONCE_SIZE + data.len()];
prop_assert_ne!(cipher_part, &decrypted);
}
prop_assert_eq!(&decrypted, &data);
}

/// Checks that in-place encryption followed by in-place decryption with the
/// same key, nonce and tag restores the original data.
#[proptest(cases = 100)]
fn snapshot_key_can_encrypt_and_decrypt_data_in_place(
    data: Vec<u8>,
    key_bytes: [u8; KEY_BYTE_SIZE],
    nonce: [u8; NONCE_SIZE],
) {
    // AES-GCM encrypts by XOR-ing the data with a keystream, so a very short
    // input can legitimately encrypt to itself (a single byte does so with
    // probability 1/256). Only compare ciphertext and cleartext once the
    // input is long enough for such a collision to be practically impossible,
    // otherwise this test fails spuriously.
    let min_len_for_inequality_check = 16;

    let mut buffer = data.clone();
    let nonce = Nonce::from_slice(&nonce);
    let key = SnapshotKey(AesKey::new(key_bytes));

    let tag = key.encrypt_in_place(nonce, &mut buffer).unwrap();

    if buffer.len() >= min_len_for_inequality_check {
        prop_assert_ne!(&buffer, &data);
    }

    key.decrypt_in_place(nonce, &tag, &mut buffer).unwrap();

    prop_assert_eq!(&buffer, &data);
}
}
Binary file not shown.

0 comments on commit 1bfe89b

Please sign in to comment.