Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 59 additions & 24 deletions src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! event log to record transactions. Its users can deposit funds and
//! transfer funds to other users.

use log::{Event, PublicKey, Sha256Hash, Signature};
use log::{Entry, Event, PublicKey, Sha256Hash, Signature};
use historian::Historian;
use ring::signature::Ed25519KeyPair;
use std::sync::mpsc::{RecvError, SendError};
Expand All @@ -11,6 +11,7 @@ use std::collections::HashMap;
pub struct Accountant {
pub historian: Historian<u64>,
pub balances: HashMap<PublicKey, u64>,
pub signatures: HashMap<Signature, bool>,
pub end_hash: Sha256Hash,
}

Expand All @@ -20,13 +21,19 @@ impl Accountant {
Accountant {
historian: hist,
balances: HashMap::new(),
signatures: HashMap::new(),
end_hash: *start_hash,
}
}

pub fn process_event(self: &mut Self, event: &Event<u64>) {
match *event {
Event::Claim { key, data, .. } => {
Event::Claim { key, data, sig } => {
if self.signatures.contains_key(&sig) {
return;
}
self.signatures.insert(sig, true);

if self.balances.contains_key(&key) {
if let Some(x) = self.balances.get_mut(&key) {
*x += data;
Expand All @@ -35,7 +42,16 @@ impl Accountant {
self.balances.insert(key, data);
}
}
Event::Transaction { from, to, data, .. } => {
Event::Transaction {
from,
to,
data,
sig,
} => {
if self.signatures.contains_key(&sig) {
return;
}
self.signatures.insert(sig, true);
if let Some(x) = self.balances.get_mut(&from) {
*x -= data;
}
Expand All @@ -51,7 +67,7 @@ impl Accountant {
}
}

pub fn sync(self: &mut Self) {
pub fn sync(self: &mut Self) -> Vec<Entry<u64>> {
let mut entries = vec![];
while let Ok(entry) = self.historian.receiver.try_recv() {
entries.push(entry);
Expand All @@ -67,6 +83,8 @@ impl Accountant {
for e in &entries {
self.process_event(&e.event);
}

entries
}

pub fn deposit_signed(
Expand All @@ -83,11 +101,11 @@ impl Accountant {
self: &Self,
n: u64,
keypair: &Ed25519KeyPair,
) -> Result<(), SendError<Event<u64>>> {
) -> Result<Signature, SendError<Event<u64>>> {
use log::{get_pubkey, sign_serialized};
let key = get_pubkey(keypair);
let sig = sign_serialized(&n, keypair);
self.deposit_signed(key, n, sig)
self.deposit_signed(key, n, sig).map(|_| sig)
}

pub fn transfer_signed(
Expand Down Expand Up @@ -116,25 +134,41 @@ impl Accountant {
n: u64,
keypair: &Ed25519KeyPair,
to: PublicKey,
) -> Result<(), SendError<Event<u64>>> {
) -> Result<Signature, SendError<Event<u64>>> {
use log::{get_pubkey, sign_transaction_data};

let from = get_pubkey(keypair);
let sig = sign_transaction_data(&n, keypair, &to);
self.transfer_signed(from, to, n, sig)
self.transfer_signed(from, to, n, sig).map(|_| sig)
}

pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> Result<u64, RecvError> {
self.sync();
Ok(*self.balances.get(pubkey).unwrap_or(&0))
}

pub fn wait_on_signature(self: &mut Self, wait_sig: &Signature) {
use std::thread::sleep;
use std::time::Duration;
let mut entries = self.sync();
let mut found = false;
while !found {
found = entries.iter().any(|e| match e.event {
Event::Claim { sig, .. } => sig == *wait_sig,
Event::Transaction { sig, .. } => sig == *wait_sig,
_ => false,
});
if !found {
sleep(Duration::from_millis(30));
entries = self.sync();
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::thread::sleep;
use std::time::Duration;
use log::{generate_keypair, get_pubkey};
use historian::ExitReason;

Expand All @@ -145,13 +179,13 @@ mod tests {
let alice_keypair = generate_keypair();
let bob_keypair = generate_keypair();
acc.deposit(10_000, &alice_keypair).unwrap();
acc.deposit(1_000, &bob_keypair).unwrap();
let sig = acc.deposit(1_000, &bob_keypair).unwrap();
acc.wait_on_signature(&sig);

sleep(Duration::from_millis(30));
let bob_pubkey = get_pubkey(&bob_keypair);
acc.transfer(500, &alice_keypair, bob_pubkey).unwrap();
let sig = acc.transfer(500, &alice_keypair, bob_pubkey).unwrap();
acc.wait_on_signature(&sig);

sleep(Duration::from_millis(30));
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500);

drop(acc.historian.sender);
Expand All @@ -163,18 +197,20 @@ mod tests {

#[test]
fn test_invalid_transfer() {
use std::thread::sleep;
use std::time::Duration;
let zero = Sha256Hash::default();
let mut acc = Accountant::new(&zero, Some(2));
let alice_keypair = generate_keypair();
let bob_keypair = generate_keypair();
acc.deposit(10_000, &alice_keypair).unwrap();
acc.deposit(1_000, &bob_keypair).unwrap();
let sig = acc.deposit(1_000, &bob_keypair).unwrap();
acc.wait_on_signature(&sig);

sleep(Duration::from_millis(30));
let bob_pubkey = get_pubkey(&bob_keypair);
acc.transfer(10_001, &alice_keypair, bob_pubkey).unwrap();

sleep(Duration::from_millis(30));

let alice_pubkey = get_pubkey(&alice_keypair);
assert_eq!(acc.get_balance(&alice_pubkey).unwrap(), 10_000);
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
Expand All @@ -192,10 +228,10 @@ mod tests {
let mut acc = Accountant::new(&zero, Some(2));
let keypair = generate_keypair();
acc.deposit(1, &keypair).unwrap();
acc.deposit(2, &keypair).unwrap();
let sig = acc.deposit(2, &keypair).unwrap();
acc.wait_on_signature(&sig);

let pubkey = get_pubkey(&keypair);
sleep(Duration::from_millis(30));
assert_eq!(acc.get_balance(&pubkey).unwrap(), 3);

drop(acc.historian.sender);
Expand All @@ -211,13 +247,12 @@ mod tests {
let mut acc = Accountant::new(&zero, Some(2));
let alice_keypair = generate_keypair();
let bob_keypair = generate_keypair();
acc.deposit(10_000, &alice_keypair).unwrap();
let sig = acc.deposit(10_000, &alice_keypair).unwrap();
acc.wait_on_signature(&sig);

sleep(Duration::from_millis(30));
let bob_pubkey = get_pubkey(&bob_keypair);
acc.transfer(500, &alice_keypair, bob_pubkey).unwrap();

sleep(Duration::from_millis(30));
let sig = acc.transfer(500, &alice_keypair, bob_pubkey).unwrap();
acc.wait_on_signature(&sig);
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);

drop(acc.historian.sender);
Expand Down
22 changes: 14 additions & 8 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ pub enum Request {
GetBalance {
key: PublicKey,
},
Wait {
sig: Signature,
},
}

#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Balance { key: PublicKey, val: u64 },
Confirmed { sig: Signature },
}

impl AccountantSkel {
Expand All @@ -49,26 +53,28 @@ impl AccountantSkel {
let val = self.obj.get_balance(&key).unwrap();
Some(Response::Balance { key, val })
}
Request::Wait { sig } => {
self.obj.wait_on_signature(&sig);
Some(Response::Confirmed { sig })
}
}
}

/// TCP Server that forwards messages to Accountant methods.
/// UDP Server that forwards messages to Accountant methods.
pub fn serve(self: &mut Self, addr: &str) -> io::Result<()> {
use std::net::TcpListener;
use std::io::{Read, Write};
use std::net::UdpSocket;
use bincode::{deserialize, serialize};
let listener = TcpListener::bind(addr)?;
let socket = UdpSocket::bind(addr)?;
let mut buf = vec![0u8; 1024];
loop {
//println!("skel: Waiting for incoming connections...");
let (mut stream, _from_addr) = listener.accept()?;
let _sz = stream.read(&mut buf)?;
//println!("skel: Waiting for incoming packets...");
let (_sz, src) = socket.recv_from(&mut buf)?;

// TODO: Return a descriptive error message if deserialization fails.
let req = deserialize(&buf).expect("deserialize request");

if let Some(resp) = self.process_request(req) {
stream.write(&serialize(&resp).expect("serialize response"))?;
socket.send_to(&serialize(&resp).expect("serialize response"), &src)?;
}
}
}
Expand Down
63 changes: 39 additions & 24 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@
//! event log to record transactions. Its users can deposit funds and
//! transfer funds to other users.

use std::net::TcpStream;
use std::net::UdpSocket;
use std::io;
use std::io::{Read, Write};
use bincode::{deserialize, serialize};
use log::{PublicKey, Signature};
use ring::signature::Ed25519KeyPair;
use accountant_skel::{Request, Response};

pub struct AccountantStub {
pub addr: String,
pub socket: UdpSocket,
}

impl AccountantStub {
pub fn new(addr: &str) -> Self {
pub fn new(addr: &str, socket: UdpSocket) -> Self {
AccountantStub {
addr: addr.to_string(),
socket,
}
}

Expand All @@ -29,15 +30,14 @@ impl AccountantStub {
) -> io::Result<usize> {
let req = Request::Deposit { key, val, sig };
let data = serialize(&req).unwrap();
let mut stream = TcpStream::connect(&self.addr)?;
stream.write(&data)
self.socket.send_to(&data, &self.addr)
}

pub fn deposit(self: &mut Self, n: u64, keypair: &Ed25519KeyPair) -> io::Result<usize> {
pub fn deposit(self: &mut Self, n: u64, keypair: &Ed25519KeyPair) -> io::Result<Signature> {
use log::{get_pubkey, sign_serialized};
let key = get_pubkey(keypair);
let sig = sign_serialized(&n, keypair);
self.deposit_signed(key, n, sig)
self.deposit_signed(key, n, sig).map(|_| sig)
}

pub fn transfer_signed(
Expand All @@ -49,33 +49,47 @@ impl AccountantStub {
) -> io::Result<usize> {
let req = Request::Transfer { from, to, val, sig };
let data = serialize(&req).unwrap();
let mut stream = TcpStream::connect(&self.addr)?;
stream.write(&data)
self.socket.send_to(&data, &self.addr)
}

pub fn transfer(
self: &mut Self,
n: u64,
keypair: &Ed25519KeyPair,
to: PublicKey,
) -> io::Result<usize> {
) -> io::Result<Signature> {
use log::{get_pubkey, sign_transaction_data};
let from = get_pubkey(keypair);
let sig = sign_transaction_data(&n, keypair, &to);
self.transfer_signed(from, to, n, sig)
self.transfer_signed(from, to, n, sig).map(|_| sig)
}

pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> io::Result<u64> {
let mut stream = TcpStream::connect(&self.addr)?;
let req = Request::GetBalance { key: *pubkey };
let data = serialize(&req).expect("serialize GetBalance");
stream.write(&data)?;
self.socket.send_to(&data, &self.addr)?;
let mut buf = vec![0u8; 1024];
stream.read(&mut buf)?;
self.socket.recv_from(&mut buf)?;
let resp = deserialize(&buf).expect("deserialize balance");
let Response::Balance { key, val } = resp;
assert_eq!(key, *pubkey);
Ok(val)
if let Response::Balance { key, val } = resp {
assert_eq!(key, *pubkey);
return Ok(val);
}
Ok(0)
}

pub fn wait_on_signature(self: &mut Self, wait_sig: &Signature) -> io::Result<()> {
let req = Request::Wait { sig: *wait_sig };
let data = serialize(&req).unwrap();
self.socket.send_to(&data, &self.addr).map(|_| ())?;

let mut buf = vec![0u8; 1024];
self.socket.recv_from(&mut buf)?;
let resp = deserialize(&buf).expect("deserialize signature");
if let Response::Confirmed { sig } = resp {
assert_eq!(sig, *wait_sig);
}
Ok(())
}
}

Expand All @@ -90,7 +104,8 @@ mod tests {

#[test]
fn test_accountant_stub() {
let addr = "127.0.0.1:8000";
let addr = "127.0.0.1:9000";
let send_addr = "127.0.0.1:9001";
spawn(move || {
let zero = Sha256Hash::default();
let acc = Accountant::new(&zero, None);
Expand All @@ -100,17 +115,17 @@ mod tests {

sleep(Duration::from_millis(30));

let mut acc = AccountantStub::new(addr);
let socket = UdpSocket::bind(send_addr).unwrap();
let mut acc = AccountantStub::new(addr, socket);
let alice_keypair = generate_keypair();
let bob_keypair = generate_keypair();
acc.deposit(10_000, &alice_keypair).unwrap();
acc.deposit(1_000, &bob_keypair).unwrap();
let sig = acc.deposit(1_000, &bob_keypair).unwrap();
acc.wait_on_signature(&sig).unwrap();

sleep(Duration::from_millis(30));
let bob_pubkey = get_pubkey(&bob_keypair);
acc.transfer(500, &alice_keypair, bob_pubkey).unwrap();

sleep(Duration::from_millis(300));
let sig = acc.transfer(500, &alice_keypair, bob_pubkey).unwrap();
acc.wait_on_signature(&sig).unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500);
}
}
Loading