Skip to content

Commit

Permalink
Transaction queue & Batch builder: alternative architecture (0xPolygo…
Browse files Browse the repository at this point in the history
…nMiden#47)

* TransactionQueueTrait

* StateViewTrait

* txqueue: add_transaction

* impl TxQueueTrait

* batch builder

* fmt & clippy

* Rename TransactionQueue

* Rename BatchBuilder

* Rename StateView to TransactionVerifier

* Txqueue: remove get_transactions()

* rename trait methods

* StateView -> TV

* TxQueue: send_batches scaffold

* tx queue pushes batches

* BatchBuilder::add_batches

* Remove state_view mod

* block_builder mod

* BatchBuilder: change to add_tx_groups

* BlockBuilder::add_batches

* BatchBuilder uses BlockProducer

* SharedProvenTx type

* SharedRwVec type

* always send batch to block builder, even if empty

* batch builder: max_batches_per_block param

* sections

* TransactionBatch::updated_accounts

* confirm_at_most_one_tx_per_account

* fmt

* TxBatch: consumed_notes_script_roots

* docs

* transaction batch: created_notes_hashes

* transaction batch: consumed_notes_nullifiers

* make errors concrete

* Rename to build_batch

* build_batch

* Rename build_block

* refactor batch builder

* clippy

* dep order

* tx queue: rename param

* txqueue tests

* test_utils: DummyProvenTxGenerator

* test_build_batch_success

* test_tx_verify_failure

* test_build_batch_failure

* fmt

* batch builder: first test

* change `BlockBuilder::build_block()` signature

* SharedTxBatch type

* remove useless test type

* test 2

* test 3

* revert `build_block()` taking an `Option`

* fix comment
  • Loading branch information
plafer authored Oct 26, 2023
1 parent 0c050c7 commit e18b122
Show file tree
Hide file tree
Showing 8 changed files with 730 additions and 6 deletions.
3 changes: 2 additions & 1 deletion block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ winterfell = "0.6"

[dependencies]
async-trait = "0.1"
itertools = "0.11"
miden_objects = { workspace = true }
tokio = { version = "1.29", features = ["rt-multi-thread", "macros", "time"] }
tokio = { version = "1.29", features = ["rt-multi-thread", "macros", "sync", "time"] }
165 changes: 165 additions & 0 deletions block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use std::{cmp::min, fmt::Debug, sync::Arc, time::Duration};

use async_trait::async_trait;
use itertools::Itertools;
use miden_objects::{accounts::AccountId, Digest};
use tokio::{sync::RwLock, time};

use crate::{block_builder::BlockBuilder, SharedProvenTx, SharedRwVec, SharedTxBatch};

#[cfg(test)]
mod tests;

// TRANSACTION BATCH
// ================================================================================================

/// A batch of transactions that share a common proof. For any given account, at most 1 transaction
/// in the batch must be addressing that account.
///
/// Note: Until recursive proofs are available in the Miden VM, we don't include the common proof.
pub struct TransactionBatch {
txs: Vec<SharedProvenTx>,
}

impl TransactionBatch {
pub fn new(txs: Vec<SharedProvenTx>) -> Self {
Self { txs }
}

/// Returns an iterator over account ids that were modified in the transaction batch, and their
/// corresponding new hash
pub fn updated_accounts(&self) -> impl Iterator<Item = (AccountId, Digest)> + '_ {
self.txs.iter().map(|tx| (tx.account_id(), tx.final_account_hash()))
}

/// Returns the script root of all consumed notes
pub fn consumed_notes_script_roots(&self) -> impl Iterator<Item = Digest> + '_ {
let mut script_roots: Vec<Digest> = self
.txs
.iter()
.flat_map(|tx| tx.consumed_notes())
.map(|consumed_note| consumed_note.script_root())
.collect();

script_roots.sort();

// Removes duplicates in consecutive items
script_roots.into_iter().dedup()
}

/// Returns the nullifier of all consumed notes
pub fn consumed_notes_nullifiers(&self) -> impl Iterator<Item = Digest> + '_ {
self.txs
.iter()
.flat_map(|tx| tx.consumed_notes())
.map(|consumed_note| consumed_note.nullifier())
}

/// Returns the hash of created notes
pub fn created_notes_hashes(&self) -> impl Iterator<Item = Digest> + '_ {
self.txs
.iter()
.flat_map(|tx| tx.created_notes())
.map(|note_envelope| note_envelope.note_hash())
}
}

// BATCH BUILDER
// ================================================================================================

#[derive(Debug)]
pub enum BuildBatchError {
Dummy,
}

#[async_trait]
pub trait BatchBuilder: Send + Sync + 'static {
async fn build_batch(
&self,
txs: Vec<SharedProvenTx>,
) -> Result<(), BuildBatchError>;
}

pub struct DefaultBatchBuilderOptions {
/// The frequency at which blocks are created
pub block_frequency: Duration,

/// Maximum number of batches in any given block
pub max_batches_per_block: usize,
}

pub struct DefaultBatchBuilder<BB>
where
BB: BlockBuilder,
{
/// Batches ready to be included in a block
ready_batches: SharedRwVec<SharedTxBatch>,

block_builder: Arc<BB>,

options: DefaultBatchBuilderOptions,
}

impl<BB> DefaultBatchBuilder<BB>
where
BB: BlockBuilder,
{
pub fn new(
block_builder: Arc<BB>,
options: DefaultBatchBuilderOptions,
) -> Self {
Self {
ready_batches: Arc::new(RwLock::new(Vec::new())),
block_builder,
options,
}
}

pub async fn run(self) {
let mut interval = time::interval(self.options.block_frequency);

loop {
interval.tick().await;
self.try_build_block().await;
}
}

/// Note that we call `build_block()` regardless of whether the `ready_batches` queue is empty.
/// A call to an empty `build_block()` indicates that an empty block should be created.
async fn try_build_block(&self) {
let mut batches_in_block: Vec<SharedTxBatch> = {
let mut locked_ready_batches = self.ready_batches.write().await;

let num_batches_in_block =
min(self.options.max_batches_per_block, locked_ready_batches.len());

locked_ready_batches.drain(..num_batches_in_block).collect()
};

match self.block_builder.build_block(batches_in_block.clone()).await {
Ok(_) => {
// block successfully built, do nothing
},
Err(_) => {
// Block building failed; add back the batches at the end of the queue
self.ready_batches.write().await.append(&mut batches_in_block);
},
}
}
}

#[async_trait]
impl<BB> BatchBuilder for DefaultBatchBuilder<BB>
where
BB: BlockBuilder,
{
async fn build_batch(
&self,
txs: Vec<SharedProvenTx>,
) -> Result<(), BuildBatchError> {
let batch = Arc::new(TransactionBatch::new(txs));
self.ready_batches.write().await.push(batch);

Ok(())
}
}
171 changes: 171 additions & 0 deletions block-producer/src/batch_builder/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use super::*;
use crate::{block_builder::BuildBlockError, test_utils::DummyProvenTxGenerator, SharedTxBatch};

// STRUCTS
// ================================================================================================

#[derive(Default)]
struct BlockBuilderSuccess {
batch_groups: SharedRwVec<Vec<SharedTxBatch>>,
num_empty_batches_received: Arc<RwLock<usize>>,
}

#[async_trait]
impl BlockBuilder for BlockBuilderSuccess {
async fn build_block(
&self,
batches: Vec<SharedTxBatch>,
) -> Result<(), BuildBlockError> {
if batches.is_empty() {
*self.num_empty_batches_received.write().await += 1;
} else {
self.batch_groups.write().await.push(batches);
}

Ok(())
}
}

#[derive(Default)]
struct BlockBuilderFailure;

#[async_trait]
impl BlockBuilder for BlockBuilderFailure {
async fn build_block(
&self,
_batches: Vec<SharedTxBatch>,
) -> Result<(), BuildBlockError> {
Err(BuildBlockError::Dummy)
}
}

// TESTS
// ================================================================================================

/// Tests that the number of batches in a block doesn't exceed `max_batches_per_block`
#[tokio::test]
async fn test_block_size_doesnt_exceed_limit() {
let block_frequency = Duration::from_millis(20);
let max_batches_per_block = 2;

let block_builder = Arc::new(BlockBuilderSuccess::default());

let batch_builder = DefaultBatchBuilder::new(
block_builder.clone(),
DefaultBatchBuilderOptions {
block_frequency,
max_batches_per_block,
},
);

// Add 3 batches in internal queue (remember: 2 batches/block)
{
let tx_gen = DummyProvenTxGenerator::new();

let mut batch_group = vec![
dummy_tx_batch(&tx_gen, 2),
dummy_tx_batch(&tx_gen, 2),
dummy_tx_batch(&tx_gen, 2),
];

batch_builder.ready_batches.write().await.append(&mut batch_group);
}

// start batch builder
tokio::spawn(batch_builder.run());

// Wait for 2 blocks to be produced
time::sleep(block_frequency * 3).await;

// Ensure the block builder received 2 batches of the expected size
{
let batch_groups = block_builder.batch_groups.read().await;

assert_eq!(batch_groups.len(), 2);
assert_eq!(batch_groups[0].len(), max_batches_per_block);
assert_eq!(batch_groups[1].len(), 1);
}
}

/// Tests that `BlockBuilder::build_block()` is still called when there are no transactions
#[tokio::test]
async fn test_build_block_called_when_no_batches() {
let block_frequency = Duration::from_millis(20);
let max_batches_per_block = 2;

let block_builder = Arc::new(BlockBuilderSuccess::default());

let batch_builder = DefaultBatchBuilder::new(
block_builder.clone(),
DefaultBatchBuilderOptions {
block_frequency,
max_batches_per_block,
},
);

// start batch builder
tokio::spawn(batch_builder.run());

// Wait for at least 1 block to be produced
time::sleep(block_frequency * 2).await;

// Ensure the block builder received at least 1 empty batch Note: we check `> 0` instead of an
// exact number to make the test flaky in case timings change in the implementation
assert!(*block_builder.num_empty_batches_received.read().await > 0);
}

/// Tests that if `BlockBuilder::build_block()` fails, then batches are added back on the queue
#[tokio::test]
async fn test_batches_added_back_to_queue_on_block_build_failure() {
let block_frequency = Duration::from_millis(20);
let max_batches_per_block = 2;

let block_builder = Arc::new(BlockBuilderFailure::default());

let batch_builder = DefaultBatchBuilder::new(
block_builder.clone(),
DefaultBatchBuilderOptions {
block_frequency,
max_batches_per_block,
},
);

let internal_ready_batches = batch_builder.ready_batches.clone();

// Add 3 batches in internal queue
{
let tx_gen = DummyProvenTxGenerator::new();

let mut batch_group = vec![
dummy_tx_batch(&tx_gen, 2),
dummy_tx_batch(&tx_gen, 2),
dummy_tx_batch(&tx_gen, 2),
];

batch_builder.ready_batches.write().await.append(&mut batch_group);
}

// start batch builder
tokio::spawn(batch_builder.run());

// Wait for 2 blocks to failed to be produced
time::sleep(block_frequency * 2 + (block_frequency / 2)).await;

// Ensure the transaction batches are all still on the queue
assert_eq!(internal_ready_batches.read().await.len(), 3);
}

// HELPERS
// ================================================================================================

fn dummy_tx_batch(
tx_gen: &DummyProvenTxGenerator,
num_txs_in_batch: usize,
) -> SharedTxBatch {
let txs: Vec<_> = (0..num_txs_in_batch)
.into_iter()
.map(|_| Arc::new(tx_gen.dummy_proven_tx()))
.collect();

Arc::new(TransactionBatch::new(txs))
}
21 changes: 21 additions & 0 deletions block-producer/src/block_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use async_trait::async_trait;

use crate::SharedTxBatch;

#[derive(Debug)]
pub enum BuildBlockError {
Dummy,
}

#[async_trait]
pub trait BlockBuilder: Send + Sync + 'static {
/// Receive batches to be included in a block. An empty vector indicates that no batches were
/// ready, and that an empty block should be created.
///
/// The `BlockBuilder` relies on `build_block()` to be called as a precondition to creating a
/// block. In other words, if `build_block()` is never called, then no blocks are produced.
async fn build_block(
&self,
batches: Vec<SharedTxBatch>,
) -> Result<(), BuildBlockError>;
}
15 changes: 15 additions & 0 deletions block-producer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,17 @@
use std::sync::Arc;

use batch_builder::TransactionBatch;
use miden_objects::transaction::ProvenTransaction;
use tokio::sync::RwLock;

#[cfg(test)]
pub mod test_utils;

pub mod batch_builder;
pub mod block_builder;
pub mod txqueue;

/// A proven transaction that can be shared across threads
pub(crate) type SharedProvenTx = Arc<ProvenTransaction>;
pub(crate) type SharedTxBatch = Arc<TransactionBatch>;
pub(crate) type SharedRwVec<T> = Arc<RwLock<Vec<T>>>;
Loading

0 comments on commit e18b122

Please sign in to comment.