Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,40 +92,40 @@ impl AccountantSkel {
fn process(
&mut self,
r_reader: &streamer::Receiver,
s_sender: &streamer::Sender,
recycler: &streamer::Recycler,
s_responder: &streamer::Responder,
packet_recycler: &streamer::PacketRecycler,
response_recycler: &streamer::ResponseRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let msgs = r_reader.recv_timeout(timer)?;
let msgs_ = msgs.clone();
let msgs__ = msgs.clone();
let rsps = streamer::allocate(recycler);
let rsps = streamer::allocate(response_recycler);
let rsps_ = rsps.clone();
let l = msgs__.read().unwrap().packets.len();
rsps.write()
.unwrap()
.packets
.resize(l, streamer::Packet::default());
{
let mut num = 0;
let mut ursps = rsps.write().unwrap();
for packet in &msgs.read().unwrap().packets {
let sz = packet.size;
let sz = packet.meta.size;
let req = deserialize(&packet.data[0..sz])?;
if let Some(resp) = self.process_request(req) {
let rsp = &mut ursps.packets[num];
if ursps.responses.len() <= num {
ursps
.responses
.resize((num + 1) * 2, streamer::Response::default());
}
let rsp = &mut ursps.responses[num];
let v = serialize(&resp)?;
let len = v.len();
rsp.data[0..len].copy_from_slice(&v);
rsp.size = len;
rsp.set_addr(&packet.get_addr());
rsp.data[..len].copy_from_slice(&v);
rsp.meta.size = len;
rsp.meta.set_addr(&packet.meta.get_addr());
num += 1;
}
}
ursps.packets.resize(num, streamer::Packet::default());
ursps.responses.resize(num, streamer::Response::default());
}
s_sender.send(rsps_)?;
streamer::recycle(recycler, msgs_);
s_responder.send(rsps_)?;
streamer::recycle(packet_recycler, msgs_);
Ok(())
}

Expand All @@ -141,23 +141,30 @@ impl AccountantSkel {
local.set_port(0);
let write = UdpSocket::bind(local)?;

let recycler = Arc::new(Mutex::new(Vec::new()));
let packet_recycler = Arc::new(Mutex::new(Vec::new()));
let response_recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel();
let t_receiver = streamer::receiver(read, exit.clone(), recycler.clone(), s_reader)?;
let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?;

let (s_sender, r_sender) = channel();
let t_sender = streamer::sender(write, exit.clone(), recycler.clone(), r_sender);
let (s_responder, r_responder) = channel();
let t_responder =
streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder);

let t_server = spawn(move || {
if let Ok(me) = Arc::try_unwrap(obj) {
loop {
let e = me.lock().unwrap().process(&r_reader, &s_sender, &recycler);
let e = me.lock().unwrap().process(
&r_reader,
&s_responder,
&packet_recycler,
&response_recycler,
);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
}
}
});
Ok(vec![t_receiver, t_sender, t_server])
Ok(vec![t_receiver, t_responder, t_server])
}
}
1 change: 1 addition & 0 deletions src/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub enum Error {
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
Serialize(std::boxed::Box<bincode::ErrorKind>),
SendError,
Services,
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down
Loading