Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented future notes support in block-producer #390

Merged
merged 18 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions crates/block-producer/src/batch_builder/batch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::collections::BTreeSet;

use miden_objects::{
accounts::AccountId,
batches::BatchNoteTree,
block::BlockAccountUpdate,
crypto::hash::blake::{Blake3Digest, Blake3_256},
notes::Nullifier,
notes::{NoteId, Nullifier},
transaction::{OutputNote, TransactionId, TxAccountUpdate},
Digest, MAX_NOTES_PER_BATCH,
};
Expand All @@ -24,6 +26,7 @@ pub type BatchId = Blake3Digest<32>;
pub struct TransactionBatch {
id: BatchId,
updated_accounts: Vec<(TransactionId, TxAccountUpdate)>,
future_input_notes: BTreeSet<NoteId>,
produced_nullifiers: Vec<Nullifier>,
created_notes_smt: BatchNoteTree,
created_notes: Vec<OutputNote>,
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -44,21 +47,32 @@ impl TransactionBatch {
pub fn new(txs: Vec<ProvenTransaction>) -> Result<Self, BuildBatchError> {
let id = Self::compute_id(&txs);

// TODO: we need to handle a possibility that a batch contains multiple transactions against
// the same account (e.g., transaction `x` takes account from state `A` to `B` and
// transaction `y` takes account from state `B` to `C`). These will need to be merged
// into a single "update" `A` to `C`.
let updated_accounts =
txs.iter().map(|tx| (tx.id(), tx.account_update().clone())).collect();

let produced_nullifiers = txs
.iter()
.flat_map(|tx| tx.input_notes().iter())
.map(|note| note.nullifier())
.collect();

let created_notes: Vec<_> =
txs.iter().flat_map(|tx| tx.output_notes().iter()).cloned().collect();
let mut updated_accounts = vec![];
let mut produced_nullifiers = vec![];
let mut future_input_notes = BTreeSet::new();
let mut created_notes = vec![];
for tx in &txs {
// TODO: we need to handle a possibility that a batch contains multiple transactions against
// the same account (e.g., transaction `x` takes account from state `A` to `B` and
// transaction `y` takes account from state `B` to `C`). These will need to be merged
// into a single "update" `A` to `C`.
updated_accounts.push((tx.id(), tx.account_update().clone()));

for note in tx.input_notes() {
produced_nullifiers.push(note.nullifier());
if let Some(note_id) = note.note_id() {
future_input_notes.insert(note_id);
}
}

// Populate batch created notes, filtering out notes consumed in the same batch
created_notes.extend(
tx.output_notes()
.iter()
.filter(|&note| !future_input_notes.contains(&note.id()))
.cloned(),
);
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
}

if created_notes.len() > MAX_NOTES_PER_BATCH {
return Err(BuildBatchError::TooManyNotesCreated(created_notes.len(), txs));
Expand All @@ -73,6 +87,7 @@ impl TransactionBatch {
Ok(Self {
id,
updated_accounts,
future_input_notes,
produced_nullifiers,
created_notes_smt,
created_notes,
Expand Down Expand Up @@ -108,6 +123,11 @@ impl TransactionBatch {
})
}

/// Returns future input notes set consumed by the transactions in this batch.
pub fn future_input_notes(&self) -> &BTreeSet<NoteId> {
&self.future_input_notes
}

/// Returns an iterator over produced nullifiers for all consumed notes.
pub fn produced_nullifiers(&self) -> impl Iterator<Item = Nullifier> + '_ {
self.produced_nullifiers.iter().cloned()
Expand Down
65 changes: 55 additions & 10 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::{cmp::min, sync::Arc, time::Duration};
use std::{cmp::min, collections::BTreeSet, sync::Arc, time::Duration};

use async_trait::async_trait;
use tokio::{sync::RwLock, time};
use miden_objects::{
notes::NoteId,
transaction::{InputNoteCommitment, OutputNote},
};
use tokio::time;
use tracing::{debug, info, instrument, Span};

use crate::{block_builder::BlockBuilder, ProvenTransaction, SharedRwVec, COMPONENT};
Expand All @@ -13,7 +17,7 @@ pub mod batch;
pub use batch::TransactionBatch;
use miden_node_utils::formatting::{format_array, format_blake3_digest};

use crate::errors::BuildBatchError;
use crate::{errors::BuildBatchError, store::Store};

// BATCH BUILDER
// ================================================================================================
Expand Down Expand Up @@ -45,31 +49,35 @@ pub struct DefaultBatchBuilderOptions {
pub max_batches_per_block: usize,
}

pub struct DefaultBatchBuilder<BB> {
/// Batches ready to be included in a block
ready_batches: SharedRwVec<TransactionBatch>,
pub struct DefaultBatchBuilder<S, BB> {
store: Arc<S>,

block_builder: Arc<BB>,

options: DefaultBatchBuilderOptions,

/// Batches ready to be included in a block
ready_batches: SharedRwVec<TransactionBatch>,
}

// FIXME: remove the allow when the upstream clippy issue is fixed:
// https://github.com/rust-lang/rust-clippy/issues/12281
#[allow(clippy::blocks_in_conditions)]
impl<BB> DefaultBatchBuilder<BB>
impl<S, BB> DefaultBatchBuilder<S, BB>
where
S: Store,
BB: BlockBuilder,
{
// CONSTRUCTOR
// --------------------------------------------------------------------------------------------
/// Returns an new [BatchBuilder] instantiated with the provided [BlockBuilder] and the
/// specified options.
pub fn new(block_builder: Arc<BB>, options: DefaultBatchBuilderOptions) -> Self {
pub fn new(store: Arc<S>, block_builder: Arc<BB>, options: DefaultBatchBuilderOptions) -> Self {
Self {
ready_batches: Arc::new(RwLock::new(Vec::new())),
store,
block_builder,
options,
ready_batches: Default::default(),
}
}

Expand Down Expand Up @@ -112,14 +120,39 @@ where
},
}
}

async fn find_dangling_notes(&self, txs: &[ProvenTransaction]) -> Vec<NoteId> {
// TODO: We can optimize this by looking at the notes created in the previous batches
let note_created: BTreeSet<NoteId> = txs
.iter()
.flat_map(|tx| tx.output_notes().iter().map(OutputNote::id))
.chain(
self.ready_batches
.read()
.await
.iter()
.flat_map(|batch| batch.created_notes().iter().map(OutputNote::id)),
)
.collect();

txs.iter()
.flat_map(|tx| {
tx.input_notes()
.iter()
.filter_map(InputNoteCommitment::note_id)
.filter(|note_id| !note_created.contains(note_id))
})
.collect()
}
}

// FIXME: remove the allow when the upstream clippy issue is fixed:
// https://github.com/rust-lang/rust-clippy/issues/12281
#[allow(clippy::blocks_in_conditions)]
#[async_trait]
impl<BB> BatchBuilder for DefaultBatchBuilder<BB>
impl<S, BB> BatchBuilder for DefaultBatchBuilder<S, BB>
where
S: Store,
BB: BlockBuilder,
{
#[instrument(target = "miden-block-producer", skip_all, err, fields(batch_id))]
Expand All @@ -129,6 +162,18 @@ where
info!(target: COMPONENT, num_txs, "Building a transaction batch");
debug!(target: COMPONENT, txs = %format_array(txs.iter().map(|tx| tx.id().to_hex())));

let dangling_notes = self.find_dangling_notes(&txs).await;
if !dangling_notes.is_empty() {
let missed_notes = match self.store.get_missed_notes(&dangling_notes).await {
Ok(notes) => notes,
Err(err) => return Err(BuildBatchError::GetMissedNotesRequestError(err, txs)),
};

if !missed_notes.is_empty() {
return Err(BuildBatchError::FutureNotesNotFound(missed_notes, txs));
}
}

bobbinth marked this conversation as resolved.
Show resolved Hide resolved
let batch = TransactionBatch::new(txs)?;

info!(target: COMPONENT, "Transaction batch built");
Expand Down
15 changes: 14 additions & 1 deletion crates/block-producer/src/batch_builder/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use std::iter;

use tokio::sync::RwLock;

use super::*;
use crate::{errors::BuildBlockError, test_utils::MockProvenTxBuilder};
use crate::{
errors::BuildBlockError,
test_utils::{MockProvenTxBuilder, MockStoreSuccessBuilder},
};

// STRUCTS
// ================================================================================================
Expand Down Expand Up @@ -43,9 +50,11 @@ async fn test_block_size_doesnt_exceed_limit() {
let block_frequency = Duration::from_millis(20);
let max_batches_per_block = 2;

let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build());
let block_builder = Arc::new(BlockBuilderSuccess::default());

let batch_builder = Arc::new(DefaultBatchBuilder::new(
store,
block_builder.clone(),
DefaultBatchBuilderOptions { block_frequency, max_batches_per_block },
));
Expand Down Expand Up @@ -81,9 +90,11 @@ async fn test_build_block_called_when_no_batches() {
let block_frequency = Duration::from_millis(20);
let max_batches_per_block = 2;

let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build());
let block_builder = Arc::new(BlockBuilderSuccess::default());

let batch_builder = Arc::new(DefaultBatchBuilder::new(
store,
block_builder.clone(),
DefaultBatchBuilderOptions { block_frequency, max_batches_per_block },
));
Expand All @@ -106,9 +117,11 @@ async fn test_batches_added_back_to_queue_on_block_build_failure() {
let block_frequency = Duration::from_millis(20);
let max_batches_per_block = 2;

let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build());
let block_builder = Arc::new(BlockBuilderFailure);

let batch_builder = Arc::new(DefaultBatchBuilder::new(
store,
block_builder.clone(),
DefaultBatchBuilderOptions { block_frequency, max_batches_per_block },
));
Expand Down
17 changes: 15 additions & 2 deletions crates/block-producer/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ use miden_node_proto::{
};
use miden_objects::{
accounts::AccountId,
crypto::merkle::{MerklePath, MmrPeaks, SmtProof},
notes::Nullifier,
crypto::{
hash::rpo::RpoDigest,
merkle::{MerklePath, MmrPeaks, SmtProof},
},
notes::{NoteId, Nullifier},
BlockHeader, Digest,
};

Expand All @@ -31,6 +34,9 @@ pub struct BlockInputs {

/// The requested nullifiers and their authentication paths
pub nullifiers: BTreeMap<Nullifier, SmtProof>,

/// List of notes that were not found in the store
pub missed_notes: Vec<NoteId>,
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -87,11 +93,18 @@ impl TryFrom<GetBlockInputsResponse> for BlockInputs {
})
.collect::<Result<BTreeMap<_, _>, ConversionError>>()?;

let missed_notes = get_block_inputs
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
.missed_notes
.into_iter()
.map(|digest| Ok(RpoDigest::try_from(digest)?.into()))
.collect::<Result<Vec<_>, ConversionError>>()?;

Ok(Self {
block_header,
chain_peaks,
accounts,
nullifiers,
missed_notes,
})
}
}
20 changes: 18 additions & 2 deletions crates/block-producer/src/block_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{collections::BTreeSet, sync::Arc};

use async_trait::async_trait;
use miden_node_utils::formatting::{format_array, format_blake3_digest};
Expand Down Expand Up @@ -76,19 +76,35 @@ where
let updated_accounts: Vec<_> =
batches.iter().flat_map(TransactionBatch::updated_accounts).collect();

let created_notes = batches.iter().map(TransactionBatch::created_notes).cloned().collect();
let created_notes: Vec<_> =
batches.iter().map(TransactionBatch::created_notes).cloned().collect();

let produced_nullifiers: Vec<Nullifier> =
batches.iter().flat_map(TransactionBatch::produced_nullifiers).collect();

let created_notes_set: BTreeSet<_> = created_notes
.iter()
.flat_map(|batch| batch.iter().map(|note| note.id()))
.collect();

let dangling_notes = batches
.iter()
.flat_map(TransactionBatch::future_input_notes)
.filter(|&note_id| !created_notes_set.contains(note_id));

let block_inputs = self
.store
.get_block_inputs(
updated_accounts.iter().map(BlockAccountUpdate::account_id),
produced_nullifiers.iter(),
dangling_notes,
)
.await?;

if !block_inputs.missed_notes.is_empty() {
return Err(BuildBlockError::FutureNotesNotFound(block_inputs.missed_notes));
}

let block_header_witness = BlockWitness::new(block_inputs, batches)?;

let new_block_header = self.block_kernel.prove(block_header_witness)?;
Expand Down
Loading
Loading