diff --git a/src/algs.rs b/src/algs.rs index b67f283..4f0048c 100644 --- a/src/algs.rs +++ b/src/algs.rs @@ -101,7 +101,10 @@ pub fn ipc_valid(v: String) -> std::result::Result<(), String> { /// } /// /// fn main() { -/// portus::start!("unix", None, MyCongestionControlAlgorithm(Default::default())); +/// let handle = portus::spawn!("unix", None, MyCongestionControlAlgorithm(Default::default()), 4u32); +/// std::thread::sleep(std::time::Duration::from_secs(2)); +/// handle.kill(); +/// handle.wait(); /// } /// ``` #[macro_export] @@ -111,32 +114,88 @@ macro_rules! start { $crate::start!($ipc, $log, $alg, Blocking) }}; ($ipc:expr, $log:expr, $alg: expr, $blk: ty) => {{ - use $crate::ipc::BackendBuilder; + use $crate::ipc::SingleBackendBuilder; match $ipc { "unix" => { use $crate::ipc::unix::Socket; - let b = Socket::<$blk>::new("in", "out") - .map(|sk| BackendBuilder { sock: sk }) + let b = Socket::<$blk>::new(0, "in", "out") + .map(|sk| SingleBackendBuilder { sock: sk }) .expect("ipc initialization"); - $crate::run::<_, _>(b, $crate::Config { logger: $log }, $alg) + $crate::run::<_, _, SingleBackendBuilder<_>>( + b, + $crate::Config { logger: $log }, + $alg, + ) } #[cfg(all(target_os = "linux"))] "netlink" => { use $crate::ipc::netlink::Socket; let b = Socket::<$blk>::new() - .map(|sk| BackendBuilder { sock: sk }) + .map(|sk| SingleBackendBuilder { sock: sk }) .expect("ipc initialization"); - $crate::run::<_, _>(b, $crate::Config { logger: $log }, $alg) + $crate::run::<_, _, SingleBackendBuilder<_>>( + b, + $crate::Config { logger: $log }, + $alg, + ) } #[cfg(all(target_os = "linux"))] "char" => { use $crate::ipc::kp::Socket; let b = Socket::<$blk>::new() - .map(|sk| BackendBuilder { sock: sk }) + .map(|sk| SingleBackendBuilder { sock: sk }) .expect("ipc initialization"); - $crate::run::<_, _>(b, $crate::Config { logger: $log }, $alg) + $crate::run::<_, _, SingleBackendBuilder<_>>( + b, + $crate::Config { logger: $log }, + $alg, + ) } _ => unreachable!(), } }}; + ($ipc:expr, $log:expr, $alg:expr, $blk:ty, $nthreads: expr) => {{ + use std::convert::TryInto; + use $crate::ipc::MultiBackendBuilder; + match $ipc { + "unix" => { + use $crate::ipc::unix::Socket; + let mut v = vec![]; + for i in 0..$nthreads { + v.push(Socket::<$blk>::new(i.try_into().unwrap(), "in", "out").unwrap()) + } + let b = MultiBackendBuilder { socks: v }; + $crate::run::<_, _, MultiBackendBuilder<_>>( + b, + $crate::Config { logger: $log }, + $alg, + ) + } + _ => unimplemented!(), + } + }}; +} +#[macro_export] +macro_rules! spawn { + ($ipc:expr, $log:expr, $alg:expr, $nthreads: expr) => {{ + use std::convert::TryInto; + use $crate::ipc::Blocking; + use $crate::ipc::MultiBackendBuilder; + match $ipc { + "unix" => { + use $crate::ipc::unix::Socket; + let mut v = vec![]; + for i in 0..$nthreads { + v.push(Socket::::new(i.try_into().unwrap(), "in", "out").unwrap()) + } + let b = MultiBackendBuilder { socks: v }; + $crate::spawn::<_, _, MultiBackendBuilder<_>>( + b, + $crate::Config { logger: $log }, + $alg, + ) + } + _ => unimplemented!(), + } + }}; } diff --git a/src/bin/ipc_latency.rs b/src/bin/ipc_latency.rs index a51a255..ce07fec 100644 --- a/src/bin/ipc_latency.rs +++ b/src/bin/ipc_latency.rs @@ -14,9 +14,10 @@ use std::sync::{atomic, Arc}; use time::Duration; #[derive(Debug)] -struct TimeMsg(time::Timespec); +pub struct TimeMsg(time::Timespec); use std::io::prelude::*; + impl portus::serialize::AsRawMsg for TimeMsg { fn get_hdr(&self) -> (u8, u32, u32) { (0xff, portus::serialize::HDR_LENGTH + 8 + 4, 0) @@ -38,16 +39,19 @@ impl portus::serialize::AsRawMsg for TimeMsg { Ok(()) } - fn from_raw_msg(msg: portus::serialize::RawMsg) -> portus::Result { - let b = msg.get_bytes()?; - let sec = LittleEndian::read_i64(&b[0..8]); - let nsec = LittleEndian::read_i32(&b[8..12]); - Ok(TimeMsg(time::Timespec::new(sec, nsec))) + fn from_raw_msg(_msg: portus::serialize::RawMsg) -> portus::Result { + unimplemented!() } } +pub fn deserialize_timemsg(msg: portus::serialize::other::Msg) -> portus::Result { + let b = msg.get_raw_bytes(); + let sec = LittleEndian::read_i64(&b[0..8]); + let nsec = LittleEndian::read_i32(&b[8..12]); + Ok(TimeMsg(time::Timespec::new(sec, nsec))) +} #[derive(Debug)] -struct NlTimeMsg { +pub struct NlTimeMsg { kern_rt: time::Timespec, kern_st: time::Timespec, } @@ -74,29 +78,32 @@ impl portus::serialize::AsRawMsg for NlTimeMsg { Ok(()) } - fn from_raw_msg(msg: portus::serialize::RawMsg) -> portus::Result { - let b = msg.get_bytes()?; - let up_sec = LittleEndian::read_i64(&b[0..8]); - let up_nsec = LittleEndian::read_i32(&b[8..12]); - let down_sec = LittleEndian::read_i64(&b[12..20]); - let down_nsec = LittleEndian::read_i32(&b[20..24]); - Ok(NlTimeMsg { - kern_rt: time::Timespec::new(up_sec, up_nsec), - kern_st: time::Timespec::new(down_sec, down_nsec), - }) + fn from_raw_msg(_msg: portus::serialize::RawMsg) -> portus::Result { + unimplemented!() } } +pub fn deserialize_nltimemsg(msg: portus::serialize::other::Msg) -> portus::Result { + let b = msg.get_raw_bytes(); + let up_sec = LittleEndian::read_i64(&b[0..8]); + let up_nsec = LittleEndian::read_i32(&b[8..12]); + let down_sec = LittleEndian::read_i64(&b[12..20]); + let down_nsec = LittleEndian::read_i32(&b[20..24]); + Ok(NlTimeMsg { + kern_rt: time::Timespec::new(up_sec, up_nsec), + kern_st: time::Timespec::new(down_sec, down_nsec), + }) +} -use portus::serialize::AsRawMsg; +use portus::ipc::SingleBackend; use std::sync::mpsc; -fn bench(b: BackendSender, mut l: Backend, iter: u32) -> Vec { +fn bench(b: BackendSender, mut l: SingleBackend, iter: u32) -> Vec { (0..iter) .map(|_| { let then = time::get_time(); let msg = portus::serialize::serialize(&TimeMsg(then)).expect("serialize"); b.send_msg(&msg[..]).expect("send ts"); if let portus::serialize::Msg::Other(raw) = l.next().expect("receive echo") { - let then = TimeMsg::from_raw_msg(raw).expect("get time from raw"); + let then = deserialize_timemsg(raw).expect("get time from raw"); time::get_time() - then.0 } else { panic!("wrong type"); @@ -134,11 +141,8 @@ macro_rules! netlink_bench { // listen let c1 = thread::spawn(move || { - let mut buf = [0u8; 1024]; let mut nl = portus::ipc::netlink::Socket::<$mode>::new() - .map(|sk| { - Backend::new(sk, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..]) - }) + .map(|sk| SingleBackend::new(sk, Arc::new(atomic::AtomicBool::new(true)))) .expect("nl ipc initialization"); tx.send(vec![]).expect("ok to insmod"); nl.next().expect("receive echo"); @@ -153,7 +157,7 @@ macro_rules! netlink_bench { if let portus::serialize::Msg::Other(raw) = nl.next().expect("recv echo") { let portus_rt = time::get_time(); let kern_recv_msg = - NlTimeMsg::from_raw_msg(raw).expect("get time from raw"); + deserialize_nltimemsg(raw).expect("get time from raw"); return NlDuration( portus_rt - portus_send_time, kern_recv_msg.kern_rt - portus_send_time, @@ -229,15 +233,8 @@ macro_rules! kp_bench { .expect("load failed"); let c1 = thread::spawn(move || { - let mut receive_buf = [0u8; 1024]; let kp = portus::ipc::kp::Socket::<$mode>::new() - .map(|sk| { - Backend::new( - sk, - Arc::new(atomic::AtomicBool::new(true)), - &mut receive_buf[..], - ) - }) + .map(|sk| SingleBackend::new(sk, Arc::new(atomic::AtomicBool::new(true)))) .expect("kp ipc initialization"); tx.send(bench(kp.sender(), kp, iter)).expect("report rtts"); }); @@ -269,15 +266,8 @@ macro_rules! unix_bench { // listen let c1 = thread::spawn(move || { - let mut receive_buf = [0u8; 1024]; - let unix = portus::ipc::unix::Socket::<$mode>::new("in", "out") - .map(|sk| { - Backend::new( - sk, - Arc::new(atomic::AtomicBool::new(true)), - &mut receive_buf[..], - ) - }) + let unix = portus::ipc::unix::Socket::<$mode>::new(1, "in", "out") + .map(|sk| SingleBackend::new(sk, Arc::new(atomic::AtomicBool::new(true)))) .expect("unix ipc initialization"); ready_rx.recv().expect("sync"); tx.send(bench(unix.sender(), unix, iter)) @@ -286,7 +276,8 @@ macro_rules! unix_bench { // echo-er let c2 = thread::spawn(move || { - let sk = portus::ipc::unix::Socket::::new("out", "in").expect("sk init"); + let sk = + portus::ipc::unix::Socket::::new(1, "out", "in").expect("sk init"); let mut buf = [0u8; 1024]; ready_tx.send(true).expect("sync"); for _ in 0..iter { diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index e3629d9..ae71ddd 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -1,8 +1,8 @@ //! A library wrapping various IPC mechanisms with a datagram-oriented //! messaging layer. This is how CCP communicates with the datapath. -use std::rc::{Rc, Weak}; -use std::sync::{atomic, Arc}; +//use std::rc::{Rc, Weak}; +use std::sync::{atomic, Arc, Weak}; use super::Error; use super::Result; @@ -30,24 +30,40 @@ pub trait Ipc: 'static + Send { fn close(&mut self) -> Result<()>; } +/// This trait allows for the backend to be either single or mutli-threaded +pub trait Backend { + //fn new(sock: T, continue_listening: Arc) -> Self; + fn sender(&self) -> BackendSender; + fn broadcaster(&self) -> BackendBroadcaster; + fn clone_atomic_bool(&self) -> Arc; + fn next(&mut self) -> Option; +} + /// Marker type specifying that the IPC socket should make blocking calls to the underlying socket pub struct Blocking; /// Marker type specifying that the IPC socket should make nonblocking calls to the underlying socket pub struct Nonblocking; +pub trait BackendBuilder +where + T: Ipc + std::marker::Sync, +{ + type Back: Backend; + fn build(self, atomic_bool: Arc) -> Self::Back; +} /// Backend builder contains the objects /// needed to build a new backend. -pub struct BackendBuilder { +pub struct SingleBackendBuilder { pub sock: T, } -impl BackendBuilder { - pub fn build<'a>( - self, - atomic_bool: Arc, - receive_buf: &'a mut [u8], - ) -> Backend<'a, T> { - Backend::new(self.sock, atomic_bool, receive_buf) +impl BackendBuilder for SingleBackendBuilder +where + T: Ipc + std::marker::Sync, +{ + type Back = SingleBackend; + fn build(self, atomic_bool: Arc) -> Self::Back { + SingleBackend::new(self.sock, atomic_bool) } } @@ -69,47 +85,155 @@ impl Clone for BackendSender { } } +pub struct BackendBroadcaster(Vec>); + +impl BackendBroadcaster { + pub fn broadcast_msg(&self, msg: &[u8]) -> Result<()> { + for s in &self.0 { + s.send_msg(msg)?; + } + Ok(()) + } +} + +pub struct MultiBackendBuilder { + pub socks: Vec, +} +impl BackendBuilder for MultiBackendBuilder +where + T: Ipc + std::marker::Sync, +{ + type Back = MultiBackend; + fn build(self, atomic_bool: Arc) -> Self::Back { + MultiBackend::new(self.socks, atomic_bool) + } +} + +use crossbeam::channel::{unbounded, Receiver, Select}; +pub struct MultiBackend { + last_recvd: Option, + continue_listening: Arc, + sel: Select<'static>, + backends: Vec>, + receivers: &'static [Receiver>], + receivers_ptr: *mut [Receiver>], +} + +impl MultiBackend +where + T: Ipc + std::marker::Sync, +{ + pub fn new(socks: Vec, continue_listening: Arc) -> Self { + let mut backends = Vec::new(); + let mut receivers = Vec::new(); + + for sock in socks { + let mut backend = SingleBackend::new(sock, Arc::clone(&continue_listening)); + backends.push(backend.sender()); + + let (s, r) = unbounded(); + receivers.push(r); + + std::thread::spawn(move || loop { + let m = backend.next(); + let done = m.is_none(); + if s.send(m).is_err() { + panic!("send failed: receiver not listening"); + } + if done { + break; + } + }); + } + + let mut sel = Select::new(); + let recv_ptr = Box::into_raw(Vec::into_boxed_slice(receivers)); + let recv_slice: &'static [Receiver>] = unsafe { &*recv_ptr }; + for r in recv_slice { + sel.recv(r); + } + + MultiBackend { + last_recvd: None, + continue_listening, + sel, + backends, + receivers: recv_slice, + receivers_ptr: recv_ptr, + } + } +} +impl Drop for MultiBackend { + fn drop(&mut self) { + // clear the select + std::mem::replace(&mut self.sel, Select::new()); + // recover the box, but don't drop it just yet so we can clear the slice first + let _b = unsafe { Box::from_raw(self.receivers_ptr) }; + // clear the slice + std::mem::replace(&mut self.receivers, &[]); + // now we can drop the box safely + } +} + +impl Backend for MultiBackend { + fn sender(&self) -> BackendSender { + match self.last_recvd { + Some(i) => self.backends[i as usize].clone(), + None => { + panic!("No messages have been received yet, so there is no corresponding sender") + } + } + } + + fn broadcaster(&self) -> BackendBroadcaster { + BackendBroadcaster(self.backends.clone()) + } + + /// Return a copy of the flag variable that indicates that the + /// `Backend` should continue listening (i.e., not exit). + fn clone_atomic_bool(&self) -> Arc { + Arc::clone(&(self.continue_listening)) + } + + fn next(&mut self) -> Option { + let oper = self.sel.select(); + let index = oper.index(); + self.last_recvd = Some(index); + oper.recv(&self.receivers[index]).unwrap() // TODO don't just unwrap + } +} + /// Backend will yield incoming IPC messages forever via `next()`. /// It owns the socket; `BackendSender` holds weak references. /// The atomic bool is a way to stop iterating. -pub struct Backend<'a, T: Ipc> { - sock: Rc, +pub struct SingleBackend { + sock: Arc, continue_listening: Arc, - receive_buf: &'a mut [u8], + receive_buf: [u8; 1024], tot_read: usize, read_until: usize, } use crate::serialize::Msg; -impl<'a, T: Ipc> Backend<'a, T> { - pub fn new( - sock: T, - continue_listening: Arc, - receive_buf: &'a mut [u8], - ) -> Backend<'a, T> { - Backend { - sock: Rc::new(sock), - continue_listening, - receive_buf, - tot_read: 0, - read_until: 0, - } +impl Backend for SingleBackend { + fn sender(&self) -> BackendSender { + BackendSender(Arc::downgrade(&self.sock)) } - pub fn sender(&self) -> BackendSender { - BackendSender(Rc::downgrade(&self.sock)) + fn broadcaster(&self) -> BackendBroadcaster { + BackendBroadcaster(vec![self.sender()]) } /// Return a copy of the flag variable that indicates that the /// `Backend` should continue listening (i.e., not exit). - pub fn clone_atomic_bool(&self) -> Arc { + fn clone_atomic_bool(&self) -> Arc { Arc::clone(&(self.continue_listening)) } /// Get the next IPC message. // This is similar to `impl Iterator`, but the returned value is tied to the lifetime // of `self`, so we cannot implement that trait. - pub fn next(&mut self) -> Option> { + fn next(&mut self) -> Option { // if we have leftover buffer from the last read, parse another message. if self.read_until < self.tot_read { let (msg, consumed) = Msg::from_buf(&self.receive_buf[self.read_until..]).ok()?; @@ -125,6 +249,18 @@ impl<'a, T: Ipc> Backend<'a, T> { Some(msg) } } +} + +impl SingleBackend { + pub fn new(sock: T, continue_listening: Arc) -> Self { + SingleBackend { + sock: Arc::new(sock), + continue_listening, + receive_buf: [0u8; 1024], + tot_read: 0, + read_until: 0, + } + } // calls IPC repeatedly to read one or more messages. // Returns a slice into self.receive_buf covering the read data @@ -135,7 +271,7 @@ impl<'a, T: Ipc> Backend<'a, T> { return Err(Error(String::from("Done"))); } - let read = match self.sock.recv(self.receive_buf) { + let read = match self.sock.recv(&mut self.receive_buf) { Ok(l) => l, _ => continue, }; @@ -149,9 +285,9 @@ impl<'a, T: Ipc> Backend<'a, T> { } } -impl<'a, T: Ipc> Drop for Backend<'a, T> { +impl Drop for SingleBackend { fn drop(&mut self) { - Rc::get_mut(&mut self.sock) + Arc::get_mut(&mut self.sock) .ok_or_else(|| { Error(String::from( "Could not get exclusive ref to socket to close", diff --git a/src/ipc/test.rs b/src/ipc/test.rs index 705baff..7a47848 100644 --- a/src/ipc/test.rs +++ b/src/ipc/test.rs @@ -1,6 +1,8 @@ use super::Ipc; use std::sync::{Arc, Mutex}; +use crate::ipc::Backend; + #[derive(Clone)] pub struct FakeIpc(Arc>>); @@ -49,9 +51,8 @@ fn test_unix() { let c2 = thread::spawn(move || { rx.recv().expect("chan rcv"); - let sk2 = super::unix::Socket::::new("out", "in").expect("init socket"); - let mut buf = [0u8; 1024]; - let b2 = super::Backend::new(sk2, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..]); + let sk2 = super::unix::Socket::::new(1, "out", "in").expect("init socket"); + let b2 = super::SingleBackend::new(sk2, Arc::new(atomic::AtomicBool::new(true))); let test_msg = TestMsg(String::from("hello, world")); let test_msg_buf = serialize::serialize(&test_msg).expect("serialize test msg"); b2.sender() @@ -59,16 +60,15 @@ fn test_unix() { .expect("send message"); }); - let sk1 = super::unix::Socket::::new("in", "out").expect("init socket"); - let mut buf = [0u8; 1024]; - let mut b1 = super::Backend::new(sk1, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..]); + let sk1 = super::unix::Socket::::new(1, "in", "out").expect("init socket"); + let mut b1 = super::SingleBackend::new(sk1, Arc::new(atomic::AtomicBool::new(true))); tx.send(true).expect("chan send"); match b1.next().expect("receive message") { // Msg::Other(RawMsg) Msg::Other(r) => { assert_eq!(r.typ, 0xff); assert_eq!(r.len, serialize::HDR_LENGTH + "hello, world".len() as u32); - assert_eq!(r.get_bytes().unwrap(), "hello, world".as_bytes()); + assert_eq!(r.get_raw_bytes(), "hello, world".as_bytes()); } _ => unreachable!(), } @@ -93,8 +93,7 @@ fn test_chan() { let c2 = thread::spawn(move || { rx.recv().expect("chan rcv"); let sk2 = super::chan::Socket::::new(s1, r2); - let mut buf = [0u8; 1024]; - let b2 = super::Backend::new(sk2, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..]); + let b2 = super::SingleBackend::new(sk2, Arc::new(atomic::AtomicBool::new(true))); let test_msg = TestMsg(String::from("hello, world")); let test_msg_buf = serialize::serialize(&test_msg).expect("serialize test msg"); b2.sender() @@ -103,15 +102,14 @@ fn test_chan() { }); let sk1 = super::chan::Socket::::new(s2, r1); - let mut buf = [0u8; 1024]; - let mut b1 = super::Backend::new(sk1, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..]); + let mut b1 = super::SingleBackend::new(sk1, Arc::new(atomic::AtomicBool::new(true))); tx.send(true).expect("chan send"); match b1.next().expect("receive message") { // Msg::Other(RawMsg) Msg::Other(r) => { assert_eq!(r.typ, 0xff); assert_eq!(r.len, serialize::HDR_LENGTH + "hello, world".len() as u32); - assert_eq!(r.get_bytes().unwrap(), "hello, world".as_bytes()); + assert_eq!(r.get_raw_bytes(), "hello, world".as_bytes()); } _ => unreachable!(), } diff --git a/src/ipc/unix.rs b/src/ipc/unix.rs index a18b6b5..0419ce6 100644 --- a/src/ipc/unix.rs +++ b/src/ipc/unix.rs @@ -6,9 +6,11 @@ use super::Result; use std::marker::PhantomData; macro_rules! unix_addr { - // TODO for now assumes just a single CCP (id=0) - ($x:expr) => { - format!("/tmp/ccp/0/{}", $x) + ($id:expr) => { + format!("/tmp/ccp/{}", $id) + }; + ($id:expr, $dir:expr) => { + format!("/tmp/ccp/{}/{}", $id, $dir) }; } @@ -16,17 +18,18 @@ pub struct Socket { sk: UnixDatagram, dest: String, _phantom: PhantomData, + _id: u8 } impl Socket { // Only the CCP process is allowed to use id = 0. // For all other datapaths, they should use a known unique identifier // such as the port number. - fn __new(bind_to: &str, send_to: &str) -> Result { - let bind_to_addr = unix_addr!(bind_to.to_string()); - let send_to_addr = unix_addr!(send_to.to_string()); + fn __new(id: u8, bind_to: &str, send_to: &str) -> Result { + let bind_to_addr = unix_addr!(id, bind_to); + let send_to_addr = unix_addr!(id, send_to); // create dir if not already exists - match std::fs::create_dir_all("/tmp/ccp/0").err() { + match std::fs::create_dir_all(unix_addr!(id)).err() { Some(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(()), Some(e) => Err(e), None => Ok(()), @@ -45,6 +48,7 @@ impl Socket { sk: sock, dest: send_to_addr, _phantom: PhantomData, + _id : id }) } } @@ -73,15 +77,15 @@ impl super::Ipc for Socket { use super::Blocking; impl Socket { - pub fn new(bind_to: &str, send_to: &str) -> Result { - Socket::__new(bind_to, send_to) + pub fn new(id: u8, bind_to: &str, send_to: &str) -> Result { + Socket::__new(id, bind_to, send_to) } } use super::Nonblocking; impl Socket { - pub fn new(bind_to: &str, send_to: &str) -> Result { - let sk = Socket::__new(bind_to, send_to)?; + pub fn new(id: u8, bind_to: &str, send_to: &str) -> Result { + let sk = Socket::__new(id, bind_to, send_to)?; sk.sk.set_nonblocking(true).map_err(Error::from)?; Ok(sk) } diff --git a/src/lib.rs b/src/lib.rs index 495e58d..1c5d666 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -102,7 +102,7 @@ mod errors; pub use crate::errors::*; use crate::ipc::Ipc; -use crate::ipc::{BackendBuilder, BackendSender}; +use crate::ipc::{BackendBroadcaster, BackendBuilder, BackendSender}; use crate::lang::{Bin, Reg, Scope}; use crate::serialize::Msg; @@ -222,7 +222,12 @@ impl DatapathTrait for Datapath { } } -fn send_and_install(sock_id: u32, sender: &BackendSender, bin: Bin, sc: &Scope) -> Result<()> +fn send_and_install( + sock_id: u32, + sender: &BackendBroadcaster, + bin: Bin, + sc: &Scope, +) -> Result<()> where I: Ipc, { @@ -234,7 +239,7 @@ where instrs: bin, }; let buf = serialize::serialize(&msg)?; - sender.send_msg(&buf[..])?; + sender.broadcast_msg(&buf[..])?; Ok(()) } @@ -426,10 +431,11 @@ impl CCPHandle { /// Run() or spawn() create arc objects, /// which are passed into run_inner to build the backend, so spawn() can create a CCPHandle that references this /// boolean to kill the thread. -pub fn run(backend_builder: BackendBuilder, cfg: Config, alg: U) -> Result +pub fn run(backend_builder: B, cfg: Config, alg: U) -> Result where - I: Ipc, + I: Ipc + Sync, U: CongAlg, + B: BackendBuilder, { // call run_inner match run_inner( @@ -452,10 +458,11 @@ where /// 3. The caller calls `CCPHandle::kill()` /// /// See [`run`](./fn.run.html) for more information. -pub fn spawn(backend_builder: BackendBuilder, cfg: Config, alg: U) -> CCPHandle +pub fn spawn(backend_builder: B, cfg: Config, alg: U) -> CCPHandle where - I: Ipc, + I: Ipc + Sync, U: CongAlg + 'static + Send, + B: BackendBuilder + 'static + Send, { let stop_signal = Arc::new(atomic::AtomicBool::new(true)); CCPHandle { @@ -464,6 +471,39 @@ where } } +fn install_programs( + programs: &HashMap<&'static str, String>, + mut scope_map: &mut Rc>, + b: &BackendBroadcaster, +) -> Result<()> { + for (program_name, program) in programs.iter() { + match lang::compile(program.as_bytes(), &[]) { + Ok((bin, sc)) => { + match send_and_install(0, b, bin, &sc) { + Ok(_) => {} + Err(e) => { + return Err(Error(format!( + "Failed to install datapath program \"{}\": {:?}", + program_name, e + ))); + } + } + Rc::get_mut(&mut scope_map) + .unwrap() + .insert(program_name.to_string(), sc.clone()); + } + Err(e) => { + return Err(Error(format!( + "Datapath program \"{}\" failed to compile: {:?}", + program_name, e + ))); + } + } + } + Ok(()) +} + +use crate::ipc::Backend; // Main execution inner loop of ccp. // Blocks "forever", or until the iterator stops iterating. // @@ -475,20 +515,19 @@ where // It returns any error, either from: // 1. the IPC channel failing // 2. Receiving an install control message (only the datapath should receive these). -fn run_inner( +fn run_inner( continue_listening: Arc, - backend_builder: BackendBuilder, + backend_builder: B, cfg: Config, alg: U, ) -> Result<()> where - I: Ipc, + I: Ipc + Sync, U: CongAlg, + B: BackendBuilder, { - let mut receive_buf = [0u8; 1024]; - let mut b = backend_builder.build(continue_listening.clone(), &mut receive_buf[..]); + let mut b = backend_builder.build(continue_listening.clone()); let mut flows = HashMap::::default(); - let backend = b.sender(); if let Some(log) = cfg.logger.as_ref() { info!(log, "starting CCP"; @@ -500,30 +539,35 @@ where let mut scope_map = Rc::new(HashMap::::default()); let programs = alg.datapath_programs(); - for (program_name, program) in programs.iter() { - match lang::compile(program.as_bytes(), &[]) { - Ok((bin, sc)) => { - match send_and_install(0, &backend, bin, &sc) { - Ok(_) => {} - Err(e) => { - return Err(Error(format!( - "Failed to install datapath program \"{}\": {:?}", - program_name, e - ))); - } + let broadcaster = b.broadcaster(); + match install_programs(&programs, &mut scope_map, &broadcaster) { + Ok(()) => {} // great, the datapath is ready, keep going! + Err(_) => { + if let Some(log) = cfg.logger.as_ref() { + info!( + log, + "could not connect to datapath, waiting for datapath to initiate..." + ); + } + // the datapath is not ready yet, let's wait for it to send us a msg + let got = b.next(); + if got.is_none() { + if !continue_listening.load(atomic::Ordering::SeqCst) { + return Ok(()); + } else { + return Err(Error(String::from("The IPC channel has closed."))); } - Rc::get_mut(&mut scope_map) - .unwrap() - .insert(program_name.to_string(), sc.clone()); } - Err(e) => { - return Err(Error(format!( - "Datapath program \"{}\" failed to compile: {:?}", - program_name, e - ))); + if let Some(log) = cfg.logger.as_ref() { + info!(log, "got message from datapath, installing programs..."); } + // got a msg from the datapath, it must be up, so let's try to install programs again + install_programs(&programs, &mut scope_map, &broadcaster).unwrap(); } } + if let Some(log) = cfg.logger.as_ref() { + info!(log, "programs installed, CCP ready"); + } while let Some(msg) = b.next() { match msg { @@ -549,7 +593,7 @@ where let f = alg.new_flow( Datapath { sock_id: c.sid, - sender: backend.clone(), + sender: b.sender(), // backend.clone(), programs: scope_map.clone(), }, DatapathInfo { diff --git a/src/serialize/mod.rs b/src/serialize/mod.rs index 3ef1109..7fb7971 100644 --- a/src/serialize/mod.rs +++ b/src/serialize/mod.rs @@ -156,6 +156,7 @@ pub mod changeprog; pub mod create; pub mod install; pub mod measure; +pub mod other; mod testmsg; pub mod update_field; @@ -193,21 +194,21 @@ fn deserialize(buf: &[u8]) -> Result { /// a Msg of the corresponding type. If the message type is unkown, returns a /// wrapper with direct access to the message bytes. #[derive(Debug, PartialEq)] -pub enum Msg<'a> { +pub enum Msg { Cr(create::Msg), Ms(measure::Msg), Ins(install::Msg), - Other(RawMsg<'a>), + Other(other::Msg), } -impl<'a> Msg<'a> { +impl Msg { fn from_raw_msg(m: RawMsg) -> Result { match m.typ { create::CREATE => Ok(Msg::Cr(create::Msg::from_raw_msg(m)?)), measure::MEASURE => Ok(Msg::Ms(measure::Msg::from_raw_msg(m)?)), install::INSTALL => Ok(Msg::Ins(install::Msg::from_raw_msg(m)?)), update_field::UPDATE_FIELD => unimplemented!(), - _ => Ok(Msg::Other(m)), + _ => Ok(Msg::Other(other::Msg::from_raw_msg(m)?)), } } @@ -288,13 +289,13 @@ mod tests { #[test] fn test_other_msg() { use super::testmsg; - use super::AsRawMsg; + let m = testmsg::Msg(String::from("testing")); let buf: Vec = super::serialize::(&m.clone()).expect("serialize"); let (msg, _) = Msg::from_buf(&buf[..]).expect("deserialize"); match msg { - Msg::Other(raw) => { - let got = testmsg::Msg::from_raw_msg(raw).expect("get raw msg"); + Msg::Other(o) => { + let got = testmsg::Msg::from_other_msg(o).expect("get raw msg"); assert_eq!(m, got); } _ => panic!("wrong type for message"), @@ -304,7 +305,6 @@ mod tests { #[test] fn test_multi_msg() { use super::testmsg; - use super::AsRawMsg; let m1 = testmsg::Msg(String::from("foo")); let m2 = testmsg::Msg(String::from("bar")); @@ -313,8 +313,8 @@ mod tests { let (msg, len1) = Msg::from_buf(&buf[..]).expect("deserialize"); match msg { - Msg::Other(raw) => { - let got = testmsg::Msg::from_raw_msg(raw).expect("get raw msg"); + Msg::Other(o) => { + let got = testmsg::Msg::from_other_msg(o).expect("get raw msg"); assert_eq!(m1, got); } _ => panic!("wrong type for message"), @@ -322,8 +322,8 @@ mod tests { let (msg, len2) = Msg::from_buf(&buf[len1..]).expect("deserialize"); match msg { - Msg::Other(raw) => { - let got = testmsg::Msg::from_raw_msg(raw).expect("get raw msg"); + Msg::Other(o) => { + let got = testmsg::Msg::from_other_msg(o).expect("get raw msg"); assert_eq!(m2, got); } _ => panic!("wrong type for message"), diff --git a/src/serialize/other.rs b/src/serialize/other.rs new file mode 100644 index 0000000..cc282e9 --- /dev/null +++ b/src/serialize/other.rs @@ -0,0 +1,41 @@ +//! Message sent from datapath to CCP when a new flow starts. + +use super::{AsRawMsg, RawMsg, HDR_LENGTH}; +use crate::Result; +use std::io::prelude::*; + +pub(crate) const OTHER: u8 = 255; + +#[derive(Clone, Debug, PartialEq)] +pub struct Msg { + pub typ: u8, + pub len: u32, + pub sid: u32, + bytes: Vec, +} + +impl AsRawMsg for Msg { + fn get_hdr(&self) -> (u8, u32, u32) { + (OTHER, HDR_LENGTH + self.bytes.len() as u32, self.sid) + } + + fn get_bytes(&self, w: &mut W) -> Result<()> { + w.write_all(&self.bytes)?; + Ok(()) + } + + fn from_raw_msg(msg: RawMsg) -> Result { + Ok(Msg { + typ: msg.typ, + len: msg.len, + sid: msg.sid, + bytes: msg.get_bytes().unwrap().to_vec(), + }) + } +} + +impl Msg { + pub fn get_raw_bytes(&self) -> &[u8] { + &self.bytes[..] + } +} diff --git a/src/serialize/testmsg.rs b/src/serialize/testmsg.rs index 71f268d..0c1e7be 100644 --- a/src/serialize/testmsg.rs +++ b/src/serialize/testmsg.rs @@ -23,3 +23,12 @@ impl AsRawMsg for Msg { Ok(Msg(st)) } } +#[cfg(test)] +impl Msg { + pub fn from_other_msg(msg: super::other::Msg) -> Result { + let b = msg.get_raw_bytes(); + let s = std::str::from_utf8(b)?; + let st = String::from(s); + Ok(Msg(st)) + } +} diff --git a/src/test.rs b/src/test.rs index 76a5b9e..33ff386 100644 --- a/src/test.rs +++ b/src/test.rs @@ -3,14 +3,15 @@ use super::serialize; use std::sync::{atomic, Arc}; use std::thread; +use crate::ipc::Backend; + #[test] fn test_ser_over_ipc() { let (tx, rx) = crossbeam::channel::unbounded(); let sk = ipc::test::FakeIpc::new(); let sk1 = sk.clone(); let c1 = thread::spawn(move || { - let mut buf = [0u8; 1024]; - let mut b1 = ipc::Backend::new(sk1, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..]); + let mut b1 = ipc::SingleBackend::new(sk1, Arc::new(atomic::AtomicBool::new(true))); tx.send(true).expect("ready chan send"); let msg = b1.next().expect("receive message"); assert_eq!( @@ -27,8 +28,7 @@ fn test_ser_over_ipc() { let sk2 = sk.clone(); let c2 = thread::spawn(move || { rx.recv().expect("ready chan rcv"); - let mut buf = [0u8; 1024]; - let b2 = ipc::Backend::new(sk2, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..]); + let b2 = ipc::SingleBackend::new(sk2, Arc::new(atomic::AtomicBool::new(true))); // serialize a message let m = serialize::measure::Msg { @@ -55,13 +55,11 @@ fn bench_ser_over_ipc(b: &mut Bencher) { let (s1, r1) = crossbeam::channel::unbounded(); let (s2, r2) = crossbeam::channel::unbounded(); - let mut buf = [0u8; 1024]; let sk1 = ipc::chan::Socket::::new(s1, r2); - let mut b1 = ipc::Backend::new(sk1, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..]); + let mut b1 = ipc::SingleBackend::new(sk1, Arc::new(atomic::AtomicBool::new(true))); - let mut buf2 = [0u8; 1024]; let sk2 = ipc::chan::Socket::::new(s2, r1); - let b2 = ipc::Backend::new(sk2, Arc::new(atomic::AtomicBool::new(true)), &mut buf2[..]); + let b2 = ipc::SingleBackend::new(sk2, Arc::new(atomic::AtomicBool::new(true))); let m = serialize::measure::Msg { sid: 42, diff --git a/tests/basic.rs b/tests/basic.rs index 67a620a..26b4b6a 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -76,5 +76,9 @@ impl IntegrationTest for TestBasicSerialize { fn basic() { let log = libccp_integration::logger(); info!(log, "starting basic test"); - libccp_integration::run_test::(log, 1); + libccp_integration::run_test::(log, 1, false); + + let log2 = libccp_integration::logger(); + info!(log2, "starting basic test (multi)"); + libccp_integration::run_test::(log2, 1, true); } diff --git a/tests/libccp_integration/mod.rs b/tests/libccp_integration/mod.rs index 30f949e..6b90570 100644 --- a/tests/libccp_integration/mod.rs +++ b/tests/libccp_integration/mod.rs @@ -80,7 +80,7 @@ impl Flow for TestBase { } use portus::ipc::chan::Socket; -use portus::ipc::{BackendBuilder, Blocking}; +use portus::ipc::{Blocking, MultiBackendBuilder, SingleBackendBuilder}; use std; use std::thread; @@ -90,8 +90,23 @@ fn start_ccp( log: slog::Logger, tx: mpsc::Sender>, ) -> portus::CCPHandle { - let b = BackendBuilder { sock: sk }; - portus::spawn::, TestBaseConfig>( + let b = SingleBackendBuilder { sock: sk }; + portus::spawn::, TestBaseConfig, SingleBackendBuilder>>( + b, + portus::Config { + logger: Some(log.clone()), + }, + TestBaseConfig(tx, Some(log.clone()), PhantomData::), + ) +} + +fn start_ccp_multi( + sks: Vec>, + log: slog::Logger, + tx: mpsc::Sender>, +) -> portus::CCPHandle { + let b = MultiBackendBuilder { socks: sks }; + portus::spawn::, TestBaseConfig, MultiBackendBuilder>>( b, portus::Config { logger: Some(log.clone()), @@ -101,7 +116,11 @@ fn start_ccp( } // Runs a specific intergration test -pub fn run_test(log: slog::Logger, num_flows: usize) { +pub fn run_test( + log: slog::Logger, + num_flows: usize, + use_multi: bool, +) { let (tx, rx) = std::sync::mpsc::channel(); // Channel for IPC @@ -113,14 +132,17 @@ pub fn run_test(log: slog::Logger, num_flow let (dp_handle, conn_handles) = mock_datapath::start(dp_log, num_flows, s2, r1); let sk = Socket::::new(s1, r2); - let ccp_handle = start_ccp::(sk, log.clone(), tx); + let ccp_handle = match use_multi { + false => start_ccp::(sk, log.clone(), tx), + true => start_ccp_multi::(vec![sk], log.clone(), tx), + }; // wait for program to finish let wait_for_done = thread::spawn(move || { rx.recv_timeout(std::time::Duration::from_secs(20)) .unwrap() .unwrap(); - ccp_handle.kill(); // causes backend to stop iterating + ccp_handle.kill(); ccp_handle.wait().unwrap(); for h in conn_handles { diff --git a/tests/preset.rs b/tests/preset.rs index 66ad6f1..b2c935d 100644 --- a/tests/preset.rs +++ b/tests/preset.rs @@ -69,5 +69,5 @@ impl IntegrationTest for TestPresetVars { fn preset() { let log = libccp_integration::logger(); info!(log, "starting preset test"); - libccp_integration::run_test::(log, 1); + libccp_integration::run_test::(log, 1, false); } diff --git a/tests/timing.rs b/tests/timing.rs index c025af2..75e1b89 100644 --- a/tests/timing.rs +++ b/tests/timing.rs @@ -83,5 +83,5 @@ impl IntegrationTest for TestTiming { fn timing() { let log = libccp_integration::logger(); info!(log, "starting timing test"); - libccp_integration::run_test::(log, 1); + libccp_integration::run_test::(log, 1, false); } diff --git a/tests/twoflow.rs b/tests/twoflow.rs index 1eb4dd4..8bb881a 100644 --- a/tests/twoflow.rs +++ b/tests/twoflow.rs @@ -88,5 +88,5 @@ impl IntegrationTest for TestTwoFlows { fn twoflow() { let log = libccp_integration::logger(); info!(log, "starting twoflow test"); - libccp_integration::run_test::(log, 2); + libccp_integration::run_test::(log, 2, false); } diff --git a/tests/update.rs b/tests/update.rs index 37d9a61..f9c53df 100644 --- a/tests/update.rs +++ b/tests/update.rs @@ -88,5 +88,5 @@ impl IntegrationTest for TestUpdateFields { fn update() { let log = libccp_integration::logger(); info!(log, "starting update test"); - libccp_integration::run_test::(log, 1); + libccp_integration::run_test::(log, 1, false); } diff --git a/tests/volatile.rs b/tests/volatile.rs index 2701aab..8c1c0c5 100644 --- a/tests/volatile.rs +++ b/tests/volatile.rs @@ -74,5 +74,5 @@ impl IntegrationTest for TestVolatileVars { fn volatile() { let log = libccp_integration::logger(); info!(log, "starting volatile test"); - libccp_integration::run_test::(log, 1); + libccp_integration::run_test::(log, 1, false); }