Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 16 additions & 19 deletions src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub struct Accountant {
pub historian: Historian,
pub balances: HashMap<PublicKey, i64>,
pub first_id: Hash,
pub last_id: Hash,
pending: HashMap<Signature, Plan>,
time_sources: HashSet<PublicKey>,
last_time: DateTime<Utc>,
Expand All @@ -67,7 +66,6 @@ impl Accountant {
historian: hist,
balances: HashMap::new(),
first_id: start_hash,
last_id: start_hash,
pending: HashMap::new(),
time_sources: HashSet::new(),
last_time: Utc.timestamp(0, 0),
Expand All @@ -91,13 +89,6 @@ impl Accountant {
Self::new_from_entries(mint.create_entries(), ms_per_tick)
}

pub fn sync(self: &mut Self) -> Hash {
while let Ok(entry) = self.historian.receiver.try_recv() {
self.last_id = entry.id;
}
self.last_id
}

fn is_deposit(allow_deposits: bool, from: &PublicKey, plan: &Plan) -> bool {
if let Plan::Pay(ref payment) = *plan {
allow_deposits && *from == payment.to
Expand Down Expand Up @@ -210,8 +201,9 @@ impl Accountant {
n: i64,
keypair: &KeyPair,
to: PublicKey,
last_id: Hash,
) -> Result<Signature> {
let tr = Transaction::new(keypair, to, n, self.last_id);
let tr = Transaction::new(keypair, to, n, last_id);
let sig = tr.sig;
self.process_transaction(tr).map(|_| sig)
}
Expand All @@ -222,8 +214,9 @@ impl Accountant {
keypair: &KeyPair,
to: PublicKey,
dt: DateTime<Utc>,
last_id: Hash,
) -> Result<Signature> {
let tr = Transaction::new_on_date(keypair, to, dt, n, self.last_id);
let tr = Transaction::new_on_date(keypair, to, dt, n, last_id);
let sig = tr.sig;
self.process_transaction(tr).map(|_| sig)
}
Expand All @@ -244,10 +237,12 @@ mod tests {
let alice = Mint::new(10_000);
let bob_pubkey = KeyPair::new().pubkey();
let mut acc = Accountant::new(&alice, Some(2));
acc.transfer(1_000, &alice.keypair(), bob_pubkey).unwrap();
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed())
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);

acc.transfer(500, &alice.keypair(), bob_pubkey).unwrap();
acc.transfer(500, &alice.keypair(), bob_pubkey, alice.seed())
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500);

drop(acc.historian.sender);
Expand All @@ -262,9 +257,10 @@ mod tests {
let alice = Mint::new(11_000);
let mut acc = Accountant::new(&alice, Some(2));
let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(1_000, &alice.keypair(), bob_pubkey).unwrap();
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed())
.unwrap();
assert_eq!(
acc.transfer(10_001, &alice.keypair(), bob_pubkey),
acc.transfer(10_001, &alice.keypair(), bob_pubkey, alice.seed()),
Err(AccountingError::InsufficientFunds)
);

Expand Down Expand Up @@ -309,7 +305,8 @@ mod tests {
let mut acc = Accountant::new(&alice, Some(2));
let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(500, &alice_keypair, bob_pubkey).unwrap();
acc.transfer(500, &alice_keypair, bob_pubkey, alice.seed())
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);

drop(acc.historian.sender);
Expand All @@ -326,7 +323,7 @@ mod tests {
let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now();
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt)
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed())
.unwrap();

// Alice's balance will be zero because all funds are locked up.
Expand Down Expand Up @@ -355,7 +352,7 @@ mod tests {
acc.process_verified_timestamp(alice.pubkey(), dt).unwrap();

// It's now past now, so this transfer should be processed immediately.
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt)
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed())
.unwrap();

assert_eq!(acc.get_balance(&alice.pubkey()), Some(0));
Expand All @@ -369,7 +366,7 @@ mod tests {
let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now();
let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt)
let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed())
.unwrap();

// Alice's balance will be zero because all funds are locked up.
Expand Down
34 changes: 30 additions & 4 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ use std::time::Duration;
use std::sync::mpsc::channel;
use std::thread::{spawn, JoinHandle};
use std::default::Default;
use serde_json;

pub struct AccountantSkel {
pub acc: Accountant,
pub last_id: Hash,
pub ledger: Vec<Entry>,
}

#[derive(Serialize, Deserialize, Debug)]
Expand All @@ -34,7 +37,21 @@ pub enum Response {

impl AccountantSkel {
pub fn new(acc: Accountant) -> Self {
AccountantSkel { acc }
let last_id = acc.first_id;
AccountantSkel {
acc,
last_id,
ledger: vec![],
}
}

pub fn sync(self: &mut Self) -> Hash {
while let Ok(entry) = self.acc.historian.receiver.try_recv() {
self.last_id = entry.id;
println!("{}", serde_json::to_string(&entry).unwrap());
self.ledger.push(entry);
}
self.last_id
}

pub fn process_request(self: &mut Self, msg: Request) -> Option<Response> {
Expand All @@ -49,11 +66,20 @@ impl AccountantSkel {
let val = self.acc.get_balance(&key);
Some(Response::Balance { key, val })
}
Request::GetEntries { .. } => Some(Response::Entries { entries: vec![] }),
Request::GetEntries { last_id } => {
self.sync();
let entries = self.ledger
.iter()
.skip_while(|x| x.id != last_id) // log(n) way to find Entry with id == last_id.
.skip(1) // Skip the entry with last_id.
.take(256) // TODO: Take while the serialized entries fit into a 64k UDP packet.
.cloned()
.collect();
Some(Response::Entries { entries })
}
Request::GetId { is_last } => Some(Response::Id {
id: if is_last {
self.acc.sync();
self.acc.last_id
self.sync()
} else {
self.acc.first_id
},
Expand Down
51 changes: 29 additions & 22 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ use accountant_skel::{Request, Response};
pub struct AccountantStub {
pub addr: String,
pub socket: UdpSocket,
pub last_id: Option<Hash>,
}

impl AccountantStub {
pub fn new(addr: &str, socket: UdpSocket) -> Self {
AccountantStub {
addr: addr.to_string(),
socket,
last_id: None,
}
}

Expand Down Expand Up @@ -75,38 +73,47 @@ impl AccountantStub {
self.get_id(true)
}

pub fn wait_on_signature(&mut self, wait_sig: &Signature) -> io::Result<()> {
let last_id = match self.last_id {
None => {
let first_id = self.get_id(false)?;
self.last_id = Some(first_id);
first_id
}
Some(last_id) => last_id,
};

pub fn check_on_signature(
&mut self,
wait_sig: &Signature,
last_id: &Hash,
) -> io::Result<(bool, Hash)> {
let mut last_id = *last_id;
let req = Request::GetEntries { last_id };
let data = serialize(&req).unwrap();
self.socket.send_to(&data, &self.addr).map(|_| ())?;

let mut buf = vec![0u8; 1024];
let mut buf = vec![0u8; 65_535];
self.socket.recv_from(&mut buf)?;
let resp = deserialize(&buf).expect("deserialize signature");
let mut found = false;
if let Response::Entries { entries } = resp {
for Entry { id, events, .. } in entries {
self.last_id = Some(id);
for event in events {
if let Some(sig) = event.get_signature() {
if sig == *wait_sig {
return Ok(());
last_id = id;
if !found {
for event in events {
if let Some(sig) = event.get_signature() {
if sig == *wait_sig {
found = true;
}
}
}
}
}
}

// TODO: Loop until we found it.
Ok(())
Ok((found, last_id))
}

pub fn wait_on_signature(&mut self, wait_sig: &Signature, last_id: &Hash) -> io::Result<Hash> {
let mut found = false;
let mut last_id = *last_id;
while !found {
let ret = self.check_on_signature(wait_sig, &last_id)?;
found = ret.0;
last_id = ret.1;
}
Ok(last_id)
}
}

Expand All @@ -126,7 +133,7 @@ mod tests {
let addr = "127.0.0.1:9000";
let send_addr = "127.0.0.1:9001";
let alice = Mint::new(10_000);
let acc = Accountant::new(&alice, None);
let acc = Accountant::new(&alice, Some(30));
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(Mutex::new(false));
let acc = Arc::new(Mutex::new(AccountantSkel::new(acc)));
Expand All @@ -138,7 +145,7 @@ mod tests {
let last_id = acc.get_last_id().unwrap();
let sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap();
acc.wait_on_signature(&sig).unwrap();
acc.wait_on_signature(&sig, &last_id).unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500);
*exit.lock().unwrap() = true;
for t in threads.iter() {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn main() {
acc.transfer_signed(tr).unwrap();
}
println!("Waiting for last transaction to be confirmed...",);
acc.wait_on_signature(&sig).unwrap();
acc.wait_on_signature(&sig, &last_id).unwrap();

let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64;
Expand Down
12 changes: 4 additions & 8 deletions src/historian.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::thread::{spawn, JoinHandle};
use std::collections::HashSet;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::time::Instant;
use hash::{hash, Hash};
use hash::Hash;
use entry::Entry;
use recorder::{ExitReason, Recorder, Signal};
use signature::Signature;
Expand Down Expand Up @@ -48,8 +48,7 @@ impl Historian {
return err;
}
if ms_per_tick.is_some() {
recorder.last_id = hash(&recorder.last_id);
recorder.num_hashes += 1;
recorder.hash();
}
}
})
Expand Down Expand Up @@ -127,12 +126,9 @@ mod tests {
hist.sender.send(Signal::Tick).unwrap();
drop(hist.sender);
let entries: Vec<Entry> = hist.receiver.iter().collect();
assert!(entries.len() > 1);

// Ensure one entry is sent back for each tick sent in.
assert_eq!(entries.len(), 1);

// Ensure the ID is not the seed, which indicates another Tick
// was recorded before the one we sent.
// Ensure the ID is not the seed.
assert_ne!(entries[0].id, zero);
}
}
37 changes: 20 additions & 17 deletions src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
use std::time::{Duration, Instant};
use std::mem;
use hash::Hash;
use hash::{hash, Hash};
use entry::{create_entry_mut, Entry};
use event::Event;
use serde_json;

pub enum Signal {
Tick,
Expand All @@ -25,31 +24,38 @@ pub enum ExitReason {
}

pub struct Recorder {
pub sender: SyncSender<Entry>,
pub receiver: Receiver<Signal>,
pub last_id: Hash,
pub events: Vec<Event>,
pub num_hashes: u64,
pub num_ticks: u64,
sender: SyncSender<Entry>,
receiver: Receiver<Signal>,
last_hash: Hash,
events: Vec<Event>,
num_hashes: u64,
num_ticks: u64,
}

impl Recorder {
pub fn new(receiver: Receiver<Signal>, sender: SyncSender<Entry>, start_hash: Hash) -> Self {
Recorder {
receiver,
sender,
last_id: start_hash,
last_hash: start_hash,
events: vec![],
num_hashes: 0,
num_ticks: 0,
}
}

pub fn record_entry(&mut self) -> Result<Entry, ExitReason> {
pub fn hash(&mut self) {
self.last_hash = hash(&self.last_hash);
self.num_hashes += 1;
}

pub fn record_entry(&mut self) -> Result<(), ExitReason> {
let events = mem::replace(&mut self.events, vec![]);
let entry = create_entry_mut(&mut self.last_id, &mut self.num_hashes, events);
println!("{}", serde_json::to_string(&entry).unwrap());
Ok(entry)
let entry = create_entry_mut(&mut self.last_hash, &mut self.num_hashes, events);
self.sender
.send(entry)
.or(Err(ExitReason::SendDisconnected))?;
Ok(())
}

pub fn process_events(
Expand All @@ -68,10 +74,7 @@ impl Recorder {
match self.receiver.try_recv() {
Ok(signal) => match signal {
Signal::Tick => {
let entry = self.record_entry()?;
self.sender
.send(entry)
.or(Err(ExitReason::SendDisconnected))?;
self.record_entry()?;
}
Signal::Event(event) => {
self.events.push(event);
Expand Down
Loading