Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,18 @@ impl Accountant {
(trs, rest)
}

pub fn process_verified_events(&self, events: Vec<Event>) -> Result<()> {
pub fn process_verified_events(&self, events: Vec<Event>) -> Vec<Result<Event>> {
let (trs, rest) = Self::partition_events(events);
self.process_verified_transactions(trs);
let mut results: Vec<_> = self.process_verified_transactions(trs)
.into_iter()
.map(|x| x.map(Event::Transaction))
.collect();

for event in rest {
self.process_verified_event(&event)?;
results.push(self.process_verified_event(event));
}
Ok(())

results
}

/// Process a Witness Signature that has already been verified.
Expand Down Expand Up @@ -278,12 +283,13 @@ impl Accountant {
}

/// Process an Transaction or Witness that has already been verified.
pub fn process_verified_event(&self, event: &Event) -> Result<()> {
match *event {
pub fn process_verified_event(&self, event: Event) -> Result<Event> {
match event {
Event::Transaction(ref tr) => self.process_verified_transaction(tr),
Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig),
Event::Timestamp { from, dt, .. } => self.process_verified_timestamp(from, dt),
}
}?;
Ok(event)
}

/// Create, sign, and process a Transaction from `keypair` to `to` of
Expand Down
85 changes: 44 additions & 41 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,37 +308,30 @@ impl AccountantSkel {
/// Split Request list into verified transactions and the rest
fn partition_requests(
req_vers: Vec<(Request, SocketAddr, u8)>,
) -> (Vec<Transaction>, Vec<(Request, SocketAddr)>) {
let mut trs = vec![];
) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
let mut events = vec![];
let mut reqs = vec![];
for (msg, rsp_addr, verify) in req_vers {
match msg {
Request::Transaction(tr) => {
if verify != 0 {
trs.push(tr);
events.push(Event::Transaction(tr));
}
}
_ => reqs.push((msg, rsp_addr)),
}
}
(trs, reqs)
(events, reqs)
}

fn process_packets(
&self,
req_vers: Vec<(Request, SocketAddr, u8)>,
) -> Result<Vec<(Response, SocketAddr)>> {
debug!("partitioning");
let (trs, reqs) = Self::partition_requests(req_vers);
debug!("trs: {} reqs: {}", trs.len(), reqs.len());

// Process the transactions in parallel and then log the successful ones.
for result in self.acc.lock().unwrap().process_verified_transactions(trs) {
if let Ok(tr) = result {
/// Process the transactions in parallel and then log the successful ones.
fn process_events(&self, events: Vec<Event>) -> Result<()> {
for result in self.acc.lock().unwrap().process_verified_events(events) {
if let Ok(event) = result {
self.historian_input
.lock()
.unwrap()
.send(Signal::Event(Event::Transaction(tr)))?;
.send(Signal::Event(event))?;
}
}

Expand All @@ -347,17 +340,15 @@ impl AccountantSkel {
// Let validators know they should not attempt to process additional
// transactions in parallel.
self.historian_input.lock().unwrap().send(Signal::Tick)?;

debug!("after historian_input");

// Process the remaining requests serially.
let rsps = reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect();

debug!("returning rsps");
Ok(())
}

Ok(rsps)
fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> {
reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect()
}

fn serialize_response(
Expand Down Expand Up @@ -409,9 +400,19 @@ impl AccountantSkel {
v
})
.collect();
debug!("process_packets");
let rsps = obj.process_packets(req_vers)?;
debug!("done process_packets");

debug!("partitioning");
let (events, reqs) = Self::partition_requests(req_vers);
debug!("events: {} reqs: {}", events.len(), reqs.len());

debug!("process_events");
obj.process_events(events)?;
debug!("done process_events");

debug!("process_requests");
let rsps = obj.process_requests(reqs);
debug!("done process_requests");

let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len());
Expand All @@ -436,13 +437,12 @@ impl AccountantSkel {
for msgs in &blobs {
let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
let acc = obj.acc.lock().unwrap();
for entry in entries {
obj.acc.lock().unwrap().register_entry_id(&entry.id);

obj.acc
.lock()
.unwrap()
.process_verified_events(entry.events)?;
acc.register_entry_id(&entry.id);
for result in acc.process_verified_events(entry.events) {
result?;
}
}
//TODO respond back to leader with hash of the state
}
Expand Down Expand Up @@ -732,7 +732,7 @@ mod tests {
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::io::sink;
use std::net::{SocketAddr, UdpSocket};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::sync_channel;
Expand Down Expand Up @@ -775,21 +775,20 @@ mod tests {
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let acc = Accountant::new(&mint);
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let skel = AccountantSkel::new(acc, input, historian);

// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)];
assert!(skel.process_packets(req_vers).is_ok());
let events = vec![Event::Transaction(tr)];
assert!(skel.process_events(events).is_ok());

// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)];
assert!(skel.process_packets(req_vers).is_ok());
let events = vec![Event::Transaction(tr)];
assert!(skel.process_events(events).is_ok());

// Collect the ledger and feed it to a new accountant.
skel.historian_input
Expand All @@ -805,7 +804,11 @@ mod tests {
// the account balance below zero before the credit is added.
let acc = Accountant::new(&mint);
for entry in entries {
acc.process_verified_events(entry.events).unwrap();
assert!(
acc.process_verified_events(entry.events)
.into_iter()
.all(|x| x.is_ok())
);
}
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1));
}
Expand Down Expand Up @@ -1099,7 +1102,7 @@ mod bench {
let skel = AccountantSkel::new(acc, input, historian);

let now = Instant::now();
assert!(skel.process_packets(req_vers).is_ok());
assert!(skel.process_events(req_vers).is_ok());
let duration = now.elapsed();
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let tps = txs as f64 / sec;
Expand Down
8 changes: 7 additions & 1 deletion src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,13 @@ fn main() {
let mut last_id = entry1.id;
for entry in entries {
last_id = entry.id;
acc.process_verified_events(entry.events).unwrap();
let results = acc.process_verified_events(entry.events);
for result in results {
if let Err(e) = result {
eprintln!("failed to process event {:?}", e);
exit(1);
}
}
acc.register_entry_id(&last_id);
}

Expand Down